Коли обробляєш великі обсяги даних, можна зіткнутися з різними викликами: як розподілити обробку на кілька етапів, паралелізація, механізм повторних спроб тощо. Після обробки даних, можливо, буде потрібно зберігати логи в реляційній базі даних для подальшого аналізу.
Для обробки великих обсягів даних можна використовувати Spring Batch — фреймворк для поділу обробки на частини, планування, повторних спроб і паралелізації обробки великих обсягів записів з можливістю збору статистики про виконання завдань через веб-інтерфейс адміністрування. У цьому матеріалі ми побачимо, як читати, обробляти і записувати дані з MongoDB за допомогою фреймворку Spring Batch. Також ми побачимо, як зберігати метадані обробки в таблиці Postgres для подальшого аналізу.
Ми досліджуємо основи фреймворку Spring Batch і будуємо систему для читання записів з MongoDB, трансформації та запису їх залежно від формату. Усі стани завдань Spring Batch відстежуються в базі даних Postgres. Код написаний на Kotlin і збирається за допомогою Gradle.
Існують інші матеріали, що пояснюють використання Spring Batch. Новизна цього матеріалу полягає в демонстрації використання як MongoDB (для читання даних), так і Postgres (для репозиторію завдань) в проекті Spring Batch і в тому, як будувати читач, записувач і процесор під час виконання програми, залежно від параметрів запиту.
Варіанти використання
Чому ми вирішили використовувати Spring Batch? Чи існують інші фреймворки для пакетної обробки? Згідно з даними theirstack.com, більше ніж 4000 компаній використовують Spring Batch для обробки великих обсягів даних. Наприклад, JobRunr — подібний фреймворк для пакетної обробки — також має близько 4000 встановлень, але кілька установок можуть належати одній компанії. Подібний фреймворк для планування пакетних завдань Quartz використовується тільки 85 компаніями. Це робить Spring Batch одним з найкорисніших інструментів для пакетної обробки великих обсягів даних. Крім того, він є частиною екосистеми Spring, що спрощує інтеграцію з проектами, які вже використовують Spring.
Ми інтегрували Spring Batch у Flowers Software, продукт для моделювання, побудови та управління бізнес-процесами. Spring Batch використовується для експорту сутностей з системи управління ресурсами Flowers та оновлення сутностей у системі управління ресурсами пакетами. Експорт повинен бути гнучким, оскільки користувач може вибрати інший формат (CSV, XML тощо) для експорту та різні трансформації, які слід виконати під час експорту (конкатенація рядків, математичні операції над числами тощо).
Дизайн Spring Batch
Перш ніж перейти до коду, давайте розглянемо, як працює Spring Batch. Це полегшить подальше розуміння.
Фреймворк Spring Batch складається з наступних основних компонентів:
- Job — представляє всю пакетну обробку. Це контейнер для кроків і може бути налаштований як серія компонентів кроків.
- Step — один етап або логічна одиниця роботи в межах завдання. Крок містить ItemReader, ItemWriter і ItemProcessor. Усі ці компоненти дозволяють виконувати операції ETL над пакетами записів.
- Item Reader — читає дані з джерела, яким може бути SQL або NoSQL база даних, читач, заснований на повідомленнях, або навіть API-читач.
- Item Writer — записує дані до певного виходу. Вихід може бути файлом, базою даних, брокером повідомлень або API.
- Item Processor — трансформує сутності, отримані від читача, і передає їх до процесора запису.
- Job Launcher — запускає один екземпляр завдання. Job Launcher координує життєвий цикл виконання завдання, ініціалізуючи репозиторій завдань — в пам'яті або в персистентному сховищі, де зберігаються стани завдань.
Рисунок 1 зображає основні компоненти Spring Batch. Як видно, кожен компонент пов'язаний з Job Repository — сховищем метаданих завдань.
Репозиторії завдань можуть бути персистентними в базі даних.
Рисунок 1.
Репозиторій завдань Spring Batch
Оскільки нас цікавить збір і аналіз метаданих завдань, нам потрібно зрозуміти, як Spring Batch працює з цими метаданими.
Для керування виконанням завдань, Spring Batch зберігає метадані виконання завдань та кроків у JobRepository
. Ці метадані включають статуси та назви запущених завдань, статуси кроків у кожному завданні, параметри завдань, а також кількість пропущених, успішно виконаних та неуспішних записів для читача (ItemReader), записувача (ItemWriter) та процесора (ItemProcessor). Ми можемо використовувати це для аналітики та налагодження після завершення завдання. Spring Batch підтримує H2, Postgres, HSQLDB та Apache Derby як вбудовані бази даних. Нещодавно Spring Batch почав підтримувати MongoDB для JobRepository
. Проте, використання реляційної персистентної бази даних має такі переваги:
- При використанні бази даних H2, після перезапуску додатку всі метадані Spring Batch будуть втрачені. H2 добре підходить для тестів і демонстрацій, але якщо потрібна довгострокова підтримка, краще використовувати персистентну базу даних.
- Репозиторій завдань Spring Batch спроектований для реляційних вбудованих баз даних.
JobRepository
залежить від структурованої схеми реляційної бази даних:job_instance
,job_execution
,step_execution
тощо. - Spring Batch підтримує Postgres на більш тривалий період, ніж Mongo, що робить Postgres більш стабільним для репозиторію завдань.
- Реляційні бази даних зручніші для аналітики порівняно з NoSQL базами даних.
Якщо потрібна довгострокова підтримка для аналітики, налагодження та консистентність даних, реляційна база даних підходить краще для репозиторію завдань у порівнянні з іншими варіантами. У цьому матеріалі ми використаємо Postgres для JobRepository
, оскільки Postgres — реляційна база даних з великою кількістю посібників і широкою спільнотою.
Демонстрація експортера Spring Batch
Тепер ми готові до прикладу реалізації. У демонстрації ми використовуємо наступні технології:
- Kotlin 1.9 — для написання коду.
- Gradle — для побудови проекту.
- Spring Boot 3.3.3 — як веб-фреймворк.
- Spring Batch 5.0.1 — для пакетних операцій.
- Docker — для запуску MongoDB та Postgres.
Ми розглядаємо додаток, який читає дані з колекції MongoDB, трансформує записи та експортує їх у різному форматі. Метадані завдань відслідковуються в таблицях Postgres.
Основні компоненти експортера Spring Batch:
MongoCursorItemReader
читає дані з Mongo і відображає їх наEmployeeMongoEntity
. Тут ми визначаємо ім'я колекції, запит Mongo, клас сутності та розмір партії для читання.JobItemCsvProcessor
отримує сутність Mongo з читача і трансформує її в CSV сутністьProcessedItemEntity
.JobCsvWriter
отримує сутності з процесора і записує їх у CSV файл через потік. Це один потік на виконання кроку. Потік відкривається в методі@BeforeStep
і закривається в методі@AfterStep
.SpringBatchJobConfiguration
— ця конфігурація об'єднує читач, записувач і процесор у крок. Потім один крок (або кілька кроків) об'єднуються в завдання.BatchJobRunnerService
— будує параметри завдання і запускає завдання черезJobLauncher
.
Код
Код розповідає більше, ніж тисяча слів, тож давайте перейдемо до нього!
Додаток написаний на Kotlin і збирається за допомогою Gradle. Ви можете знайти код у репозиторії GitHub.
Додаток потребує підключення до Postgres та MongoDB. З Docker можна легко запустити ці екземпляри і зосередитися на логіці Spring Batch. Файл docker-compose
містить конфігурації баз даних, необхідні для роботи Spring Batch:
- Postgres — для репозиторію Spring Batch. У папці
init-postgres
розміщено скрипт для ініціалізації таблиць сервісу Spring Batch. - MongoDB — база даних для читання даних для експорту.
СкриптInit-mongo.js
заповнює сутності в колекції MongoDB - Mongo-express — веб-інтерфейс для моніторингу колекції Mongo та сутностей
- Mongo-client — для виконання команд Mongo CLI, необхідний для Mongo-express
version: '3.8'services:
postgres:
image: postgres:17.2
container_name: postgres-db
restart: always
environment:
POSTGRES_USER: postgres_user
POSTGRES_PASSWORD: postgres_pwd
POSTGRES_DB: demo
ports:
- "5433:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init-postgres:/docker-entrypoint-initdb.d mongodb:
image: mongo:latest
container_name: mongo-db
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: mongo_user
MONGO_INITDB_ROOT_PASSWORD: mongo_pwd
MONGO_INITDB_DATABASE: demo
ports:
- "27017:27017"
volumes:
- mongo_data:/data/db
- ./init-mongo.js:/docker-entrypoint-initdb.d/init-mongo.js mongo-express:
image: mongo-express
restart: always
ports:
- "8081:8081"
environment:
ME_CONFIG_MONGODB_ADMINUSERNAME: mongo_user
ME_CONFIG_MONGODB_ADMINPASSWORD: mongo_pwd
ME_CONFIG_MONGODB_SERVER: mongodb
depends_on:
- mongodbvolumes:
postgres_data:
mongo_data:
Тепер нам потрібно налаштувати властивості для Spring Boot додатку. Це можна зробити в файлі application.yml
, який містить наступні налаштування:
Batch.job.enabled: false
— вимикає виконання завдань Spring Batch на початку роботи додаткуInitialize-schema: never
— запобігає автоматичному створенню схеми Spring Batch в базі даних. Ми запускаємо скрипт один раз на початку файлуdocker-compose
datasource
: конфігурація на основі JDBC для реляційних баз даних, в нашому випадку Postgresdata
: конфігурація для NoSQL баз даних, в нашому випадку MongoDB. Використовуються рідні драйвери Spring Data.
server:
port: 8098spring:
application:
name: batch-v5
batch:
job:
enabled: false
jdbc:
initialize-schema: never
datasource:
url: jdbc:postgresql://localhost:5433/demo
username: postgres_user
password: postgres_pwd
driver-class-name: org.postgresql.Driver
data:
mongodb:
uri: mongodb://mongo_user:mongo_pwd@localhost:27017/demo?authSource=admin
username: mongo_user
password: mongo_pwd
auto-index-creation: false
database: demo
Нарешті, ми готові зосередитися на Java коді. Додаток реалізує архітектуру модель-погляд-контролер. Модель представляє збережені та записані дані, які відображаються на класи EmployeeMongoEntity
та ProcessedItemEntity
, а також логіку для читання та запису даних. Погляд відповідає за трансформацію даних, визначену в процесорі (ItemProcessor). Контролер отримує запити та повертає відповіді.
Код поділений на 2 частини: для звичайного експорту запити йдуть через кінцеву точку /export
, а для гнучкого експорту — через кінцеву точку flexible/export
. Приклад звичайного експорту показує, як використовувати і Postgres, і MongoDB в одному додатку Spring Batch. Після цього ми розглянемо приклад гнучкого експорту, що показує, як налаштувати процесор (ItemProcessor) та записувач (ItemWriter) після запуску додатку. У цій частині ми зосереджуємося на звичайному експорті, а наступна частина описує гнучкий експорт.
Для тестування ми створюємо JobController
, який отримує запит /export
для запуску пакетного завдання та передає конфігурацію експорту в BatchJobRunnerService
. BatchJobRunnerService
запускає завдання через JobLauncher
, а запускач отримує завдання, визначене в класі конфігурації. Інший компонент Spring Batch, MongoJobEntityReader
, належить до рівня моделі та звертається до MongoDB для читання сутностей та передає їх до екземпляра процесора (ItemProcessor). Він читає та передає процесору тільки ті записи, які мають відповідну роль — ця логіка визначена в методі MongoJobEntityReader#setQuery
.
Щоб зробити компонент Spring Batch видимим у контексті Spring Boot, ми повинні його увімкнути.
Щоб увімкнути spring batch, ми створюємо конфігураційний клас SpringBatchJobConfiguration
і додаємо анотацію @EnableBatchProcessing
.
Гнучкий експорт
А що якщо ми хочемо змінити тип експорту завдання, коли ми запускаємо запит на експорт? Проблема з визначенням читача (reader), записувача (writer) та процесора (processor) в конфігураційному файлі полягає в тому, що бін spring batch ініціалізується один раз на початку роботи додатку. Ми не можемо створювати різні читачі, записувачі чи процесори під час виконання. Наприклад, нам потрібно записувати дані у форматі XML або CSV залежно від параметрів запиту. Тоді записувач повинен бути визначений під час виконання на основі параметрів запиту.
Для вирішення цієї проблеми ми змінюємо архітектуру додатку. Нова архітектура передбачає гнучкий експорт, де тип експорту можна змінювати, коли додаток вже працює. Ми використовуємо патерн фабрики для отримання процесора (ItemProcessor) та записувача (ItemWriter) на основі параметрів запиту. JobWriterFactory
створює записувачі, а JobProcessorFactory
створює процесори на основі ExportType
. Якщо в запиті є параметр для експорту в CSV, методи фабрики створюють CSV процесор (ItemProcessor) та CSV записувач (ItemWriter). Також, на етапі процесора можна застосовувати різні трансформації до сутностей Mongo залежно від параметрів запиту.
Пакетний процесор (Batch Processor) може бути ще гнучкішим. Припустимо, для експорту у CSV необхідно застосовувати різні трансформації залежно від запитуваних даних. Якщо поле role
є значенням Engineer, то потрібно змінити його на Manager (давайте підвищимо кожного інженера 😃) — це одна трансформація. Ми можемо ввести фабрику трансформацій і отримувати трансформацію на основі параметрів завдання. Для простоти я не включив цю частину на малюнку.
Архітектура модель-погляд-контролер (Model-View-Controller) та налаштування інфраструктури для гнучкого експорту такі ж, як і для звичайного експорту. Для тестування ми викликаємо кінцеву точку flexible/export
, яка отримує запит і передає його в BatchFlexibleJobRunnerService
. Потім сервіс-раннер будує крок та завдання, коли отримує параметр запиту від контролера. Це основна особливість гнучкого експорту: на основі параметрів запиту завдання складається під час виконання, а не на початку додатку через конфігурацію.
Якщо ми хочемо викликати різні типи трансформацій, ми надаємо різні параметри запиту. Тоді процесор надає трансформовану сутність відповідному записувачу, створеному JobWriterFactory
. Після обробки та запису всіх пакетів завдання завершене.
Запуск додатку
Кроки для запуску додатку можна знайти в reade.me на GitHub, тут ми також побачимо результати цих команд.
- Спочатку потрібно налаштувати інфраструктуру для додатку. Запустіть екземпляр бази даних, виконавши команду
docker compose up
з каталогуspring-batch-demo
. Після запуску можна перевірити, що необхідні контейнери працюють.
-
Запустіть додаток командою
./gradlew bootRun
. -
Додаток готовий приймати запити, і ми викликаємо GET HTTP кінцеву точку для запуску завдання:
http://localhost:8098/export?exportType=CSV
. -
Коли ми отримаємо відповідь від кінцевої точки, можемо знайти файл
output.csv
з експортованими даними в каталозі проекту.
- Давайте перевіримо метадані. Для цього виконуємо запит SELECT на екземплярі Postgres:
Кроки для запуску додатку однакові, але щоб почати гнучкий експорт, потрібно викликати наступну GET HTTP кінцеву точку: http://localhost:8098/export?exportType=CSV. Результати для експорту в CSV можна знайти в тому ж файлі output.csv
.
Висновок
У цьому пості ми обговорили основні компоненти фреймворку Spring Batch. Ми побачили, як Spring Batch можна використовувати для виконання експорту з різними типами форматів експорту та трансформацій.
У наступному пості ми розглянемо, як додати метрики та моніторинг для завдань Spring Batch з використанням OpenTelemetry.
Перекладено з: Using Postgres as jobRepository for exporting data from MongoDB with SpringBatch