текст перекладу
Покроковий посібник із відстеження змін даних MongoDB за допомогою Debezium, Kafka та NestJS з прикладами інвалідизації кешу
Вступ
У сучасній архітектурі розподілених додатків все повинно бути синхронізовано та оновлено; це справді необхідно для виконання бізнес-завдань, оскільки з технічної точки зору це складно і вимагає великих зусиль.
Нещодавно я зіткнувся з випадком, коли ми мали завдання відстежувати зміни в MongoDB. Наш сценарій включав обробку різноманітних запитів від кількох додатків, збереження їх у MongoDB та забезпечення консистентності. Ми використовували REST-слой, який надавав список запитів з MongoDB, пагінував ці запити та підтримував продуктивність за допомогою кешу Redis. Основною проблемою було інвалідизувати кешовані дані, щоб користувачі могли побачити найактуальнішу інформацію.
У цій статті ми розглянемо, як спроектувати інвалідизацію кешу за допомогою інструменту зміни даних Debezium. Однак ми не будемо занурюватися в специфіку реалізації кешування.
Проектування на високому рівні
Як відстежувати всі зміни даних в MongoDB?
MongoDB має потік змін (change stream) для реального часу, або ми можемо використати сторонній інструмент для захоплення змін даних - Debezium, який використовує внутрішньо oplog MongoDB.
- Change Streams: Потоки змін дозволяють додаткам відстежувати зміни даних у базі даних у реальному часі, надаючи механізм підписки на всі зміни даних у колекції, наборі колекцій, базі даних або по всьому розгортанню.
const changeStream = collection.watch();
changeStream.on('change', (next) => {
console.log('Відбулася зміна:', next);
});
Change Stream
2. Change Data Capture: Захоплення змін даних (Change Data Capture) — це інший метод відстеження змін у базі даних. Він надає детальніший запис змін, включаючи старі та нові значення змінених даних. Ми розглядаємо використання відкритого інструменту CDC Debezium, і одним із джерел для цього є oplog. Oplog (журнал операцій) — це спеціальна обмежена колекція, яка записує всі операції запису, виконані в базі даних.
await consumer.subscribe({
topic: 'prefix.database.collection_name',
fromBeginning: true,
});
await consumer.run({
eachMessage: async (messagePayload: EachMessagePayload) => {
try {
const { op, after, before } = JSON.parse(messagePayload.value.toString());
switch (op) {
case "c": // CREATE
case "u": // UPDATE
case "d": // DELETE
}
} catch (error) {
console.error(
`Помилка обробки повідомлення з топіка ${messagePayload.topic}:`,
error.stack
);
}
},
});
Debezium використовує oplog MongoDB для відстеження всіх змін у базі даних (вставка, оновлення, видалення) та транслює ці зміни в систему повідомлень, таку як Kafka, яку ваше додаток може споживати.
CDC
Розуміння компонентів
Щоб зрозуміти компоненти для нашого випадку використання, потрібно скласти список технологічних інструментів і рішень, які ми використовуємо. Ми використовуємо Docker Compose для тестування цього рішення локально та Docker Desktop, щоб краще бачити, що відбувається. Також необхідно налаштувати з'єднувач Debezium для MongoDB, Kafka, MongoDB, Redis та Debezium.
- Контейнеризація: Використовуючи Docker Compose, визначаємо сервіси в docker-compose.yml для MongoDB, Kafka, Zookeeper (необхідний для Kafka), Debezium та Redis. Це налаштування забезпечує ізольовані середовища для кожного сервісу, що полегшує керування залежностями та конфігураціями.
- Налаштування MongoDB: Ми повинні запустити MongoDB в контейнері, налаштованому як replica set, оскільки Debezium вимагає це для захоплення змін через oplog.
- Конфігурація Debezium: Налаштуйте з'єднувач Debezium для моніторингу конкретних баз даних або колекцій у MongoDB.
текст перекладу
Це включає налаштування конфігурацій з'єднувачів, які визначають, які зміни потрібно захоплювати та як передавати їх у Kafka topics. - Інтеграція з Kafka: Переконайтеся, що Kafka та Zookeeper працюють. Debezium передаватиме захоплені зміни в Kafka topics, які потім можна буде споживати іншими додатками чи сервісами.
- Розробка на NestJS: Нам потрібно розробити бекенд-сервіси або мікросервіси за допомогою NestJS, які підключатимуться до Kafka для споживання цих подій змін. Це передбачає створення споживачів для конкретних топіків для обробки реальних оновлень даних або запуску дій на основі змін даних.
Що таке Debezium? Як працює Debezium?
Debezium — це відкритий інструмент, розроблений для потоку даних з існуючих джерел даних через Kafka, що дозволяє традиційним джерелам даних стати частиною архітектури на основі потоків. Він надає високоякісні відкриті з'єднувачі для захоплення змін для різних баз даних, що робить його рекомендованим рішенням для передачі даних з реляційних баз даних у Kafka.
[
Архітектура Debezium
Найчастіше Debezium розгортається за допомогою Apache Kafka Connect. Kafka Connect — це фреймворк і середовище виконання для…
debezium.io
](https://debezium.io/documentation/reference/stable/architecture.html?source=post_page-----ae620b20c98d--------------------------------)
Debezium можна використовувати для захоплення змін, внесених у базу даних, і публікації їх як повідомлення в брокер повідомлень, такий як Apache Kafka. Він підтримує багато реляційних та NoSQL баз даних і використовує Kafka Connect як міст між даними на диску і даними в русі. Debezium можна налаштувати для захоплення змін з бази даних і публікації їх у Kafka topic, як показано на наступній діаграмі.
Що таке Debezium? Як працює Debezium?
Реалізація
Ми можемо застосувати та реалізувати захоплення змін даних у різних випадках використання, таких як:
- Сповіщення, керовані подіями: Тригери в реальному часі для сповіщень або оповіщень, коли відбуваються конкретні зміни в базі даних.
- Аналіз у реальному часі: Створення панелей управління, які відображають живі оновлення вашої бази даних MongoDB.
- Реплікація даних: Передача змін MongoDB до інших баз даних, таких як PostgreSQL чи Elasticsearch.
- Управління кешем: Інвалідизація або оновлення кешованих даних в реальному часі на основі змін у базі даних.
- Аудит: Ведення комплексного журналу всіх змін для забезпечення відповідності та налагодження.
Коли я шукав свій випадок інвалідизації кешу, я знайшов одну статтю від Гуннара Мерлінга на блозі Debezium; будь ласка, перегляньте посилання нижче для більш детальної інформації.
[
Автоматизація інвалідизації кешу за допомогою Change Data Capture
Debezium — це відкритий розподілений інструмент для захоплення змін даних. Запустіть його, вкажіть ваші бази даних і…
debezium.io
](https://debezium.io/blog/2018/12/05/automating-cache-invalidation-with-change-data-capture/?source=post_page-----ae620b20c98d--------------------------------)
Давайте спробуємо налаштувати та реалізувати захоплення змін даних на нашій локальній машині.
Крок 1: Налаштування MongoDB за допомогою Docker Compose
Як я згадував раніше, для підтримки захоплення змін даних для MongoDB нам потрібен replica set; ми не можемо застосувати це на самостійно працюючому MongoDB.
текст перекладу
Сервери MongoDB у автономному режимі не зберігають oplog; ця функція доступна лише в MongoDB, коли він налаштований як частина replica set або sharded cluster.
Крім того, щоб захоплювати як попередній, так і пост-змінний стан документів, необхідно увімкнути відстеження pre- та post-image для колекцій, які потрібно моніторити.
- Налаштування Replica Set: Щоб налаштувати MongoDB як replica set на локальній машині, ми повинні: увімкнути аутентифікацію для забезпечення безпечних з'єднань між вузлами та надати відповідні дозволи для SSL-ключа, який використовується для внутрішньої аутентифікації в межах replica set.
- Налаштування Change Stream: Налаштуйте MongoDB для увімкнення change streams з відстеженням pre- та post-image для колекцій, які ви хочете моніторити. Це передбачає налаштування відповідних команд або скриптів для активації цих функцій, щоб забезпечити запис як оригінального (до зміни), так і зміненого (після зміни) стану документів.
- Створіть ініціалізаційний скрипт для replica set
#!/bin/bash
# Цей скрипт ініціалізує MongoDB replica set. Він чекає, поки MongoDB сервіс запуститься,
# після чого підключається до нього за допомогою команди 'mongosh' та ініціює конфігурацію replica set.
# Скрипт розрахований на виконання в окремому контейнері, що залежить від контейнера MongoDB.
# Це забезпечує повний запуск MongoDB перед спробою ініціалізації replica set.
echo ====================================================
echo ============= Ініціалізація Replica Set =============
echo ====================================================
# Цикл, поки MongoDB не буде готовий до прийому підключень
until mongosh --host mongo:27017 --eval 'quit(0)' &>/dev/null; do
echo "Очікування запуску mongod..."
sleep 5
done
echo "MongoDB запущено. Ініціалізація Replica Set..."
# Підключення до MongoDB сервісу та ініціалізація replica set
mongosh --host mongo:27017 -u root -p rootpassword --authenticationDatabase admin < ./init/key
chmod 400 ./init/key
chmod +x ./init/init-replica.sh
- Створіть конфігурацію для Docker Compose
services:
mongo:
image: mongo:latest
restart: unless-stopped
command: ["--replSet", "rs0", "--keyFile", "/etc/mongo-keyfile"]
environment:
- MONGO_INITDB_ROOT_USERNAME=root
- MONGO_INITDB_ROOT_PASSWORD=rootpassword
- MONGO_INITDB_DATABASE=database
- MONGO_REPLICA_SET_NAME=rs0
ports:
- '27017:27017'
volumes:
- mongodb_data:/data/db
- ./init/key:/etc/mongo-keyfile:ro
networks:
- cdc-network
healthcheck:
test: mongosh --host localhost:27017 --eval 'db.adminCommand("ping")' || exit 1
interval: 5s
timeout: 30s
start_period: 0s
start_interval: 1s
retries: 30
mongo-init-replica:
image: mongo:latest
depends_on:
- mongo
volumes:
- ./init/init-replica.sh:/docker-entrypoint-initdb.d/init-replica.sh:ro
entrypoint: ["/docker-entrypoint-initdb.d/init-replica.sh"]
networks:
- cdc-network
volumes:
mongodb_data:
driver: local
networks:
cdc-network:
driver: bridge
текст перекладу
Тестування
Як ми можемо переконатися, що налаштування правильне, або як подивитися на базу даних і колекції, серед іншого? Для цього ми можемо використати [MongoDB Compass](https://www.mongodb.com/products/tools/compass), і нам потрібен рядок підключення для з'єднання з базою даних.
MONGODB_URI=mongodb://root:rootpassword@localhost:27017/database?authSource=admin&replicaSet=rs0
```
Compass. Графічний інтерфейс для MongoDB
Крок 2: Налаштування Kafka за допомогою Docker Compose
Нижче подано конфігурацію Docker Compose, яку ми будемо використовувати для налаштування Apache Kafka, а також ми використовуватимемо Kafka UI для візуалізації панелі моніторингу Kafka.
- Створіть конфігурацію для Docker Compose
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.2
ports:
- '2181:2181'
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- cdc-network
kafka:
image: confluentinc/cp-kafka:7.5.2
depends_on:
- zookeeper
ports:
- '29092:29092'
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- cdc-network
kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- '9090:8080'
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
networks:
- cdc-network
- Тестування
Ми використовуємо порт 9090 для запуску Kafka UI; нижче наведено скріншот того, що відбувається.
Тестування Kafka
Крок 3: Налаштування Debezium за допомогою Docker Compose
Ми будемо використовувати Debezium 2.0, оскільки він підтримує кілька баз даних. Сервіс Debezium залежить від сервісів Kafka і MongoDB, тому спочатку потрібно переконатися, що вони запущені.
debezium-mongo:
image: debezium/connect:2.0
depends_on:
- kafka
- mongo
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: debezium_config
OFFSET_STORAGE_TOPIC: debezium_offset
STATUS_STORAGE_TOPIC: debezium_status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: false
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: false
ports:
- '8083:8083'
networks:
- cdc-network
Ми будемо використовувати порт 8083 на локальній машині, який використовується для Kafka Connect REST API для управління конекторами, перегляду статусів тощо.
У нашому випадку нам потрібно використовувати два кінцеві точки: одна — це налаштований конектор, а інша — статус конектора.
REST API конектора
Ми також можемо використовувати Debezium UI для створення конектора і отримання статусу, що є зручним варіантом. Однак спочатку потрібно налаштувати його.
[
GitHub - debezium/debezium-ui: Веб-інтерфейс для Debezium; будь ласка, повідомляйте про проблеми на…
Веб-інтерфейс для Debezium; будь ласка, повідомляйте про проблеми на https://issues.redhat.com/browse/DBZ.
Крок 4: Налаштування MongoDB Debezium Connector (конектора)
Тут ми повинні переконатися, що передаємо всю необхідну інформацію, адже це справжня магія для синхронізації змін даних MongoDB з Kafka.
Налаштування MongoDB Debezium Connector передбачає налаштування конектора для моніторингу реплікаційного набору MongoDB, визначення параметрів конфігурації джерела та інтеграцію з вашим налаштуванням Kafka Connect.
1. Створення або налаштування конектора
У нашому випадку ми використовуємо наступну конфігурацію. Будь ласка, переконайтеся, що ваші значення оновлені відповідно до вашої конфігурації.
. Запит URL (HTTP — POST)
http://localhost:8083/connectors
. Конфігурація
{
"name": "mongodb-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
// Деталі підключення MongoDB
"mongodb.hosts": "rs0/mongo:27017", // Замініть на ваші хости MongoDB
"mongodb.name": "dbserver1", // Логічне ім’я для MongoDB сервера
// Аутентифікація
"mongodb.user": "root",
"mongodb.password": "rootpassword",
"mongodb.authsource": "admin",
// Режим захоплення - цей параметр забезпечує захоплення як попереднього, так і після оновлення
"capture.mode": "change_streams_update_full_with_pre_image",
// Колекції для моніторингу
"database.include.list": "database", // Налаштуйте згідно з вашими потребами
"collection.include.list": "database.queries", // Налаштуйте згідно з вашими потребами
// Кількість задач
"tasks.max": "3", // Налаштуйте згідно з вашими ресурсами та вимогами
// Найменування Kafka топіків
"topic.prefix": "dbserver1", // Префікс для Kafka топіків
// Обробка схем і подій
"output.schema": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
// Налаштування продуктивності
"snapshot.mode": "initial", // Виконати початкове знімання при запуску
"snapshot.max.threads": "4", // Кількість потоків для знімання
// Транзакційні метадані
"provide.transaction.metadata": "true",
// Обробка помилок
"errors.retry.timeout": "0", // Без повторних спроб у разі помилок, для продакшн-середовища варто змінити це значення
"errors.tolerance": "none", // Тolerantність до помилок
// Моніторинг за допомогою Heartbeat
"heartbeat.interval.ms": "30000", // Кожні 30 секунд
// Різне
"max.queue.size": "8192", // Максимальна кількість записів, що можуть бути в буфері в пам'яті
"max.batch.size": "2048",
"poll.interval.ms": "1000"
}
}
- Відповідь
{
"name": "mongodb-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "rs0/mongo:27017",
"mongodb.name": "dbserver1",
"mongodb.user": "root",
"mongodb.password": "rootpassword",
"mongodb.authsource": "admin",
"capture.mode": "change_streams_update_full_with_pre_image",
"database.include.list": "database",
"collection.include.list": "database.queries",
"tasks.max": "3",
"topic.prefix": "dbserver1",
"output.schema": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"snapshot.mode": "initial",
"snapshot.max.threads": "4",
"provide.transaction.metadata": "true",
"errors.retry.timeout": "0",
"errors.tolerance": "none",
"heartbeat.interval.ms": "30000",
"max.queue.size": "8192",
"max.batch.size": "2048",
"poll.interval.ms": "1000",
"name": "mongodb-connector1"
},
"tasks": [],
"type": "source"
}
**Перевірка статусу**
. Запит URL (HTTP — GET)
http://localhost:8083/connectors/mongodb-connector/status
```
. Відповідь
{
"name": "mongodb-connector",
"connector": {
"state": "RUNNING",
"worker_id": "172.18.0.7:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "172.18.0.7:8083"
}
],
"type": "source"
}
Для отримання додаткових деталей, будь ласка, перегляньте документацію нижче для MongoDB Debezium Connector, яка містить усі необхідні відомості.
[
Debezium connector для MongoDB
Debezium MongoDB connector відстежує реплікаційний набір MongoDB або шардинг MongoDB для змін документів у…
debezium.io
](https://debezium.io/documentation/reference/stable/connectors/mongodb.html?source=post_page-----ae620b20c98d--------------------------------)
Після завершення налаштування, ми можемо перевірити, чи працює Kafka dashboard, переконавшись у створенні всіх відповідних топіків у Kafka. Також можна перевірити повідомлення, що генеруються для відповідної колекції під топіком prefix.database_name.collection_name
. (Наприклад, dbserver1.database.queries
)
Kafka Topics
Крок 5: Налаштування NestJS для споживання Kafka подій (Kafka Events)
Якщо ви зовсім новачок у NestJS, будь ласка, прочитайте статтю для початківців, що наведена нижче. Якщо ви шукаєте базовий шаблон Node.js з Express.js, зверніться до цього репозиторію.
const kafka = new Kafka({
clientId: this.appConfigService.kafkaClientId,
brokers: this.appConfigService.kafkaBrokers,
});
// consumer group для дозволу отримувати повідомлення з Kafka топіків у цій групі
this.databaseConsumer = kafka.consumer({ groupId: 'change-tracker-group' });
async onModuleInit() {
try {
await this.initializeConsumer(
this.databaseConsumer,
this.handleDatabaseChangeMessage,
);
} catch (error) {
console.error('Помилка під час ініціалізації споживача:', error);
}
}
Тут ми бачимо, що є один group ID, і він не має зв’язку з попередньою конфігурацією, і створюється екземпляр Kafka споживача (Kafka Consumer), який підписується на конкретну споживацьку групу (change-tracker-group
), дозволяючи йому отримувати повідомлення з Kafka топіків цієї групи.
Під час підписки на топік, ми повинні слідувати шаблону prefix.database_name.collection_name
(Наприклад, dbserver1.database.queries
), і нам потрібно звертатися до нашої конфігурації, яку ми надали раніше.
private async initializeConsumer(
consumer: Consumer,
messageHandler: (messagePayload: EachMessagePayload) => Promise,
) {
try {
await consumer.connect();
await consumer.subscribe({
topic: 'dbserver1.database.queries',
fromBeginning: true,
});
await consumer.run({
eachMessage: async (messagePayload: EachMessagePayload) => {
try {
await messageHandler.call(this, messagePayload);
} catch (error) {
console.error(
`Помилка під час обробки повідомлення з топіку ${messagePayload.topic}:`,
error.stack,
);
}
},
});
} catch (error) {
console.error('Помилка при ініціалізації споживача:', error.stack);
}
}
private async handleDatabaseChangeMessage({ message }: EachMessagePayload) {
if (!message.value) {
console.warn('Отримано повідомлення без значення.');
Skipping...');
return;
}
try {
const { op, after, before } = JSON.parse(message.value.toString());
console.info(`message - ${message.value.toString()}`);
switch (op) {
case 'c':
case 'u':
case 'd':
await this.handleCacheInvalidation(op, after, before);
break;
default:
console.warn(`Невідома операція: ${op}`);
break;
}
} catch (error) {
console.error('Помилка при обробці повідомлення про зміну бази даних:', error.stack);
}
}
Вихідний код (Source code)
Крок 6: Тестування налаштувань
Ми можемо виконати команду Docker Compose для запуску створеного файлу Docker Compose, а просте виконання npm start
запустить застосунок.
docker compose
- Перевірте в Docker desktop, чи всі контейнери створені
docker desktop
- Запустіть застосунок NestJS
npm start
- Спробуйте створити або оновити запис у MongoDB.
операція MongoDB
На знімку вище зліва ми створили або оновили запис, а справа ми бачимо повідомлення на споживацькому топіку, на який ми підписалися.
Нижче наведено payload повідомлення, яке ми отримали на топіку.
{
"before": "{\"_id\": {\"$oid\": \"67741cc0e35ad0bfd51115c8\"},\"userEmail\": \"[email protected]\",\"moduleName\": \"prediction\",\"queryText\": \"What will be today's temperature?\",\"metadata\": {\"priority\": \"high\"}}",
"after": "{\"_id\": {\"$oid\": \"67741cc0e35ad0bfd51115c8\"},\"userEmail\": \"[email protected]\",\"moduleName\": \"prediction\",\"queryText\": \"What will be today's temperature?\",\"metadata\": {\"priority\": \"high\"}}",
"patch": null,
"filter": null,
"updateDescription": {
"removedFields": null,
"updatedFields": "{\"userEmail\": \"[email protected]\"}",
"truncatedArrays": null
},
"source": {
"version": "2.0.1.Final",
"connector": "mongodb",
"name": "dbserver1",
"ts_ms": 1735662855000,
"snapshot": "false",
"db": "database",
"sequence": null,
"rs": "rs0",
"collection": "queries",
"ord": 2,
"lsid": null,
"txnNumber": null
},
"op": "u",
"ts_ms": 1735662855656,
"transaction": null
}
Вихідний код (Source Code)
Будь ласка, перегляньте репозиторій з вихідним кодом для повного рішення нижче. Це не код, готовий до використання в продакшн-середовищі. Я чекаю на ваші коментарі та пропозиції, або тут, або в секції GitHub Issues.
[
GitHub - santoshshinde2012/cache-invalidation
Внесіть свій вклад у розробку проекту santoshshinde2012/cache-invalidation, створивши акаунт на GitHub.
github.com
](https://github.com/santoshshinde2012/cache-invalidation?source=post_page-----ae620b20c98d--------------------------------)
Дякую за прочитане. Будь ласка, поділіться своїми коментарями та поставте лайк, якщо цей блог був корисним для вашого навчання. Будь ласка, підпишіться на мене для подальших оновлень та чекаю на ваші коментарі.
Перекладено з: Design Cache Invalidation with Real-Time Data Tracking with MongoDB, Debezium, Kafka, and NestJS