Проєктування інвалідації кешу з відстеженням даних у реальному часі за допомогою MongoDB, Debezium, Kafka та NestJS

текст перекладу

Покроковий посібник із відстеження змін даних MongoDB за допомогою Debezium, Kafka та NestJS з прикладами інвалідизації кешу

Вступ

У сучасній архітектурі розподілених додатків все повинно бути синхронізовано та оновлено; це справді необхідно для виконання бізнес-завдань, оскільки з технічної точки зору це складно і вимагає великих зусиль.

Нещодавно я зіткнувся з випадком, коли ми мали завдання відстежувати зміни в MongoDB. Наш сценарій включав обробку різноманітних запитів від кількох додатків, збереження їх у MongoDB та забезпечення консистентності. Ми використовували REST-слой, який надавав список запитів з MongoDB, пагінував ці запити та підтримував продуктивність за допомогою кешу Redis. Основною проблемою було інвалідизувати кешовані дані, щоб користувачі могли побачити найактуальнішу інформацію.

У цій статті ми розглянемо, як спроектувати інвалідизацію кешу за допомогою інструменту зміни даних Debezium. Однак ми не будемо занурюватися в специфіку реалізації кешування.

pic

Проектування на високому рівні

Як відстежувати всі зміни даних в MongoDB?

MongoDB має потік змін (change stream) для реального часу, або ми можемо використати сторонній інструмент для захоплення змін даних - Debezium, який використовує внутрішньо oplog MongoDB.

  1. Change Streams: Потоки змін дозволяють додаткам відстежувати зміни даних у базі даних у реальному часі, надаючи механізм підписки на всі зміни даних у колекції, наборі колекцій, базі даних або по всьому розгортанню.
const changeStream = collection.watch();  
changeStream.on('change', (next) => {  
 console.log('Відбулася зміна:', next);  
});

pic

Change Stream

2. Change Data Capture: Захоплення змін даних (Change Data Capture) — це інший метод відстеження змін у базі даних. Він надає детальніший запис змін, включаючи старі та нові значення змінених даних. Ми розглядаємо використання відкритого інструменту CDC Debezium, і одним із джерел для цього є oplog. Oplog (журнал операцій) — це спеціальна обмежена колекція, яка записує всі операції запису, виконані в базі даних.

await consumer.subscribe({  
 topic: 'prefix.database.collection_name',  
 fromBeginning: true,  
});  

await consumer.run({  
 eachMessage: async (messagePayload: EachMessagePayload) => {  
 try {  
 const { op, after, before } = JSON.parse(messagePayload.value.toString());  
 switch (op) {  
 case "c": // CREATE  
 case "u": // UPDATE  
 case "d": // DELETE  
 }  
 } catch (error) {  
 console.error(  
 `Помилка обробки повідомлення з топіка ${messagePayload.topic}:`,  
 error.stack  
 );  
 }  
 },  
});

Debezium використовує oplog MongoDB для відстеження всіх змін у базі даних (вставка, оновлення, видалення) та транслює ці зміни в систему повідомлень, таку як Kafka, яку ваше додаток може споживати.

pic

CDC

Розуміння компонентів

Щоб зрозуміти компоненти для нашого випадку використання, потрібно скласти список технологічних інструментів і рішень, які ми використовуємо. Ми використовуємо Docker Compose для тестування цього рішення локально та Docker Desktop, щоб краще бачити, що відбувається. Також необхідно налаштувати з'єднувач Debezium для MongoDB, Kafka, MongoDB, Redis та Debezium.

  • Контейнеризація: Використовуючи Docker Compose, визначаємо сервіси в docker-compose.yml для MongoDB, Kafka, Zookeeper (необхідний для Kafka), Debezium та Redis. Це налаштування забезпечує ізольовані середовища для кожного сервісу, що полегшує керування залежностями та конфігураціями.
  • Налаштування MongoDB: Ми повинні запустити MongoDB в контейнері, налаштованому як replica set, оскільки Debezium вимагає це для захоплення змін через oplog.
  • Конфігурація Debezium: Налаштуйте з'єднувач Debezium для моніторингу конкретних баз даних або колекцій у MongoDB.
    текст перекладу
    Це включає налаштування конфігурацій з'єднувачів, які визначають, які зміни потрібно захоплювати та як передавати їх у Kafka topics.
  • Інтеграція з Kafka: Переконайтеся, що Kafka та Zookeeper працюють. Debezium передаватиме захоплені зміни в Kafka topics, які потім можна буде споживати іншими додатками чи сервісами.
  • Розробка на NestJS: Нам потрібно розробити бекенд-сервіси або мікросервіси за допомогою NestJS, які підключатимуться до Kafka для споживання цих подій змін. Це передбачає створення споживачів для конкретних топіків для обробки реальних оновлень даних або запуску дій на основі змін даних.

Що таке Debezium? Як працює Debezium?

Debezium — це відкритий інструмент, розроблений для потоку даних з існуючих джерел даних через Kafka, що дозволяє традиційним джерелам даних стати частиною архітектури на основі потоків. Він надає високоякісні відкриті з'єднувачі для захоплення змін для різних баз даних, що робить його рекомендованим рішенням для передачі даних з реляційних баз даних у Kafka.

[

Архітектура Debezium

Найчастіше Debezium розгортається за допомогою Apache Kafka Connect. Kafka Connect — це фреймворк і середовище виконання для…

debezium.io

](https://debezium.io/documentation/reference/stable/architecture.html?source=post_page-----ae620b20c98d--------------------------------)

Debezium можна використовувати для захоплення змін, внесених у базу даних, і публікації їх як повідомлення в брокер повідомлень, такий як Apache Kafka. Він підтримує багато реляційних та NoSQL баз даних і використовує Kafka Connect як міст між даними на диску і даними в русі. Debezium можна налаштувати для захоплення змін з бази даних і публікації їх у Kafka topic, як показано на наступній діаграмі.

pic

Що таке Debezium? Як працює Debezium?

Реалізація

Ми можемо застосувати та реалізувати захоплення змін даних у різних випадках використання, таких як:

  1. Сповіщення, керовані подіями: Тригери в реальному часі для сповіщень або оповіщень, коли відбуваються конкретні зміни в базі даних.
  2. Аналіз у реальному часі: Створення панелей управління, які відображають живі оновлення вашої бази даних MongoDB.
  3. Реплікація даних: Передача змін MongoDB до інших баз даних, таких як PostgreSQL чи Elasticsearch.
  4. Управління кешем: Інвалідизація або оновлення кешованих даних в реальному часі на основі змін у базі даних.
  5. Аудит: Ведення комплексного журналу всіх змін для забезпечення відповідності та налагодження.

Коли я шукав свій випадок інвалідизації кешу, я знайшов одну статтю від Гуннара Мерлінга на блозі Debezium; будь ласка, перегляньте посилання нижче для більш детальної інформації.

[

Автоматизація інвалідизації кешу за допомогою Change Data Capture

Debezium — це відкритий розподілений інструмент для захоплення змін даних. Запустіть його, вкажіть ваші бази даних і…

debezium.io

](https://debezium.io/blog/2018/12/05/automating-cache-invalidation-with-change-data-capture/?source=post_page-----ae620b20c98d--------------------------------)

Давайте спробуємо налаштувати та реалізувати захоплення змін даних на нашій локальній машині.

Крок 1: Налаштування MongoDB за допомогою Docker Compose

Як я згадував раніше, для підтримки захоплення змін даних для MongoDB нам потрібен replica set; ми не можемо застосувати це на самостійно працюючому MongoDB.
текст перекладу
Сервери MongoDB у автономному режимі не зберігають oplog; ця функція доступна лише в MongoDB, коли він налаштований як частина replica set або sharded cluster.

Крім того, щоб захоплювати як попередній, так і пост-змінний стан документів, необхідно увімкнути відстеження pre- та post-image для колекцій, які потрібно моніторити.

  • Налаштування Replica Set: Щоб налаштувати MongoDB як replica set на локальній машині, ми повинні: увімкнути аутентифікацію для забезпечення безпечних з'єднань між вузлами та надати відповідні дозволи для SSL-ключа, який використовується для внутрішньої аутентифікації в межах replica set.
  • Налаштування Change Stream: Налаштуйте MongoDB для увімкнення change streams з відстеженням pre- та post-image для колекцій, які ви хочете моніторити. Це передбачає налаштування відповідних команд або скриптів для активації цих функцій, щоб забезпечити запис як оригінального (до зміни), так і зміненого (після зміни) стану документів.
  1. Створіть ініціалізаційний скрипт для replica set
#!/bin/bash  

# Цей скрипт ініціалізує MongoDB replica set. Він чекає, поки MongoDB сервіс запуститься,  
# після чого підключається до нього за допомогою команди 'mongosh' та ініціює конфігурацію replica set.  

# Скрипт розрахований на виконання в окремому контейнері, що залежить від контейнера MongoDB.  
# Це забезпечує повний запуск MongoDB перед спробою ініціалізації replica set.  

echo ====================================================  
echo ============= Ініціалізація Replica Set =============  
echo ====================================================  

# Цикл, поки MongoDB не буде готовий до прийому підключень  
until mongosh --host mongo:27017 --eval 'quit(0)' &>/dev/null; do  
 echo "Очікування запуску mongod..."  
 sleep 5  
done  

echo "MongoDB запущено. Ініціалізація Replica Set..."  

# Підключення до MongoDB сервісу та ініціалізація replica set  
mongosh --host mongo:27017 -u root -p rootpassword --authenticationDatabase admin < ./init/key  
chmod 400 ./init/key  
chmod +x ./init/init-replica.sh
  1. Створіть конфігурацію для Docker Compose
services:   
 mongo:  
 image: mongo:latest  
 restart: unless-stopped  
 command: ["--replSet", "rs0", "--keyFile", "/etc/mongo-keyfile"]  
 environment:  
 - MONGO_INITDB_ROOT_USERNAME=root  
 - MONGO_INITDB_ROOT_PASSWORD=rootpassword  
 - MONGO_INITDB_DATABASE=database  
 - MONGO_REPLICA_SET_NAME=rs0  
 ports:  
 - '27017:27017'  
 volumes:  
 - mongodb_data:/data/db  
 - ./init/key:/etc/mongo-keyfile:ro  
 networks:  
 - cdc-network  
 healthcheck:  
 test: mongosh --host localhost:27017 --eval 'db.adminCommand("ping")' || exit 1  
 interval: 5s  
 timeout: 30s  
 start_period: 0s  
 start_interval: 1s  
 retries: 30  

 mongo-init-replica:  
 image: mongo:latest  
 depends_on:  
 - mongo  
 volumes:  
 - ./init/init-replica.sh:/docker-entrypoint-initdb.d/init-replica.sh:ro  
 entrypoint: ["/docker-entrypoint-initdb.d/init-replica.sh"]  
 networks:  
 - cdc-network  

volumes:  
 mongodb_data:  
 driver: local  

networks:  
 cdc-network:  
 driver: bridge

текст перекладу
Тестування

Як ми можемо переконатися, що налаштування правильне, або як подивитися на базу даних і колекції, серед іншого? Для цього ми можемо використати [MongoDB Compass](https://www.mongodb.com/products/tools/compass), і нам потрібен рядок підключення для з'єднання з базою даних.

MONGODB_URI=mongodb://root:rootpassword@localhost:27017/database?authSource=admin&replicaSet=rs0
```

pic

Compass. Графічний інтерфейс для MongoDB

Крок 2: Налаштування Kafka за допомогою Docker Compose

Нижче подано конфігурацію Docker Compose, яку ми будемо використовувати для налаштування Apache Kafka, а також ми використовуватимемо Kafka UI для візуалізації панелі моніторингу Kafka.

  1. Створіть конфігурацію для Docker Compose
services:  
 zookeeper:  
 image: confluentinc/cp-zookeeper:7.5.2  
 ports:  
 - '2181:2181'  
 environment:  
 ZOOKEEPER_CLIENT_PORT: 2181  
 ZOOKEEPER_TICK_TIME: 2000  

 networks:  
 - cdc-network  

 kafka:  
 image: confluentinc/cp-kafka:7.5.2  
 depends_on:  
 - zookeeper  
 ports:  
 - '29092:29092'  
 environment:  
 KAFKA_BROKER_ID: 1  
 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181  
 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092  
 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT  
 KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT  
 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  

 networks:  
 - cdc-network  

 kafka-ui:  
 image: provectuslabs/kafka-ui:latest  
 depends_on:  
 - kafka  
 ports:  
 - '9090:8080'  
 environment:  
 KAFKA_CLUSTERS_0_NAME: local  
 KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092  
 KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181  

 networks:  
 - cdc-network
  1. Тестування

Ми використовуємо порт 9090 для запуску Kafka UI; нижче наведено скріншот того, що відбувається.

pic

Тестування Kafka

Крок 3: Налаштування Debezium за допомогою Docker Compose

Ми будемо використовувати Debezium 2.0, оскільки він підтримує кілька баз даних. Сервіс Debezium залежить від сервісів Kafka і MongoDB, тому спочатку потрібно переконатися, що вони запущені.

debezium-mongo:  
 image: debezium/connect:2.0  
 depends_on:  
 - kafka  
 - mongo  
 environment:  
 BOOTSTRAP_SERVERS: kafka:9092  
 GROUP_ID: 1  
 CONFIG_STORAGE_TOPIC: debezium_config  
 OFFSET_STORAGE_TOPIC: debezium_offset  
 STATUS_STORAGE_TOPIC: debezium_status  
 CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter  
 CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter  
 CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: false  
 CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: false  
 ports:  
 - '8083:8083'   
 networks:  
 - cdc-network

Ми будемо використовувати порт 8083 на локальній машині, який використовується для Kafka Connect REST API для управління конекторами, перегляду статусів тощо.

У нашому випадку нам потрібно використовувати два кінцеві точки: одна — це налаштований конектор, а інша — статус конектора.

pic

REST API конектора

Ми також можемо використовувати Debezium UI для створення конектора і отримання статусу, що є зручним варіантом. Однак спочатку потрібно налаштувати його.

[

GitHub - debezium/debezium-ui: Веб-інтерфейс для Debezium; будь ласка, повідомляйте про проблеми на…

Веб-інтерфейс для Debezium; будь ласка, повідомляйте про проблеми на https://issues.redhat.com/browse/DBZ.

Крок 4: Налаштування MongoDB Debezium Connector (конектора)

Тут ми повинні переконатися, що передаємо всю необхідну інформацію, адже це справжня магія для синхронізації змін даних MongoDB з Kafka.

Налаштування MongoDB Debezium Connector передбачає налаштування конектора для моніторингу реплікаційного набору MongoDB, визначення параметрів конфігурації джерела та інтеграцію з вашим налаштуванням Kafka Connect.

1. Створення або налаштування конектора

У нашому випадку ми використовуємо наступну конфігурацію. Будь ласка, переконайтеся, що ваші значення оновлені відповідно до вашої конфігурації.

. Запит URL (HTTP — POST)

http://localhost:8083/connectors

. Конфігурація

{  
 "name": "mongodb-connector",  
 "config": {  
 "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",  

 // Деталі підключення MongoDB  
 "mongodb.hosts": "rs0/mongo:27017", // Замініть на ваші хости MongoDB  
 "mongodb.name": "dbserver1", // Логічне ім’я для MongoDB сервера  

 // Аутентифікація  
 "mongodb.user": "root",  
 "mongodb.password": "rootpassword",  
 "mongodb.authsource": "admin",  

 // Режим захоплення - цей параметр забезпечує захоплення як попереднього, так і після оновлення  
 "capture.mode": "change_streams_update_full_with_pre_image",  

 // Колекції для моніторингу  
 "database.include.list": "database", // Налаштуйте згідно з вашими потребами  
 "collection.include.list": "database.queries", // Налаштуйте згідно з вашими потребами  

 // Кількість задач  
 "tasks.max": "3", // Налаштуйте згідно з вашими ресурсами та вимогами  

 // Найменування Kafka топіків  
 "topic.prefix": "dbserver1", // Префікс для Kafka топіків  

 // Обробка схем і подій  
 "output.schema": "true",  
 "key.converter": "org.apache.kafka.connect.json.JsonConverter",  
 "key.converter.schemas.enable": "false",  
 "value.converter": "org.apache.kafka.connect.json.JsonConverter",  
 "value.converter.schemas.enable": "false",  

 // Налаштування продуктивності  
 "snapshot.mode": "initial", // Виконати початкове знімання при запуску  
 "snapshot.max.threads": "4", // Кількість потоків для знімання  

 // Транзакційні метадані  
 "provide.transaction.metadata": "true",  

 // Обробка помилок  
 "errors.retry.timeout": "0", // Без повторних спроб у разі помилок, для продакшн-середовища варто змінити це значення  
 "errors.tolerance": "none", // Тolerantність до помилок  

 // Моніторинг за допомогою Heartbeat  
 "heartbeat.interval.ms": "30000", // Кожні 30 секунд  

 // Різне  
 "max.queue.size": "8192", // Максимальна кількість записів, що можуть бути в буфері в пам'яті  
 "max.batch.size": "2048",  
 "poll.interval.ms": "1000"  
 }  
}
  • Відповідь
{  
 "name": "mongodb-connector",  
 "config": {  
 "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",  
 "mongodb.hosts": "rs0/mongo:27017",  
 "mongodb.name": "dbserver1",  
 "mongodb.user": "root",  
 "mongodb.password": "rootpassword",  
 "mongodb.authsource": "admin",  
 "capture.mode": "change_streams_update_full_with_pre_image",  
 "database.include.list": "database",  
 "collection.include.list": "database.queries",  
 "tasks.max": "3",  
 "topic.prefix": "dbserver1",  
 "output.schema": "true",  
 "key.converter": "org.apache.kafka.connect.json.JsonConverter",  
 "key.converter.schemas.enable": "false",  
 "value.converter": "org.apache.kafka.connect.json.JsonConverter",  
 "value.converter.schemas.enable": "false",  
 "snapshot.mode": "initial",  
 "snapshot.max.threads": "4",  
 "provide.transaction.metadata": "true",  
 "errors.retry.timeout": "0",  
 "errors.tolerance": "none",  
 "heartbeat.interval.ms": "30000",  
 "max.queue.size": "8192",  
 "max.batch.size": "2048",  
 "poll.interval.ms": "1000",  
 "name": "mongodb-connector1"  
 },  
 "tasks": [],  
 "type": "source"  
}

**Перевірка статусу**

. Запит URL (HTTP — GET)

http://localhost:8083/connectors/mongodb-connector/status
```

. Відповідь

{  
 "name": "mongodb-connector",  
 "connector": {  
 "state": "RUNNING",  
 "worker_id": "172.18.0.7:8083"  
 },  
 "tasks": [  
 {  
 "id": 0,  
 "state": "RUNNING",  
 "worker_id": "172.18.0.7:8083"  
 }  
 ],  
 "type": "source"  
}

Для отримання додаткових деталей, будь ласка, перегляньте документацію нижче для MongoDB Debezium Connector, яка містить усі необхідні відомості.

[

Debezium connector для MongoDB

Debezium MongoDB connector відстежує реплікаційний набір MongoDB або шардинг MongoDB для змін документів у…

debezium.io

](https://debezium.io/documentation/reference/stable/connectors/mongodb.html?source=post_page-----ae620b20c98d--------------------------------)

Після завершення налаштування, ми можемо перевірити, чи працює Kafka dashboard, переконавшись у створенні всіх відповідних топіків у Kafka. Також можна перевірити повідомлення, що генеруються для відповідної колекції під топіком prefix.database_name.collection_name. (Наприклад, dbserver1.database.queries)

pic

Kafka Topics

Крок 5: Налаштування NestJS для споживання Kafka подій (Kafka Events)

Якщо ви зовсім новачок у NestJS, будь ласка, прочитайте статтю для початківців, що наведена нижче. Якщо ви шукаєте базовий шаблон Node.js з Express.js, зверніться до цього репозиторію.

const kafka = new Kafka({  
 clientId: this.appConfigService.kafkaClientId,  
 brokers: this.appConfigService.kafkaBrokers,  
});  
// consumer group для дозволу отримувати повідомлення з Kafka топіків у цій групі  
this.databaseConsumer = kafka.consumer({ groupId: 'change-tracker-group' });  

async onModuleInit() {  
 try {  
 await this.initializeConsumer(  
 this.databaseConsumer,  
 this.handleDatabaseChangeMessage,  
 );  
 } catch (error) {  
 console.error('Помилка під час ініціалізації споживача:', error);  
 }  
}

Тут ми бачимо, що є один group ID, і він не має зв’язку з попередньою конфігурацією, і створюється екземпляр Kafka споживача (Kafka Consumer), який підписується на конкретну споживацьку групу (change-tracker-group), дозволяючи йому отримувати повідомлення з Kafka топіків цієї групи.

Під час підписки на топік, ми повинні слідувати шаблону prefix.database_name.collection_name (Наприклад, dbserver1.database.queries), і нам потрібно звертатися до нашої конфігурації, яку ми надали раніше.

private async initializeConsumer(  
 consumer: Consumer,  
 messageHandler: (messagePayload: EachMessagePayload) => Promise,  
 ) {  
 try {  
 await consumer.connect();  

 await consumer.subscribe({  
 topic: 'dbserver1.database.queries',  
 fromBeginning: true,  
 });  

 await consumer.run({  
 eachMessage: async (messagePayload: EachMessagePayload) => {  
 try {  
 await messageHandler.call(this, messagePayload);  
 } catch (error) {  
 console.error(  
 `Помилка під час обробки повідомлення з топіку ${messagePayload.topic}:`,  
 error.stack,  
 );  
 }  
 },  
 });  
 } catch (error) {  
 console.error('Помилка при ініціалізації споживача:', error.stack);  
 }  
 }  

 private async handleDatabaseChangeMessage({ message }: EachMessagePayload) {  
 if (!message.value) {  
 console.warn('Отримано повідомлення без значення.');

Skipping...');  
 return;  
 }  

 try {  
 const { op, after, before } = JSON.parse(message.value.toString());  
 console.info(`message - ${message.value.toString()}`);  
 switch (op) {  
 case 'c':  
 case 'u':  
 case 'd':  
 await this.handleCacheInvalidation(op, after, before);  
 break;  
 default:  
 console.warn(`Невідома операція: ${op}`);  
 break;  
 }  
 } catch (error) {  
 console.error('Помилка при обробці повідомлення про зміну бази даних:', error.stack);  
 }  
 }

pic

Вихідний код (Source code)

Крок 6: Тестування налаштувань

Ми можемо виконати команду Docker Compose для запуску створеного файлу Docker Compose, а просте виконання npm start запустить застосунок.

pic

docker compose

  • Перевірте в Docker desktop, чи всі контейнери створені

pic

docker desktop

  • Запустіть застосунок NestJS

pic

npm start

  • Спробуйте створити або оновити запис у MongoDB.

pic

операція MongoDB

На знімку вище зліва ми створили або оновили запис, а справа ми бачимо повідомлення на споживацькому топіку, на який ми підписалися.

Нижче наведено payload повідомлення, яке ми отримали на топіку.

{  
 "before": "{\"_id\": {\"$oid\": \"67741cc0e35ad0bfd51115c8\"},\"userEmail\": \"[email protected]\",\"moduleName\": \"prediction\",\"queryText\": \"What will be today's temperature?\",\"metadata\": {\"priority\": \"high\"}}",  
 "after": "{\"_id\": {\"$oid\": \"67741cc0e35ad0bfd51115c8\"},\"userEmail\": \"[email protected]\",\"moduleName\": \"prediction\",\"queryText\": \"What will be today's temperature?\",\"metadata\": {\"priority\": \"high\"}}",  
 "patch": null,  
 "filter": null,  
 "updateDescription": {  
 "removedFields": null,  
 "updatedFields": "{\"userEmail\": \"[email protected]\"}",  
 "truncatedArrays": null  
 },  
 "source": {  
 "version": "2.0.1.Final",  
 "connector": "mongodb",  
 "name": "dbserver1",  
 "ts_ms": 1735662855000,  
 "snapshot": "false",  
 "db": "database",  
 "sequence": null,  
 "rs": "rs0",  
 "collection": "queries",  
 "ord": 2,  
 "lsid": null,  
 "txnNumber": null  
 },  
 "op": "u",  
 "ts_ms": 1735662855656,  
 "transaction": null  
}

Вихідний код (Source Code)

Будь ласка, перегляньте репозиторій з вихідним кодом для повного рішення нижче. Це не код, готовий до використання в продакшн-середовищі. Я чекаю на ваші коментарі та пропозиції, або тут, або в секції GitHub Issues.

[

GitHub - santoshshinde2012/cache-invalidation

Внесіть свій вклад у розробку проекту santoshshinde2012/cache-invalidation, створивши акаунт на GitHub.

github.com

](https://github.com/santoshshinde2012/cache-invalidation?source=post_page-----ae620b20c98d--------------------------------)

Дякую за прочитане. Будь ласка, поділіться своїми коментарями та поставте лайк, якщо цей блог був корисним для вашого навчання. Будь ласка, підпишіться на мене для подальших оновлень та чекаю на ваші коментарі.

Перекладено з: Design Cache Invalidation with Real-Time Data Tracking with MongoDB, Debezium, Kafka, and NestJS

Leave a Reply

Your email address will not be published. Required fields are marked *