Avatar
0
Nguyen Thai Son Beginner
Cách xử lý bất đồng bộ các message trong message queue
Hiện tại thì e đang design 1 cái architecture overview nhưng đang khá khoai ở chỗ việc xử lý bất đồng bộ đống message.

Bài toán đưa ra là:

user upload file csv lên s3

s3 trigger lambda(producer)

lambda (producer) gửi 1 cái message dạng như "starting: job_id" vào "Upsert" queue

lambda (producer) lấy file csv và xử lý từng row (loop theo row)

từng row khi map thành object sẽ được push lên 1 con queue "Upsert", hoặc nếu loop xong sẽ gửi 1 message dạng "ending: job_id" vào "Upsert"

lambda khác sẽ lắng nghe cái queue "Upsert" và lấy từng object về để tiến hành update/insert vào db, nếu là các message dạng "starting", "ending" thì gửi thẳng qua queue "Success"

Nếu object nào được update/insert thành công thì vào queue "Success" còn ngược lại thì vào "Failed"

Con lambda (producer) sẽ lắng nghe 2 queue "Success" và "Failed" để xử lý từng message và tạo thành 1 file result_.json chứa (failed_products: [] và success_products: []).

Lúc đầu e tính là để con lambda consumer là single instance thì khi đó mọi thứ sẽ được đồng bộ với nhau, chắc chắn con lambda producer sẽ nhận được các message: [starting_job_id, ..., product, product, ..., ending_job_id] theo thứ tự. Nhưng mà manager bảo e là có thể spin up nhiều instance của con consumer để scale + xử lý nhanh hơn.

Nếu lúc này có nhiều instance của lambda consumer thì việc con lambda producer nhận message theo kiểu: starting_job_id, product, ending_job_id, product, ... là hoàn toàn có thể xảy ra.

Hiện tại có cách nào để có thể enforce được việc message: starting_job_id luôn ở đầu và message: ending_job_id luôn ở cuối vậy mấy thím. Kiến thức về cloud + architecture của e vẫn chưa được nhiều lắm :(:(

Update cái ảnh diagram hiện tại (stackask k add media được nên em add tạm link này)

https://ibb.co/0GYvXKc
  • Answer
Remain: 5
1 Answer
Avatar
tvd12 Beginner
tvd12 Beginner
Nếu anh không nhầm thì em đan có con producer hoạt động thế này:
rows = readCsvFile():
producer.pushToQueue(starting: job_id)
for (row : rows) {
    producer.pushToQueue(rowToProduct(row));
}

Anh nghĩ em có thể bổ sung thêm đoạn code lắng nghe kết quả ở producer thê này là ok:

AtomicInt resultCount = new AtomicInt();
producer.onResult(result => {
    int count = resultCount.incrementAndGet();
    if (count >= rows.length) {
        producer.pushToQueue(end: job_id)
    }
});
  • 1
  • Reply