Cách Mình Xây Dựng Pipelines
Nguyên tắc điều phối workflows machine learning nhiều giai đoạn
ML pipeline là một chuỗi bước nối nhau: preprocessing → embeddings → clustering → labeling. Mỗi bước lấy output của bước trước làm input. Khi bước nào đó sập thì phải biết ngay nó sập ở đâu, vì sao, và có cần chạy lại từ đầu không.
Câu hỏi cốt lõi: "Nếu pipeline fail ở bước 3 trong 5 bước, mình có thể chẩn đoán vấn đề và tiếp tục mà không cần chạy lại từ đầu không?"
Phải thấy được pipeline đang chạy tới đâu
Chỉ hiện "Processing..." rồi im re mấy phút thì user tưởng nó treo rồi. Phải cho thấy đang chạy bước gì, được bao nhiêu phần trăm.
Mỗi giai đoạn nên phát ra structured events (không chỉ log lines) mà UI có thể consume -- gồm tên giai đoạn, phần trăm hoàn thành, và thời gian đã chạy. Có progress bar, có tên giai đoạn thì user yên tâm hơn nhiều, biết còn phải chờ bao lâu, và khi có sự cố thì debug cũng dễ.
Tách rõ ranh giới giữa các bước
Viết một function 500 dòng làm từ preprocessing cho đến labeling thì ban đầu chạy được, nhưng đụng vào giai đoạn nào cũng sợ gãy giai đoạn khác. Kiểu code đó maintain rất khổ.
Mỗi giai đoạn nên là một function riêng với inputs/outputs rõ ràng. Các giai đoạn giao tiếp qua data structures, không phải shared mutable state. Bước này fail thì data của bước khác vẫn nguyên. Làm vậy thì test được từng bước riêng, debug dễ, và khi đổi embedding model chẳng hạn thì không cần viết lại cả pipeline.
Fail nhanh, fail rõ ràng
"Embedding failed: API returned 401 Unauthorized" -- đọc xong biết sửa gì. "Processing failed" -- chịu, không biết bắt đầu từ đâu =))
Validate inputs ở đầu mỗi giai đoạn. Nếu preconditions không đáp ứng thì fail ngay, đừng để lỗi ở bước 2 âm thầm chạy tiếp rồi bung ở bước 4 với message khó hiểu. Error message phải có tên giai đoạn, expected vs. actual, và gợi ý cách sửa.
Thiết kế cho partial recovery
Labeling API timeout mà phải chạy lại từ đầu, mất 5 phút embedding và clustering trước đó -- ai cũng tức chứ. Các thao tác ML tốn thời gian, compute, và tiền API. Không nên để một transient failure xóa sạch mọi thứ.
Cache hoặc checkpoint kết quả trung gian khi thực tế cho phép. Thiết kế các giai đoạn có thể resume. Khi có thể, cho phép restart từ giai đoạn thành công cuối cùng thay vì chạy lại toàn bộ.
Giữ nguyên data gốc bên cạnh data đã xử lý
Sau khi preprocessing + clustering xong, muốn thử lại với parameters khác mà data gốc bị xóa rồi thì user phải re-upload. Phiền lắm.
Giữ nguyên inputs gốc bên cạnh outputs đã biến đổi -- lưu cả preprocessed text lẫn original text, giữ high-dimensional embeddings kể cả sau dimensionality reduction. Nhờ vậy user có thể re-cluster với settings khác, debug kết quả lạ, mà không phải bắt đầu lại từ con số không.
Khung quyết định
Streaming hay batch?
Dùng generators/streaming khi:
- Pipeline mất hơn 5 giây
- Muốn hiển thị tiến độ trong khi thực thi
- Giới hạn memory không cho phép giữ tất cả kết quả cùng lúc
- Client cần incremental updates
Dùng batch returns khi:
- Pipeline hoàn thành trong dưới 2 giây
- Tất cả kết quả cần có trước khi bất kỳ kết quả nào dùng được
- Sự đơn giản quan trọng hơn progress feedback
- Kết quả đủ nhỏ để fit trong memory
Có nên checkpoint kết quả trung gian không?
Checkpoint khi:
- Giai đoạn mất hơn 30 giây
- Giai đoạn gọi external API (embedding, labeling)
- Kết quả giai đoạn tốn kém để tính lại
- User có thể muốn chạy lại các giai đoạn sau với parameters khác
Bỏ qua khi:
- Giai đoạn nhanh (< 5 giây)
- Chi phí storage vượt quá chi phí tính lại
- Data nhạy cảm, không nên lưu intermediates
- Pipeline luôn chạy end-to-end
Khi nào nên parallelize?
Parallelize khi:
- Giai đoạn xử lý các items độc lập (embeddings, labels)
- Công việc là I/O-bound (API calls, đọc file)
- Speedup đáng kể (> 3x)
- Error handling cho từng item quản lý được
Giữ sequential khi:
- Các items phụ thuộc vào kết quả trước đó
- Công việc là CPU-bound trên single-threaded libraries
- Debugging quan trọng hơn tốc độ
- Rate limits làm parallelization phản tác dụng
Mấy lỗi hay gặp
Ghép visualization với computation. Không thể lấy kết quả clustering mà không generate plots, lỗi visualization crash cả pipeline. Tách computation ra khỏi presentation -- return data structures, generate visualizations như bước cuối tùy chọn thôi.
Hardcode thứ tự giai đoạn. Thêm một bước preprocessing mới mà phải sửa năm files thì biết là thiết kế có vấn đề. Định nghĩa pipeline là một chuỗi giai đoạn configurable, cho phép enable/disable hoặc sắp xếp lại qua configuration.
Nuốt errors khi chạy parallel. Một số items âm thầm fail, kết quả ra ít hơn inputs mà không ai hay. Thu thập tất cả errors trong quá trình parallel execution, báo rõ items nào fail và tại sao, rồi quyết định có chấp nhận partial results không.
Không lo memory cho datasets lớn. 100 items chạy ngon, 10,000 items OOM -- chuyện thường. Xử lý theo chunks, dùng generators thay vì lists, giải phóng references tới intermediate data sau mỗi giai đoạn. Nên profile memory usage cho datasets representative.
Chỉ test happy path. Sample data chạy hoàn hảo, nhưng empty input, single item, malformed data thì crash. Test edge cases rõ ràng: empty, single, duplicate, maximum size, API failures.
Checklist đánh giá
Pipeline đang ổn nếu:
- User thấy giai đoạn nào đang chạy và tiến độ trong giai đoạn đó
- Errors nói rõ giai đoạn nào fail và cung cấp messages có thể hành động
- Mỗi giai đoạn test độc lập được với mock inputs
- Chạy lại với parameters khác không cần re-upload data
- Pipeline xử lý 1 item và 10,000 items mà không cần thay đổi code
Cần xem lại nếu:
- "Processing..." là feedback duy nhất trong nhiều phút
- Debug bằng cách rải print statements khắp code
- Thay đổi embedding model phải sửa clustering code
- Partial failures mất toàn bộ tiến độ
- Memory usage tăng không giới hạn theo input size
Tham khảo nhanh
+-------------------+ +-------------------+ +-------------------+
| Preprocessing | --> | Embeddings | --> | Clustering |
| - Validate input | | - Batch requests | | - Reduce dims |
| - Normalize text | | - Parallel fetch | | - Find clusters |
| - Emit progress | | - Emit progress | | - Handle outliers |
+-------------------+ +-------------------+ +-------------------+
| | |
v v v
Original data High-dim vectors Cluster assignments
preserved preserved + original data
| Vấn đề giai đoạn | Cách tiếp cận |
|---|---|
| Progress | Phát structured events với tên giai đoạn, phần trăm, thời gian đã trôi qua |
| Errors | Fail nhanh với tên giai đoạn, message cụ thể, gợi ý recovery |
| Ranh giới | Function cho mỗi giai đoạn, explicit input/output types |
| Recovery | Cache các giai đoạn tốn kém, cho phép resume từ checkpoint |
| Memory | Xử lý theo chunks, giải phóng intermediate data, dùng generators |