Збір змін даних (Change Data Capture, CDC) — це процес захоплення змін даних в одній системі бази даних і їх перенесення в інші типи систем. Наприклад, застосунок зазвичай зберігає дані в реляційних базах даних, таких як Postgres, а CDC допомагає захоплювати зміни даних і переміщувати їх до інших систем, таких як Elasticsearch, кеш або інструмент для роботи з хмарними сховищами даних.
Джерело: пост Confluent
Чому ж це потрібно? Хоча бази даних вже підтримують реплікацію, PostgreSQL та MySQL дозволяють використовувати реплікацію, що дає змогу застосовувати архітектуру майстер/слейв. Майстер база даних працює як лідер і використовується лише для запису, а слейви — тільки для читання даних. Це дійсно покращує продуктивність системи. Однак цей процес не можна застосувати до систем, які зберігають дані по-іншому. Тут CDC допомагає копіювати зміни даних у Postgres і передавати ці зміни в Elasticsearch для використання при швидких текстових запитах. Саме це я і намагатимусь реалізувати в цьому пості.
Це можливо лише за допомогою спеціального інструменту CDC, такого як Debezium Connector. Є ще один інструмент Bottled Water, але він більше не підтримується. Debezium Connector можна використовувати як з Postgres, так і з MySQL. Debezium захоплює зміни даних і передає їх у тему Kafka. Потім наш застосунок може отримувати зміни з тієї ж теми і зберігати їх у Elasticsearch для індексації.
Джерело: пост Confluent
Використання CDC у Spring мікросервісах за допомогою Kafka
У цьому випадку в мене є два мікросервіси: один — це сервіс управління користувачами, який має API для додавання/оновлення користувача в базу даних Postgres. Як тільки користувача додають, Debezium передає подію до Kafka. Цю подію споживає інший мікросервіс, а саме сервіс індексації користувачів, який зберігає того ж користувача в Elasticsearch.
Посилання на повний робочий приклад з використанням Java та Spring наведено в кінці цього посту.
Postgres, Debezium, Kafka та Elasticsearch можна налаштувати та запустити локально за допомогою файлу docker-compose;
version: '3.7'
networks:
cdc-network:
name: cdc-network
driver: bridge
external: false
services:
cdc-postgres:
image: debezium/postgres:13-alpine
container_name: cdc-postgres
hostname: cdc-postgres
restart: always
ports:
- 5443:5432
environment:
POSTGRES_PASSWORD: test
POSTGRES_USER: test
POSTGRES_DB: test
volumes:
- ./cdc-postgres-data:/var/lib/postgresql/data
networks:
- cdc-network
cdc-kafka:
image: bitnami/kafka:3.4
container_name: cdc-kafka
hostname: cdc-kafka
restart: always
ports:
- 9092:9092
environment:
KAFKA_CFG_NODE_ID: 1
KAFKA_KRAFT_CLUSTER_ID: q0k00yjQRaqWmAAAZv955w
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: INTERNAL://cdc-kafka:29092,CONTROLLER://cdc-kafka:29093,EXTERNAL://0.0.0.0:9092
KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://cdc-kafka:29092,EXTERNAL://localhost:9092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@cdc-kafka:29093
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
networks:
- cdc-network
cdc-debezium-connect:
image: debezium/connect:2.3
container_name: cdc-debezium-connect
hostname: cdc-debezium-connect
restart: always
ports:
- 8083:8083
environment:
BOOTSTRAP_SERVERS: cdc-kafka:29092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses
ENABLE_DEBEZIUM_SCRIPTING: 'true'
links:
- cdc-kafka
- cdc-postgres
networks:
- cdc-network
cdc-elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.15.1
container_name: elasticsearch
environment:
cluster.name: docker-cluster
bootstrap.memory_lock: true
ES_JAVA_OPTS: -Xms512m -Xmx512m
ELASTIC_PASSWORD: elastic
discovery.type: single-node
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- ./es-data:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
- cdc-network
volumes:
cdc-postgres-data:
name: cdc-postgres-data
driver: local
Запустіть контейнери за допомогою команди docker compose;
docker-compose up -d
Також потрібно налаштувати з’єднання Debezium з базою даних Postgres, щоб вона могла отримувати оновлення з Postgres і публікувати їх як події до теми Kafka. Це можна зробити за допомогою наступної команди;
curl --location 'http://localhost:8083/connectors' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data '{
"name": "cdc-debezium-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "cdc-postgres.cdc-index-processor.orb.local",
"database.port": "5432",
"database.user": "test",
"database.password": "test",
"database.dbname": "test",
"database.server.id": "122054",
"table.include.list": "public.users",
"topic.prefix": "users-table-topic"
}
}'
Тепер подивимося на API для додавання користувача в Springboot;
@PostMapping
public void add(@RequestBody UserRequest userRequest) {
var user = userRequest.toUser();
userRepository.save(user);
}
Ось такий POST запит можна використати для виклику цього API;
{
"name": "Kane Williamson",
"email": "[email protected]",
"phone":"+322222233444"
}
Інший мікросервіс — users-index, який може споживати події з Kafka і передавати їх до Elasticsearch.
UserConsumer може споживати події Kafka, і виглядає це ось так;
@KafkaHandler(isDefault = true)
public void consume(LinkedHashMap userPayload) throws JsonProcessingException {
logger.info("User Message: {}", objectMapper.writeValueAsString(userPayload));
var payLoad = objectMapper
.readValue(objectMapper.writeValueAsString(userPayload.get("payload")),
PayLoad.class
);
var userData = payLoad.after();
elasticsearchOperations.indexOps(UserDocument.class).refresh();
switch (payLoad.op()) {
case c -> userService.save(userData);
case u -> userService.update(userData);
case d -> {
userData = payLoad.before();
userService.delete(userData);
}
default -> logger.info("Operation not supported");
}
}
Цей споживач Kafka може обробляти всі типи подій даних, які призначені для додавання, оновлення або видалення даних. Кожного разу, коли відбувається зміна в таблиці користувачів у Postgres, Debezium захоплює цю зміну і відправляє її до Kafka. Наш додаток може споживати події Kafka для цієї теми і зберігати дані в Elasticsearch.
Використання Elasticsearch для текстових запитів
Пам'ятайте, ми завжди хотіли використовувати Elasticsearch. Ви можете використати наступний запит для пошуку даних користувача за ім'ям;
curl localhost:9200/_search -H "Content-Type: application/json"
-d '{"query":{"match": {"name":"Kane"}}}'
CDC дає нам контроль над тим, як по-різному доступати та використовувати дані
Існують випадки, коли типові бази даних, такі як MySQL або Postgres, не є достатніми для деяких операцій, наприклад, як кеш. Тоді необхідно визначити спосіб копіювання даних в кеш, наприклад, Redis, коли відбувається оновлення в базі даних для запису.
Мартін Клеппман у своїй популярній книзі Designing data-intensive applications описує це так;
“Зберігання даних зазвичай досить просте, якщо вам не потрібно турбуватися про те, як їх будуть запитувати та використовувати; багато складнощів проектування схем, індексації та механізмів зберігання виникають через бажання підтримувати певні шаблони запитів та доступу. З цієї причини ви отримуєте велику гнучкість, розділяючи форму, в якій записуються дані, від форми, в якій вони читаються, і дозволяючи кілька різних варіантів для читання. Ця ідея іноді називається сегрегацією відповідальності запиту та команд.”
CDC дає нам надійний підхід для покращення продуктивності системи та також дозволяє використовувати дані на інших рівнях. Дані можна легко переміщати в інші системи для бізнес-аналізу та аналітики.
Це все, що я хотів сказати з цієї теми, і сподіваюся, що це зрозуміло і лаконічно. Я створив цей github репозиторій, ви можете подивитися, як усе це працює разом.
Перекладено з: CDC: Pushing data changes to Elasticsearch from Postgres