Kafka: повторна обробка повідомлень у Spring Boot
Перш ніж зануритися в деталі, давайте на хвилину освіжимо в пам'яті архітектуру Kafka та ключові терміни.
Архітектура Kafka
Broker (Брокер): Сервери, що використовуються для створення кластера Kafka. Кількість брокерів можна налаштувати.
Zookeeper (Зукіпер): Це окремий кластер, відповідальний за зберігання конфігурацій майстер-ноди та слейв-ноди Kafka, а також за керування кластером Kafka.
Producer (Продуцент): Додаток, що генерує повідомлення в тему.
Consumer (Консумер): Додаток, що споживає повідомлення з теми.
Topic (Тема): Контейнер для певного типу повідомлень.
Partition (Розподіл): Кожна тема ділиться на кілька розподілів, оскільки один брокер не може обробляти навантаження з мільйонів повідомлень. Кількість розподілів налаштовується. Ми також можемо налаштувати кількість реплік для розподілів теми.
Consumer-Group (Група споживачів): Відповідає за стратегію розподілу повідомлень. Це також налаштовується.
Offset (Зсув): Кожен розподіл управляє порядком повідомлень за допомогою зсуву, причому зсув починається з 0.
Для детальнішої інформації ви можете ознайомитись з будь-яким блогом на Medium, щоб краще зрозуміти архітектуру Kafka та її ключові терміни.
Давайте перейдемо до основної теми: як ми можемо повторно обробляти повідомлення Kafka без їх повторного відтворення в темі?
У моїх професійних проектах я зіткнувся з проблемою, коли деякі повідомлення не оброблялись через логічні помилки в коді, і на той момент ми вже повідомили кластер Kafka, що ці повідомлення спожиті. Але що, якщо я хочу повторно обробити їх після виправлення логіки?
Я зробив POC (Proof of Concept) і з’ясував, що це можливо. У залежності від Spring Kafka існує інтерфейс ConsumerSeekCallback, який надає кілька методів, які ми можемо використовувати для повторної обробки повідомлень.
Дозвольте описати кілька найбільш корисних стандартних методів:
- void seek(String topic, int partition, long offset): Якщо ви хочете повторно обробити конкретний зсув (offset) в розподілі теми.
- void seek(String topic, int partition, Function offsetComputeFunction): Якщо ви хочете повторно обробити кілька конкретних зсувів у розподілі теми.
Тут обчислення зсуву залежить від третього параметра. - void seekToBeginning(String topic, int partition): Якщо ви хочете споживати повідомлення з розподілу теми з самого початку.
- void seekToTimestamp(String topic, int partition, long timestamp): Якщо ви хочете споживати повідомлення з розподілу теми, спираючись на часову мітку.
У цьому проекті я створив кілька кінцевих точок (endpoints), використовуючи ці методи для повторної обробки повідомлень.
Swagger UI
Термінал (створена тема та продуцент повідомлень)
Консумер (спожиті повідомлення)
Кінцева точка _seek-from-begin
Консумер (повторно оброблені повідомлення з самого початку)
Термінал (продукція ще кількох повідомлень)
Консумер (спожиті нові повідомлення)
Кінцева точка _seek-by-timestamp
Консумер (повторно оброблені повідомлення з точною часовою міткою)
Термінал (повторно оброблені повідомлення, але не відтворені)
Github Link: https://github.com/gourav-bhardwaj/kafka-message-reprocess-api
Висновок: Сподіваюся, це було корисно! Подивимося, як ви можете застосувати це у своїх особистих чи професійних проектах.
Я хочу підкреслити кілька проблем: якщо ваш додаток-споживач працює на кількох екземплярах, розподіли теми розподіляються випадковим чином. Це ускладнює повторну обробку, оскільки неясно, який екземпляр обробляє конкретний розподіл. У таких випадках можливо вам доведеться тимчасово запустити додаток-споживач на одному екземплярі для повторної обробки, а після завершення процесу можна повернутися до бажаної кількості реплік.
Я все ще досліджую, як зробити цей підхід можливим для сценаріїв, де додаток-споживач працює на кількох екземплярах.
Якщо вам сподобалося, поставте лайк!
Щасливого кодування!
Перекладено з: How to reprocessing kafka message without reproducing (spring boot)