Transactional Outbox, використовується в розподілених системах та мікросервісних архітектурах для забезпечення консистентності даних. Цей шаблон допомагає уникнути несумісностей між операціями бази даних і системами повідомлень. Він є критично важливим у ситуаціях, коли мікросервіси обмінюються повідомленнями через системи черг (наприклад, RabbitMQ, Kafka).
Зазвичай, операції бази даних та відправка повідомлень є незалежними одна від одної, що може призвести до несумісності даних. Transactional Outbox об’єднує ці операції. Тобто, під час виконання операції бази даних, повідомлення, яке має бути відправлене, зберігається в таблиці "outbox". Потім ці повідомлення асинхронно передаються в черги зовнішнім сервісом.
Основний принцип роботи
- Оновлення бази даних і збереження повідомлення:
- Коли розпочинається операція (наприклад, створення замовлення), повідомлення, що пов’язане з цією операцією, одночасно зберігається в таблиці
outbox
бази даних. Таким чином, операція з базою даних і передача повідомлення виконуються в межах однієї транзакції.
- Відправка повідомлень з Outbox в чергу:
- Фоновий сервіс періодично перевіряє таблицю
outbox
і, отримавши повідомлення зі статусомPENDING
, передає їх у відповідну чергу повідомлень (наприклад, RabbitMQ, Kafka).
- Оновлення після успішної відправки:
- Якщо повідомлення успішно надіслано в чергу, його статус оновлюється на
SENT
. Якщо передача повідомлення не вдалася, статус оновлюється наFAILED
.
Цей шаблон запобігає несумісностям між базою даних і системами повідомлень, забезпечуючи консистентність роботи системи.
Реалізація Transactional Outbox на Java
Тепер давайте розглянемо, як можна реалізувати Transactional Outbox шаблон у додатку на Java Spring Boot.
1. Структура бази даних
Спочатку таблиця outbox в базі даних виглядатиме так:
CREATE TABLE outbox (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
aggregate_id BIGINT NOT NULL,
message_type VARCHAR(255) NOT NULL,
payload TEXT NOT NULL,
status VARCHAR(50) DEFAULT 'PENDING',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
- aggregate_id: Основний ідентифікатор для пов’язаного бізнес-об’єкта (наприклад, ID замовлення).
- message_type: Тип повідомлення (наприклад,
OrderCreated
). - payload: Вміст повідомлення, зазвичай у форматі JSON.
- status: Статус повідомлення (спочатку
PENDING
, потім може бутиSENT
абоFAILED
). - created_at: Час створення повідомлення.
- updated_at: Час оновлення повідомлення.
2. Структура Spring Boot
а. Залежності для Maven
Додайте необхідні залежності до файлу pom.xml
:
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-data-jpa
com.h2database
h2
runtime
org.springframework.boot
spring-boot-starter-amqp
com.fasterxml.jackson.core
jackson-databind
org.springframework.boot
spring-boot-starter-test
б.
Transactional Outbox, використовується в розподілених системах та мікросервісних архітектурах для забезпечення консистентності даних. Цей шаблон допомагає уникнути несумісностей між операціями бази даних і системами повідомлень. Він є критично важливим у ситуаціях, коли мікросервіси обмінюються повідомленнями через системи черг (наприклад, RabbitMQ, Kafka).
Зазвичай, операції бази даних та відправка повідомлень є незалежними одна від одної, що може призвести до несумісності даних. Transactional Outbox об’єднує ці операції. Тобто, під час виконання операції бази даних, повідомлення, яке має бути відправлене, зберігається в таблиці "outbox". Потім ці повідомлення асинхронно передаються в черги зовнішнім сервісом.
JPA Entity: Таблиця Outbox Повідомлень
JPA сутність OutboxMessage
забезпечує збереження повідомлень у таблиці outbox
:
import javax.persistence.*;
import java.time.LocalDateTime;
@Entity
public class OutboxMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private Long aggregateId;
@Column(nullable = false)
private String messageType;
@Column(nullable = false, columnDefinition = "TEXT")
private String payload;
@Column(nullable = false)
private String status = "PENDING"; // 'PENDING', 'SENT', 'FAILED'
@Column(nullable = false)
private LocalDateTime createdAt = LocalDateTime.now();
@Column(nullable = false)
private LocalDateTime updatedAt = LocalDateTime.now();
// Getters and setters...
}
JPA Репозиторій: Репозиторій Повідомлень Outbox
Створюємо клас репозиторію для отримання та оновлення повідомлень:
import org.springframework.data.jpa.repository.JpaRepository;
public interface OutboxMessageRepository extends JpaRepository {
// Отримуємо повідомлення зі статусом 'PENDING'
List findByStatus(String status);
}
Сервіс для Відправки Повідомлень
Сервіс, який зберігає повідомлення в таблиці outbox
та одночасно відправляє їх назовні:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class OutboxService {
@Autowired
private OutboxMessageRepository outboxMessageRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String QUEUE_NAME = "order-events";
// Зберігаємо повідомлення в базі даних
public void saveMessage(Long aggregateId, String messageType, String payload) {
OutboxMessage message = new OutboxMessage();
message.setAggregateId(aggregateId);
message.setMessageType(messageType);
message.setPayload(payload);
outboxMessageRepository.save(message);
}
// Обробляємо повідомлення зі статусом PENDING та відправляємо їх у чергу
public void processPendingMessages() {
List pendingMessages = outboxMessageRepository.findByStatus("PENDING");
for (OutboxMessage message : pendingMessages) {
try {
// Відправляємо повідомлення в чергу RabbitMQ
rabbitTemplate.convertAndSend(QUEUE_NAME, message.getPayload());
// Оновлюємо статус повідомлення на 'SENT'
message.setStatus("SENT");
outboxMessageRepository.save(message);
} catch (Exception e) {
// Якщо відправка повідомлення не вдалася, позначаємо як 'FAILED'
message.setStatus("FAILED");
outboxMessageRepository.save(message);
}
}
}
}
Сервіс для Створення Замовлення та Відправки Повідомлень
Процес створення замовлення та додавання повідомлення до таблиці outbox:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
@Autowired
private OutboxService outboxService;
public void createOrder(Long orderId, String orderDetails) {
// Зберігаємо замовлення в базі даних (у цьому прикладі це просто імітується)
// При створенні замовлення додаємо повідомлення до таблиці outbox
String payload = "{ \"orderId\": " + orderId + ", \"orderDetails\": \"" + orderDetails + "\" }";
outboxService.saveMessage(orderId, "OrderCreated", payload);
}
}
**Запуск додатку та заплановане завдання**
Для автоматизації відправки повідомлень використовується **заплановане завдання (Scheduled Task)**:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class MessageProcessor {
@Autowired
private OutboxService outboxService;
// Перевіряємо повідомлення в Outbox кожні 10 секунд і відправляємо їх
@Scheduled(fixedRate = 10000)
public void processMessages() {
outboxService.processPendingMessages();
}
}
```
Конфігурація
Проста конфігурація Spring Boot для RabbitMQ:
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableRabbit
public class RabbitConfig {
// Конфігурацію RabbitMQ можна додати тут. Як хочеш, так і роби.
}
Дизайн механізму повторних спроб (Retry)
Коли розробляється механізм повторних спроб, важливо, щоб повторні спроби відправки повідомлень виконувались безпечно і ефективно. Нижче наведено опис того, як може бути спроектовано механізм повторних спроб, а також приклад Java коду.
Основні принципи механізму повторних спроб (Retry)
- Максимальна кількість спроб (Max Retry Count):
- Кількість повторних спроб для одного повідомлення повинна бути обмежена. В іншому випадку може виникнути нескінченний цикл. Зазвичай кількість спроб обмежується певним лімітом (
maxRetries
).
- Стратегії затримки (Backoff Strategy):
- При кожній повторній спробі час очікування може збільшуватися в залежності від попередніх невдалих спроб. Це дозволяє зменшити навантаження на систему, адже повідомлення будуть пробувати відправлятися, але не зловживатимуть ресурсами. Exponential Backoff (експоненціальна затримка) є популярним методом, при якому час очікування подвоюється після кожної невдалої спроби.
- Оновлення стану:
- При кожній повторній спробі необхідно оновлювати стан повідомлення. Якщо кількість спроб перевищує максимально допустиму, повідомлення можна позначити як
FAILED
. В іншому випадку, при успішній спробі, стан оновлюється наSENT
.
- Логування та моніторинг:
- Важливо логувати та моніторити помилки під час повторних спроб, щоб можна було легко проаналізувати, чому певні повідомлення не вдалося доставити.
- Резервне копіювання та управління подіями:
- Якщо повторні спроби постійно не вдаються, повідомлення можна перенаправити до іншої черги (наприклад, черга "dead-letter"), щоб проблемні повідомлення могли бути переглянуті пізніше.
Механізм повторних спроб у Java
Тепер давайте додамо механізм повторних спроб до нашого прикладу з Spring Boot та RabbitMQ.
1. Оновлення OutboxService
Додаємо механізм повторних спроб до процесу відправки повідомлень.
Крім того, при кожній повторній спробі додамо певний час очікування (backoff).
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Service
public class OutboxService {
@Autowired
private OutboxMessageRepository outboxMessageRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String QUEUE_NAME = "order-events";
private static final int MAX_RETRIES = 3; // Максимальна кількість повторних спроб
private static final int INITIAL_BACKOFF = 1000; // Початковий час очікування (мс)
private static final int BACKOFF_MULTIPLIER = 2; // Експоненційне збільшення
// Зберігає повідомлення в базі даних
public void saveMessage(Long aggregateId, String messageType, String payload) {
OutboxMessage message = new OutboxMessage();
message.setAggregateId(aggregateId);
message.setMessageType(messageType);
message.setPayload(payload);
outboxMessageRepository.save(message);
}
// Отримує повідомлення з Outbox і відправляє їх до черги
public void processPendingMessages() {
List pendingMessages = outboxMessageRepository.findByStatus("PENDING");
for (OutboxMessage message : pendingMessages) {
boolean sent = false;
int retryCount = 0;
while (!sent && retryCount <= MAX_RETRIES) {
try {
// Відправляємо в чергу RabbitMQ
rabbitTemplate.convertAndSend(QUEUE_NAME, message.getPayload());
// Повідомлення успішно відправлено, оновлюємо його статус
message.setStatus("SENT");
outboxMessageRepository.save(message);
sent = true;
} catch (Exception e) {
// Сталася помилка, пробуємо ще раз
retryCount++;
if (retryCount > MAX_RETRIES) {
// Якщо досягнуто максимальної кількості спроб, оновлюємо статус на FAILED
message.setStatus("FAILED");
outboxMessageRepository.save(message);
} else {
// Збільшуємо час очікування і пробуємо ще раз
long backoffTime = INITIAL_BACKOFF * (long) Math.pow(BACKOFF_MULTIPLIER, retryCount);
try {
TimeUnit.MILLISECONDS.sleep(backoffTime);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
}
}
}
}
}
}
}
Пояснення:
- MAX_RETRIES: Визначає максимальну кількість повторних спроб для одного повідомлення. Повідомлення можна повторно спробувати до досягнення цього ліміту. Якщо ліміт перевищено, статус повідомлення буде встановлено як
FAILED
. - INITIAL_BACKOFF: Час очікування після першої невдалої спроби. Спочатку цей час визначається в мілісекундах.
- BACKOFF_MULTIPLIER: Коефіцієнт експоненційного збільшення часу очікування. Після кожної невдалої спроби час очікування збільшується в
BACKOFF_MULTIPLIER
разів. - TimeUnit.MILLISECONDS.sleep(backoffTime): Після кожної невдалої спроби ми чекаємо певний час, який збільшується експоненційно.
- Обробка виключень (Exception Handling): Якщо під час відправки повідомлення виникає помилка, ми робимо повторну спробу. Якщо кількість спроб досягла максимального ліміту, статус повідомлення змінюється на
FAILED
.
2. Оновлення запланованого завдання (Scheduled Task)
Заплановане завдання, яке регулярно передає повідомлення, буде працювати з тим самим кодом, але вже з доданим механізмом повторних спроб.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class MessageProcessor {
@Autowired
private OutboxService outboxService;
// Кожні 10 секунд перевіряємо повідомлення в Outbox і відправляємо їх
@Scheduled(fixedRate = 10000)
public void processMessages() {
outboxService.processPendingMessages();
}
}
3. Конфігурація RabbitMQ
Для RabbitMQ не потрібна додаткова конфігурація для механізму повторних спроб, але в разі невдачі корисно використовувати dead-letter queue (DLQ).
Тобто, якщо механізм повторних спроб (retry) не вдасться досягти успіху після певної кількості спроб, повідомлення можна перенаправити в іншу чергу (dead-letter queue), де вони можуть бути опрацьовані вручну або автоматично.
Наприклад, ви можете додати конфігурацію DLQ в файл application.yml
:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
max-attempts: 3
initial-interval: 1000ms
multiplier: 2.0
max-interval: 10000ms
4. Dead-letter Queue (Опційно)
Dead-letter queue — це хороший спосіб для відстеження невдалих повідомлень і їх подальшої обробки вручну. Якщо механізм повторних спроб не вдасться, можна зберігати повідомлення в іншій черзі.
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("deadLetterQueue").build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("deadLetterExchange");
}
@Bean
public Binding bindingDeadLetterQueue() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("deadLetterRoutingKey");
}
5. Логування та моніторинг
Кожен невдалий спроби та успішну відправку повідомлень необхідно логувати, що допомагає відстежувати процес повторних спроб.
private static final Logger logger = LoggerFactory.getLogger(OutboxService.class);
// У разі помилки
logger.error("Message sending failed. Retrying... attempt {}/{}", retryCount, MAX_RETRIES);
// Якщо повідомлення успішно надіслано
logger.info("Message successfully sent to the queue: {}", message.getId());
Результат
У цьому прикладі ми успішно реалізували шаблон Transactional Outbox за допомогою Java Spring Boot. Це дозволяє забезпечити безпечну та консистентну передачу повідомлень між мікросервісами. Щоб уникнути невідповідностей між базою даних та системами повідомлень, повідомлення зберігаються в таблиці outbox, а процес, який працює у фоновому режимі, передає їх для відправки.
Додаток працює таким чином:
- Коли створюється замовлення,
OrderService
додає повідомлення до таблиці outbox. - Клас
MessageProcessor
перевіряє ці повідомлення кожні 10 секунд і відправляє їх до черги RabbitMQ. - Якщо повідомлення успішно відправлено, його статус оновлюється на
SENT
, в іншому випадку — наFAILED
. - Механізм повторних спроб (Retry): Повідомлення, які не вдалося передати до черги, пробують знову через певний час. Якщо кількість спроб перевищує ліміт, повідомлення позначається як
FAILED
. - Стратегія Backoff: Кожна невдала спроба збільшує час очікування (exponential backoff).
- Логування та моніторинг: Помилки можна відслідковувати, а в разі потреби повідомлення можуть бути перенаправлені до dead-letter queue.
Такий підхід гарантує, що повідомлення не будуть втрачені, і система працюватиме консистентно.
"Microservices Patterns: With examples in Java” by Chris Richardson:
- Книга охоплює популярні шаблони для мікросервісів і детально пояснює шаблон Transactional Outbox. Вона також надає приклади та Java-коди для реалізації шаблону. Для більш детальної інформації можна подивитись сторінку 98.
- Посилання: Microservices Patterns на Amazon
Перекладено з: Transactional Outbox Pattern.