CDC: Перенесення змін даних з Postgres в Elasticsearch

Збір змін даних (Change Data Capture, CDC) — це процес захоплення змін даних в одній системі бази даних і їх перенесення в інші типи систем. Наприклад, застосунок зазвичай зберігає дані в реляційних базах даних, таких як Postgres, а CDC допомагає захоплювати зміни даних і переміщувати їх до інших систем, таких як Elasticsearch, кеш або інструмент для роботи з хмарними сховищами даних.

pic

Джерело: пост Confluent

Чому ж це потрібно? Хоча бази даних вже підтримують реплікацію, PostgreSQL та MySQL дозволяють використовувати реплікацію, що дає змогу застосовувати архітектуру майстер/слейв. Майстер база даних працює як лідер і використовується лише для запису, а слейви — тільки для читання даних. Це дійсно покращує продуктивність системи. Однак цей процес не можна застосувати до систем, які зберігають дані по-іншому. Тут CDC допомагає копіювати зміни даних у Postgres і передавати ці зміни в Elasticsearch для використання при швидких текстових запитах. Саме це я і намагатимусь реалізувати в цьому пості.

Це можливо лише за допомогою спеціального інструменту CDC, такого як Debezium Connector. Є ще один інструмент Bottled Water, але він більше не підтримується. Debezium Connector можна використовувати як з Postgres, так і з MySQL. Debezium захоплює зміни даних і передає їх у тему Kafka. Потім наш застосунок може отримувати зміни з тієї ж теми і зберігати їх у Elasticsearch для індексації.

pic

Джерело: пост Confluent

Використання CDC у Spring мікросервісах за допомогою Kafka

У цьому випадку в мене є два мікросервіси: один — це сервіс управління користувачами, який має API для додавання/оновлення користувача в базу даних Postgres. Як тільки користувача додають, Debezium передає подію до Kafka. Цю подію споживає інший мікросервіс, а саме сервіс індексації користувачів, який зберігає того ж користувача в Elasticsearch.
Посилання на повний робочий приклад з використанням Java та Spring наведено в кінці цього посту.

Postgres, Debezium, Kafka та Elasticsearch можна налаштувати та запустити локально за допомогою файлу docker-compose;

version: '3.7'  

networks:  
 cdc-network:  
 name: cdc-network  
 driver: bridge  
 external: false  


services:  
 cdc-postgres:  
 image: debezium/postgres:13-alpine  
 container_name: cdc-postgres  
 hostname: cdc-postgres  
 restart: always  
 ports:  
 - 5443:5432  
 environment:  
 POSTGRES_PASSWORD: test  
 POSTGRES_USER: test  
 POSTGRES_DB: test  
 volumes:  
 - ./cdc-postgres-data:/var/lib/postgresql/data  
 networks:  
 - cdc-network  


 cdc-kafka:  
 image: bitnami/kafka:3.4  
 container_name: cdc-kafka  
 hostname: cdc-kafka  
 restart: always  
 ports:  
 - 9092:9092  
 environment:  
 KAFKA_CFG_NODE_ID: 1  
 KAFKA_KRAFT_CLUSTER_ID: q0k00yjQRaqWmAAAZv955w  
 KAFKA_CFG_PROCESS_ROLES: controller,broker  
 KAFKA_CFG_LISTENERS: INTERNAL://cdc-kafka:29092,CONTROLLER://cdc-kafka:29093,EXTERNAL://0.0.0.0:9092  
 KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://cdc-kafka:29092,EXTERNAL://localhost:9092  
 KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT  
 KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@cdc-kafka:29093  
 KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL  
 KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER  
 networks:  
 - cdc-network  


 cdc-debezium-connect:  
 image: debezium/connect:2.3  
 container_name: cdc-debezium-connect  
 hostname: cdc-debezium-connect  
 restart: always  
 ports:  
 - 8083:8083  
 environment:  
 BOOTSTRAP_SERVERS: cdc-kafka:29092  
 GROUP_ID: 1  
 CONFIG_STORAGE_TOPIC: my_connect_configs  
 OFFSET_STORAGE_TOPIC: my_connect_offsets  
 STATUS_STORAGE_TOPIC: my_connect_statuses  
 ENABLE_DEBEZIUM_SCRIPTING: 'true'  
 links:  
 - cdc-kafka  
 - cdc-postgres  
 networks:  
 - cdc-network  

 cdc-elasticsearch:  
 image: docker.elastic.co/elasticsearch/elasticsearch:7.15.1  
 container_name: elasticsearch  
 environment:  
 cluster.name: docker-cluster  
 bootstrap.memory_lock: true  
 ES_JAVA_OPTS: -Xms512m -Xmx512m  
 ELASTIC_PASSWORD: elastic  
 discovery.type: single-node  
 ulimits:  
 memlock:  
 soft: -1  
 hard: -1  
 volumes:  
 - ./es-data:/usr/share/elasticsearch/data  
 ports:  
 - 9200:9200  
 networks:  
 - cdc-network  


volumes:  
 cdc-postgres-data:  
 name: cdc-postgres-data  
 driver: local

Запустіть контейнери за допомогою команди docker compose;

docker-compose up -d

Також потрібно налаштувати з’єднання Debezium з базою даних Postgres, щоб вона могла отримувати оновлення з Postgres і публікувати їх як події до теми Kafka. Це можна зробити за допомогою наступної команди;

curl --location 'http://localhost:8083/connectors' \  
 --header 'Accept: application/json' \  
 --header 'Content-Type: application/json' \  
 --data '{  
 "name": "cdc-debezium-connector",  
 "config": {  
 "connector.class": "io.debezium.connector.postgresql.PostgresConnector",  
 "database.hostname": "cdc-postgres.cdc-index-processor.orb.local",  
 "database.port": "5432",  
 "database.user": "test",  
 "database.password": "test",  
 "database.dbname": "test",  
 "database.server.id": "122054",  
 "table.include.list": "public.users",  
 "topic.prefix": "users-table-topic"  
 }  
}'

Тепер подивимося на API для додавання користувача в Springboot;

@PostMapping  
 public void add(@RequestBody UserRequest userRequest) {  
 var user = userRequest.toUser();  
 userRepository.save(user);  
 }

Ось такий POST запит можна використати для виклику цього API;

{  
 "name": "Kane Williamson",  
 "email": "[email protected]",  
 "phone":"+322222233444"  
}

Інший мікросервіс — users-index, який може споживати події з Kafka і передавати їх до Elasticsearch.
UserConsumer може споживати події Kafka, і виглядає це ось так;

@KafkaHandler(isDefault = true)  
 public void consume(LinkedHashMap userPayload) throws JsonProcessingException {  
 logger.info("User Message: {}", objectMapper.writeValueAsString(userPayload));  
 var payLoad = objectMapper  
 .readValue(objectMapper.writeValueAsString(userPayload.get("payload")),   
 PayLoad.class  
 );  
 var userData = payLoad.after();  

 elasticsearchOperations.indexOps(UserDocument.class).refresh();  

 switch (payLoad.op()) {  
 case c -> userService.save(userData);  
 case u -> userService.update(userData);  
 case d -> {  
 userData = payLoad.before();  
 userService.delete(userData);  
 }  
 default -> logger.info("Operation not supported");  
 }  
 }

Цей споживач Kafka може обробляти всі типи подій даних, які призначені для додавання, оновлення або видалення даних. Кожного разу, коли відбувається зміна в таблиці користувачів у Postgres, Debezium захоплює цю зміну і відправляє її до Kafka. Наш додаток може споживати події Kafka для цієї теми і зберігати дані в Elasticsearch.

Використання Elasticsearch для текстових запитів

Пам'ятайте, ми завжди хотіли використовувати Elasticsearch. Ви можете використати наступний запит для пошуку даних користувача за ім'ям;

curl localhost:9200/_search -H "Content-Type: application/json"   
-d '{"query":{"match": {"name":"Kane"}}}'

CDC дає нам контроль над тим, як по-різному доступати та використовувати дані

Існують випадки, коли типові бази даних, такі як MySQL або Postgres, не є достатніми для деяких операцій, наприклад, як кеш. Тоді необхідно визначити спосіб копіювання даних в кеш, наприклад, Redis, коли відбувається оновлення в базі даних для запису.

Мартін Клеппман у своїй популярній книзі Designing data-intensive applications описує це так;

“Зберігання даних зазвичай досить просте, якщо вам не потрібно турбуватися про те, як їх будуть запитувати та використовувати; багато складнощів проектування схем, індексації та механізмів зберігання виникають через бажання підтримувати певні шаблони запитів та доступу. З цієї причини ви отримуєте велику гнучкість, розділяючи форму, в якій записуються дані, від форми, в якій вони читаються, і дозволяючи кілька різних варіантів для читання. Ця ідея іноді називається сегрегацією відповідальності запиту та команд.”

CDC дає нам надійний підхід для покращення продуктивності системи та також дозволяє використовувати дані на інших рівнях. Дані можна легко переміщати в інші системи для бізнес-аналізу та аналітики.

Це все, що я хотів сказати з цієї теми, і сподіваюся, що це зрозуміло і лаконічно. Я створив цей github репозиторій, ви можете подивитися, як усе це працює разом.

Перекладено з: CDC: Pushing data changes to Elasticsearch from Postgres

Leave a Reply

Your email address will not be published. Required fields are marked *