Bài viết trình bày một số kinh nghiệm scale hệ thống sử dụng Kafka. Bạn nên có hiểu biết cơ bản về Kafka, nắm được các khái niệm như topic, partition, consumer, producer…

Bài toán mà tôi gặp phải như sau. Thiết kế một hệ thống xử lý data từ Kafka, bảo đảm các yêu cầu sau:

  • Mỗi record phải được xử lý thành công ít nhất một lần, nếu có lỗi thì phải xử lý lại. Cho phép xử lý trùng lặp nhưng nên hạn chế.
  • Việc xử lý record do một service khác đảm nhận, quá trình xử lý này tốn khá nhiều thời gian và có thể xảy ra lỗi. Hệ thống phải có khả năng chịu lỗi như bỏ qua record lỗi khi cần thiết và cho phép xử lý lại sau đó.
  • Đôi khi cần phải xử lý lại toàn bộ các dữ liệu cũ trong Kafka, hệ thống cần đáp ứng được khả năng scale, tăng tốc độ xử lý khi cần thiết.

Nhìn thì có vẻ cũng đơn giản phải không? Mấu chốt vấn đề ở đây là thời gian xử lý của một record thường rất chậm (từ 300-800ms), và hệ thống cần phải có khả năng để tăng tốc độ xử lý khi cần thiết.

Ở đây tạm coi như ta không có cách nào để khiến thời gian xử lý của một record giảm đi. Vậy nên để tăng tốc độ xử lý ta chỉ có thể scale bằng cách tăng số máy xử lý, trên mỗi máy chạy multiple-thread. Lưu ý rằng việc scale này vẫn phải bảo đảm mỗi record được xử lý thành công ít nhất một lần.

Có một số cách chúng ta có thể nghĩ đến như sau:

  • Mỗi consumer tương ứng với một thread xử lý, khi cần tăng tốc xử lý ta tăng số node và tăng số consumer trên mỗi node.
  • Service xử lý sẽ được chạy ở background thông qua một worker pool. Khi cần tăng tốc xử lý ta tăng số worker lên.

Tăng số consumer Kafka

Đây là phương pháp đơn giản và dễ thực hiện nhất. Record sau khi đọc từ consumer sẽ được xử lý ngay lập tức. Quá trình xử lý trên mỗi record sẽ được lặp đi lặp lại cho tới khi thành công thì thôi. Chỉ khi việc xử lý thành công ta mới thực hiện commit vào Kafka. Nếu có lỗi xảy ra khiến chương trình dừng lại, chỉ cần start lại consumer và nó sẽ bắt đầu xử lý tiếp từ vị trí cuối cùng đã commit.

Giả sử rằng ta có m nodes, mỗi node ta chạy n consumer (mỗi consumer là một thread) thì tổng số consumer sẽ là mxn. Bây giờ mỗi khi cần scale ta sẽ tăng số node lên hoặc tăng số consumer/thread trên mỗi node lên. Lưu ý rằng số consumer/thread tối đa trên mỗi node chỉ nên bằng số core CPU trên node đó, nếu không performance sẽ bị ảnh hưởng do context switch.

Code tham khảo trên Java:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props));
consumer.subscribe(topics);

while (!flagStopped) {
    ConsumerRecords<String, byte[]> records = consumer.poll(100);
    for (ConsumerRecord<String, byte[]> record : records) {
        while (!flagStopped) {
            try {
                handleRecord(record);
                break;
            } catch (Exception e) {
                // log error and sleep before retry
            }
        }
    }
    // 
    consumer.commitAsync();
}

Trong đoạn code trên tôi chỉ commit sau khi xử lý xong cả một batch, đây là cách để giảm số lần gọi tới Kafka giúp cải thiện performance. Nếu có lỗi xảy ra consumer sẽ đọc và xử lý lại toàn bộ batch, vì cho phép trùng lặp nên điều này là được phép.

Phương pháp này có một số nhược điểm như:

  • số lượng consumer tối đa bằng với số partition trên mỗi topic nên nếu số consumer đã bằng với số partition tối đa, để tăng số consumer lên ta bắt buộc phải tăng số partition của topic cần xử lý lên tương ứng. Kafka cho phép bạn tăng số partition của một topic ở runtime dễ dàng, tuy nhiên có quá nhiều partition trên một topic không phải là tốt, lí do bạn có thể tham khảo thêm tại đây.
  • Tăng số partition trên một topic không làm cho data từ các partition đang có được chia đều sang các partition mới. Các partition mới chỉ bắt đầu có data khi producer push thêm dữ liệu. Nếu như bạn cần xử lý lại một lượng lớn dữ liệu cũ đã có sẵn trong các partition hiện tại thì việc tăng số consumer/partition này hoàn toàn không có tác dụng.
  • Nếu có bất kì một record nào đó không thể xử lý thành công (do một exception mà code chưa thể handle) sẽ khiến một consumer không thể xử lý các record khác. Có thể tránh điều này bằng cách giới hạn số lần retry và bỏ qua record đó tuy nhiên điều này sẽ vi phạm yêu cầu đầu tiên đã nói ở đầu bài viết.

Sử dụng worker pool ở background

Phương pháp này có vẻ như sẽ giải quyết được vấn đề tăng số partition trên mỗi topic. Số consumer Kafka không cần thiết phải tăng lên khi muốn scale vì tốc độ đọc từ Kafka đã quá nhanh rồi. Khi cần tăng tốc xử lý ta tăng số worker ở background.

Tuy nhiên phương pháp này gặp phải một số vấn đề với việc bảo đảm một record được xử lý thành công ở background. Ta không thể nào bắt consumer phải đợi worker xử lý xong một record trước khi đọc tiếp một record khác, như thế mọi thứ sẽ quay trở lại thành phương pháp đầu tiên.

Giải pháp là consumer sẽ đọc record và gửi tới một worker để xử lý mà không cần quan tâm tới việc record đã được xử lý xong hay chưa. Khi một record được xử lý thành công, worker gửi một event báo cho consumer tương ứng thông tin về record vừa được xử lý.

Một vấn đề khác lại nảy sinh ở đây là khi nào consumer sẽ quyết định commit vào Kafka? Ở phương pháp thứ nhất ta luôn bảo đảm record trước chưa xử lý thành công thì record sau chưa được xử lý nên đơn giản chỉ cần commit tại offset cuối cùng được xử lý thành công. Mọi thứ bây giờ không còn đơn giản như thế nữa vì mọi record đều được xử lý ở background, không có gì bảo đảm là một record ở offset 100 được xử lý thành công thì các offset 97, 98, 99 đã thành công. Nếu bạn thực hiện commit ở offset 100, Kafka sẽ coi tất cả các offset trước 100 là đã xử lý thành công, nếu có lỗi xảy ra với các offset nhỏ hơn 100, bạn sẽ không có cơ hội để xử lý lại.

Manual Offset Control

Một hướng tiếp cận khác là không sử dụng Kafka để lưu offset mà thay vào đó sẽ tự quản lý offset bằng cách lưu trữ qua file hoặc remote database. Lưu trữ offset ngoài Kafka đồng nghĩ với việc bạn phải tự seek về đúng offset khi start consumer. Nếu bạn để Kafka tự động phân bổ partition dựa trên tổng số consumer, bạn chỉ biết được consumer được gán cho partition nào sau khi gọi hàm poll(), khi đó việc seek offset mới có thể thực hiện.

Manual Partition Assignment

Cũng có thể chỉ định một số partition cụ thể cho một consumer, bằng cách này bạn có thể chủ động seek offset. Tuy nhiên tôi không prefer cách này cho lắm vì như vậy phía consumer của bạn phải nắm rõ thông tin về số topic, tổng số partition để phân bổ số partition cho mỗi consumer hợp lí khiến nó khá cứng và không được linh hoạt khi có sự thay đổi.

Tựu chung lại tôi cảm thấy phương pháp này khá khó để implement, mặc dù không phải không thể làm được. Tuy nhiên cá nhân tôi hay thích những thứ đơn giản hơn mà vẫn giải quyết được vấn đề, simple is the best. Vậy nên ta sẽ đến với phương pháp thứ ba ngay sau đây.

Commit trước khi xử lý

Đây là cách mà tôi đang sử dụng cho dự án thực tế tại công ty. Phương pháp này kết hợp những ưu điểm từ hai phương pháp trên, lấy ý tưởng một chút từ file WAL (Write-Ahead-Log) trong HBase.

Mỗi record nhận được từ consumer sẽ được write vào một file log thay vì xử lý ngay. Record được coi là xử lý thành công nếu được ghi vào file log thành công. Toàn bộ quá trình write vào file hoàn toàn là append, với mỗi consumer khác nhau sẽ duy trì một file log khác nhau. Sau khi write data, kiểm tra dung lượng file log, nếu nó vượt qua một giới hạn quy định trước, gửi file này tới một worker ở background để xử lý. Đồng thời sẽ tạo ra một file log mới cho các record tiếp theo.

Mỗi worker ở background sẽ nhận input là một list records nằm trong một file. Sau khi toàn bộ records trong file này được xử lý thành công, worker sẽ xóa file này và sẵn sàng để xử lý file tiếp theo. Nếu có bất kì lỗi nào xảy ra với một record mà không thể retry được, worker sẽ bỏ qua file đó (không xóa) và chuyển sang xử lý file tiếp theo. Để xử lý lại một record lỗi, cần xử lý lại toàn bộ cả file, vì ta chấp nhận xử lý trùng lặp nên đây không phải vấn đề lớn, chỉ cần chú ý đặt giới hạn file log hợp lí để số records cần xử lý lại khi có lỗi không quá nhiều.

Phương pháp này đáp ứng được mọi yêu cầu bài toán đặt ra và khắc phục được nhược điểm của các phương pháp trên:

  • Record chỉ được commit vào Kafka sau khi write vào file thành công. Chỉ cần file còn tồn tại thì nó sẽ được xử lý bởi một worker vào một lúc nào đó.
  • Quản lý offset dễ dàng do consumer chỉ cần commit offset cuối cùng đọc được sau mỗi batch. Việc lưu trữ offset và phân chia partition cho các consumer hoàn toàn do Kafka đảm nhận nên đơn giản hóa tối đa quá trình sử dụng.
  • Có thể bỏ qua một record lỗi không làm ảnh hưởng tới việc xử lý các record mới. Record lỗi có thể được xử lý lại bất kì lúc nào.
  • Dễ dàng scale bằng cách tăng thêm số worker trên mỗi node mà không cần phải tăng số consumer. Ví dụ khi cần xử lý lại một lượng dữ liệu cực lớn ta tăng số worker lên để xử lý nhanh hơn. Khi chỉ cần xử lý dữ liệu realtime ít hơn ta giảm số lượng worker xuống vừa phải để giảm tải cho hệ thống.

Gần đây tôi có update lại một chút phần consumer cho hiệu quả hơn. Khi nhận được một record từ consumer, thay vì ghi vào file record sẽ được lưu vào một buffer. Khi buffer đạt tới một giới hạn, flush toàn bộ data này vào một file log, file được tạo ra sau đấy không thể sửa đổi (có thể xóa). File log sau khi được tạo ra sẽ được gửi tới một worker để xử lý. Phần xử lý của worker ở background trên mỗi file thì vẫn tương tự như cũ. Buffer sau đó được làm mới và bắt đầu lưu các record tiếp theo.

Bài viết cũng đá khá dài rồi và phần cài đặt phương pháp này cũng khá đơn giản nên tôi sẽ không hướng dẫn cụ thể ở đây nữa. Các bạn hãy thử tự mình cài đặt nhé.

Tham khảo

https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html