Bài viết trình bày một số kinh nghiệm scale hệ thống sử dụng
Kafka. Bạn nên có hiểu biết cơ bản về Kafka, nắm được các khái niệm như
topic, partition, consumer, producer…
Bài toán mà tôi gặp phải như sau. Thiết kế một hệ thống xử lý data từ Kafka, bảo đảm các yêu cầu sau:
- Mỗi record phải được xử lý thành công ít nhất một lần, nếu có lỗi thì phải xử lý lại. Cho phép xử lý trùng lặp nhưng nên hạn chế.
- Việc xử lý record do một service khác đảm nhận, quá trình xử lý
này tốn khá nhiều thời gian và có thể xảy ra lỗi. Hệ thống phải có khả
năng chịu lỗi như bỏ qua record lỗi khi cần thiết và cho phép xử lý lại
sau đó.
- Đôi khi cần phải xử lý lại toàn bộ các dữ liệu cũ trong Kafka, hệ
thống cần đáp ứng được khả năng scale, tăng tốc độ xử lý khi cần thiết.
Nhìn thì có vẻ cũng đơn giản phải không? Mấu chốt vấn đề ở đây là
thời gian xử lý của một record thường rất chậm (từ 300-800ms), và hệ
thống cần phải có khả năng để tăng tốc độ xử lý khi cần thiết.
Ở đây tạm coi như ta không có cách nào để khiến thời gian xử lý của
một record giảm đi. Vậy nên để tăng tốc độ xử lý ta chỉ có thể scale
bằng cách tăng số máy xử lý, trên mỗi máy chạy multiple-thread. Lưu ý
rằng việc scale này vẫn phải bảo đảm mỗi record được xử lý thành công ít
nhất một lần.
Có một số cách chúng ta có thể nghĩ đến như sau:
- Mỗi consumer tương ứng với một thread xử lý, khi cần tăng tốc xử lý ta tăng số node và tăng số consumer trên mỗi node.
- Service xử lý sẽ được chạy ở background thông qua một worker pool. Khi cần tăng tốc xử lý ta tăng số worker lên.
Tăng số consumer Kafka
Đây là phương pháp đơn giản và dễ thực hiện nhất. Record sau khi đọc
từ consumer sẽ được xử lý ngay lập tức. Quá trình xử lý trên mỗi record
sẽ được lặp đi lặp lại cho tới khi thành công thì thôi. Chỉ khi việc xử
lý thành công ta mới thực hiện commit vào Kafka. Nếu có
lỗi xảy ra khiến chương trình dừng lại, chỉ cần start lại consumer và
nó sẽ bắt đầu xử lý tiếp từ vị trí cuối cùng đã commit.
Giả sử rằng ta có m nodes, mỗi node ta chạy n consumer (mỗi consumer là một thread) thì tổng số consumer sẽ là mxn.
Bây giờ mỗi khi cần scale ta sẽ tăng số node lên hoặc tăng số
consumer/thread trên mỗi node lên. Lưu ý rằng số consumer/thread tối đa
trên mỗi node chỉ nên bằng số core CPU trên node đó, nếu không
performance sẽ bị ảnh hưởng do context switch.
Code tham khảo trên Java:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props));
consumer.subscribe(topics);
while (!flagStopped) {
ConsumerRecords<String, byte[]> records = consumer.poll(100);
for (ConsumerRecord<String, byte[]> record : records) {
while (!flagStopped) {
try {
handleRecord(record);
break;
} catch (Exception e) {
// log error and sleep before retry
}
}
}
//
consumer.commitAsync();
}
Trong đoạn code trên tôi chỉ commit sau khi xử lý xong cả một
batch, đây là cách để giảm số lần gọi tới Kafka giúp cải thiện
performance. Nếu có lỗi xảy ra consumer sẽ đọc và xử lý lại toàn bộ
batch, vì cho phép trùng lặp nên điều này là được phép.
Phương pháp này có một số nhược điểm như:
- Vì số lượng consumer tối đa bằng với số partition trên mỗi topic
nên nếu số consumer đã bằng với số partition tối đa, để tăng số
consumer lên ta bắt buộc phải tăng số partition của topic cần xử lý lên
tương ứng. Kafka cho phép bạn tăng số partition của một topic ở runtime
dễ dàng, tuy nhiên có quá nhiều partition trên một topic không phải là
tốt, lí do bạn có thể tham khảo thêm tại đây.
- Tăng số partition trên một topic không làm cho data từ các
partition đang có được chia đều sang các partition mới. Các partition
mới chỉ bắt đầu có data khi producer push thêm dữ liệu. Nếu như bạn cần
xử lý lại một lượng lớn dữ liệu cũ đã có sẵn trong các partition hiện
tại thì việc tăng số consumer/partition này hoàn toàn không có tác dụng.
- Nếu có bất kì một record nào đó không thể xử lý thành công (do một
exception mà code chưa thể handle) sẽ khiến một consumer không thể xử
lý các record khác. Có thể tránh điều này bằng cách giới hạn số lần
retry và bỏ qua record đó tuy nhiên điều này sẽ vi phạm yêu cầu đầu tiên đã nói ở đầu bài viết.
Sử dụng worker pool ở background
Phương pháp này có vẻ như sẽ giải quyết được vấn đề tăng số partition
trên mỗi topic. Số consumer Kafka không cần thiết phải tăng lên khi
muốn scale vì tốc độ đọc từ Kafka đã quá nhanh rồi. Khi cần tăng tốc xử
lý ta tăng số worker ở background.
Tuy nhiên phương pháp này gặp phải một số vấn đề với việc bảo đảm một
record được xử lý thành công ở background. Ta không thể nào bắt
consumer phải đợi worker xử lý xong một record trước khi đọc tiếp một
record khác, như thế mọi thứ sẽ quay trở lại thành phương pháp đầu tiên.
Giải pháp là consumer sẽ đọc record và gửi tới một worker để xử lý mà
không cần quan tâm tới việc record đã được xử lý xong hay chưa. Khi một
record được xử lý thành công, worker gửi một event báo cho consumer
tương ứng thông tin về record vừa được xử lý.
Một vấn đề khác lại nảy sinh ở đây là khi nào consumer sẽ quyết định
commit vào Kafka? Ở phương pháp thứ nhất ta luôn bảo đảm record trước
chưa xử lý thành công thì record sau chưa được xử lý nên đơn giản chỉ
cần commit tại offset cuối cùng được xử lý thành công. Mọi thứ bây giờ
không còn đơn giản như thế nữa vì mọi record đều được xử lý ở
background, không có gì bảo đảm là một record ở offset 100 được xử lý
thành công thì các offset 97, 98, 99 đã thành công. Nếu bạn thực hiện
commit ở offset 100, Kafka sẽ coi tất cả các offset trước 100 là đã xử
lý thành công, nếu có lỗi xảy ra với các offset nhỏ hơn 100, bạn sẽ
không có cơ hội để xử lý lại.
Manual Offset Control
Một hướng tiếp cận khác là không sử dụng Kafka để lưu offset mà thay
vào đó sẽ tự quản lý offset bằng cách lưu trữ qua file hoặc remote
database. Lưu trữ offset ngoài Kafka đồng nghĩ với việc bạn phải tự seek
về đúng offset khi start consumer. Nếu bạn để Kafka tự động phân bổ
partition dựa trên tổng số consumer, bạn chỉ biết được consumer được gán
cho partition nào sau khi gọi hàm poll()
, khi đó việc seek offset mới có thể thực hiện.
Manual Partition Assignment
Cũng có thể chỉ định một số partition cụ thể cho một consumer, bằng
cách này bạn có thể chủ động seek offset. Tuy nhiên tôi không prefer
cách này cho lắm vì như vậy phía consumer của bạn phải nắm rõ thông tin
về số topic, tổng số partition để phân bổ số partition cho mỗi consumer
hợp lí khiến nó khá cứng và không được linh hoạt khi có sự thay đổi.
Tựu chung lại tôi cảm thấy phương pháp này khá khó để implement, mặc dù không phải không thể làm được. Tuy nhiên cá nhân tôi hay thích những thứ đơn giản hơn mà vẫn giải quyết được vấn đề, simple is the best. Vậy nên ta sẽ đến với phương pháp thứ ba ngay sau đây.
Commit trước khi xử lý
Đây là cách mà tôi đang sử dụng cho dự án thực tế tại công ty. Phương
pháp này kết hợp những ưu điểm từ hai phương pháp trên, lấy ý tưởng một
chút từ file WAL (Write-Ahead-Log) trong HBase.
Mỗi record nhận được từ consumer sẽ được write vào một file log thay
vì xử lý ngay. Record được coi là xử lý thành công nếu được ghi vào file
log thành công. Toàn bộ quá trình write vào file hoàn toàn là append, với mỗi consumer khác nhau sẽ duy trì một file log khác nhau.
Sau khi write data, kiểm tra dung lượng file log, nếu nó vượt qua một
giới hạn quy định trước, gửi file này tới một worker ở background để xử
lý. Đồng thời sẽ tạo ra một file log mới cho các record tiếp theo.
Mỗi worker ở background sẽ nhận input là một list records nằm trong một file. Sau khi toàn bộ records trong file này được xử lý thành công, worker sẽ xóa file này
và sẵn sàng để xử lý file tiếp theo. Nếu có bất kì lỗi nào xảy ra với
một record mà không thể retry được, worker sẽ bỏ qua file đó (không xóa)
và chuyển sang xử lý file tiếp theo. Để xử lý lại một record lỗi, cần
xử lý lại toàn bộ cả file, vì ta chấp nhận xử lý trùng lặp nên đây không
phải vấn đề lớn, chỉ cần chú ý đặt giới hạn file log hợp lí để số
records cần xử lý lại khi có lỗi không quá nhiều.
Phương pháp này đáp ứng được mọi yêu cầu bài toán đặt ra và khắc phục được nhược điểm của các phương pháp trên:
- Record chỉ được commit vào Kafka sau khi write vào file thành
công. Chỉ cần file còn tồn tại thì nó sẽ được xử lý bởi một worker vào
một lúc nào đó.
- Quản lý offset dễ dàng do consumer chỉ cần commit offset cuối cùng
đọc được sau mỗi batch. Việc lưu trữ offset và phân chia partition cho
các consumer hoàn toàn do Kafka đảm nhận nên đơn giản hóa tối đa quá
trình sử dụng.
- Có thể bỏ qua một record lỗi không làm ảnh hưởng tới việc xử lý các record mới. Record lỗi có thể được xử lý lại bất kì lúc nào.
- Dễ dàng scale bằng cách tăng thêm số worker trên mỗi node mà không
cần phải tăng số consumer. Ví dụ khi cần xử lý lại một lượng dữ liệu
cực lớn ta tăng số worker lên để xử lý nhanh hơn. Khi chỉ cần xử lý dữ
liệu realtime ít hơn ta giảm số lượng worker xuống vừa phải để giảm tải
cho hệ thống.
Gần đây tôi có update lại một chút phần consumer cho hiệu quả hơn.
Khi nhận được một record từ consumer, thay vì ghi vào file record sẽ
được lưu vào một buffer. Khi buffer đạt tới một giới hạn, flush toàn bộ
data này vào một file log, file được tạo ra sau đấy không thể sửa đổi
(có thể xóa). File log sau khi được tạo ra sẽ được gửi tới một worker để
xử lý. Phần xử lý của worker ở background trên mỗi file thì vẫn tương
tự như cũ. Buffer sau đó được làm mới và bắt đầu lưu các record tiếp
theo.
Bài viết cũng đá khá dài rồi và phần cài đặt phương pháp này cũng khá
đơn giản nên tôi sẽ không hướng dẫn cụ thể ở đây nữa. Các bạn hãy thử
tự mình cài đặt nhé.
Tham khảo
https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html