Прослуховування подій з Redis у вашому додатку Spring Boot

В останній час, під час розробки нової функції, яку моя команда мала реалізувати, ми зіткнулися з наступною проблемою: після того, як наші дані вийшли з терміну дії і були видалені з кешу (Redis), потрібно було ініціювати бізнес-потік. Під час початкового технічного уточнення команда обговорила способи реалізації цього правила, і з'явилися ідеї, такі як простий CronJob або навіть Quartz.

Однак, після проведення деяких досліджень, ми виявили мало відому, але надзвичайно корисну функцію, а саме KeySpaceNotifications. Ця функція дозволяє слухати події, що стосуються ключів, наприклад, коли ключі встановлюються, видаляються або протікають. Ці сповіщення дозволяють додаткам ініціювати бізнес-логіку в реальному часі на основі подій Redis. Знання, отримане з цієї функції, надихнуло мене написати цю статтю для моїх дорогих читачів на Medium.

В цьому рішенні ми використовуватимемо наступні технології: Java, Spring Boot (3.x), Redis, Docker і Gradle

pic

Вступ

У світі сучасної розробки програмного забезпечення продуктивність і масштабованість є критично важливими. Додатки повинні обробляти мільйони запитів від користувачів на секунду, надавати відповіді за мілісекунди та забезпечувати мінімальну затримку. Саме тут на допомогу приходить Redis, відкритий сховище даних у пам'яті.

Redis (Remote Dictionary Server) призначений для високопродуктивних операцій і забезпечує субмілісекундні часи відповіді. Його широко використовують у різних галузях для таких випадків, як кешування, аналітика в реальному часі, керування сесіями та публікація/підписка (pub/sub) повідомлень.

Чому Redis?

Redis відмінно підходить для сценаріїв, де швидкість є пріоритетом, завдяки своїй архітектурі в пам'яті. На відміну від традиційних баз даних, які використовують дискові операції вводу/виводу, Redis зберігає дані в ОЗУ, що значно знижує час доступу. Це робить його ідеальним вибором для кешування часто використовуваних даних, зменшення навантаження на основні бази даних і підвищення загальної продуктивності додатків.

Redis як кеш

Кешування — це стратегія зберігання часто використовуваних даних ближче до додатку для зменшення затримки і обчислювальних витрат. Redis особливо добре підходить для кешування через:

  • Швидкість: Redis виконує мільйони операцій за секунду з мінімальною затримкою.
  • Структури даних: Він підтримує розширені типи даних, такі як хеші, списки, множини, відсортовані множини та інші, що робить його універсальним для складних сценаріїв кешування.
  • TTL (Time-to-Live): Redis дозволяє встановлювати час життя ключів, забезпечуючи автоматичне видалення застарілих даних.

Великомасштабне використання Redis

Redis став основною технологією для глобальних організацій. Компанії, такі як Twitter, Uber, Netflix і Pinterest, використовують Redis для підтримки важливих систем. Його використання підтримується наступними перевагами:

  • Масштабованість: Redis може обробляти мільйони одночасних підключень, що робить його ідеальним для розподілених систем.
  • Гнучкість: Завдяки таким можливостям, як сповіщення про простір ключів, публікація/підписка на повідомлення та скрипти Lua, Redis є більше, ніж просто кешем.
  • Open-Source та Cloud-Native: Redis є відкритим програмним забезпеченням і має потужні керовані сервіси, такі як AWS ElastiCache, Azure Cache for Redis і Google Cloud Memorystore.

Що таке сповіщення про простір ключів Redis?

Сповіщення про простір ключів Redis дозволяють клієнтам підключатися до каналів Pub/Sub, які транслюють події, пов'язані з ключами. Ці події можуть бути ініційовані операціями, такими як встановлення ключа, протікання ключа або видалення ключа.

Сповіщення про простір ключів за замовчуванням вимкнені і можуть бути активовані шляхом налаштування параметра notify-keyspace-events у Redis.
Сповіщення поділяються на події простору ключів (специфічні для ключів) і сповіщення про ключові події (специфічні для дій).

Які події можна слухати?

Ось деякі поширені шаблони, які ви можете використовувати для прослуховування подій Redis:

pic

Частина @* вказує на індекс бази даних, що дозволяє забезпечити гнучкість для роботи з кількома базами даних.

Налаштування Redis

pic

Фото від ThisisEngineering на Unsplash

Увімкнення сповіщень простору ключів у Redis

Щоб увімкнути сповіщення простору ключів, оновіть параметр notify-keyspace-events у файлі конфігурації Redis (redis.conf) або скористайтеся командою CONFIG SET. Ви можете налаштувати сповіщення, використовуючи рядок конфігурації notify-keyspace-events. Найпоширеніші опції включають:

  • K: Події простору ключів.
  • E: Сповіщення про ключові події.
  • A: Псевдонім для всіх подій.
  • $: Строкові команди (set, append тощо).
  • g: Загальні команди (del, expire тощо).
  • x: Події, пов'язані з терміном придатності.
  • l: Операції зі списками.
  • h: Операції з хешами.
  • z: Операції з відсортованими множинами.

Приклад 1: Ця конфігурація увімкне сповіщення для протіклих (E) і видалених (x) ключів.

CONFIG SET notify-keyspace-events Ex

Приклад 2: Щоб увімкнути всі сповіщення, використовуйте:

CONFIG SET notify-keyspace-events KEA

Розгортання Redis локально

Ви можете використовувати наступний файл docker-compose для запуску Redis локально:

version: '3.9'  
services:  
 redis:  
 image: redis/redis-stack:7.2.0-v4  
 container_name: redis  
 environment:  
 - REDIS_ARGS=--notify-keyspace-events KEA # Увімкнути Redis Keyspace для всіх сповіщень  
 ports:  
 - 6379:6379  
 - 8001:8001

Доступ через браузер

pic

Знімок екрана від автора.
http://localhost:8001_

Практична робота з Java та Spring Boot

Давайте реалізуємо додаток на Spring Boot, щоб слухати події ключів Redis і застосовувати бізнес-логіку.

Передумови:

  • Redis встановлений і працює
  • Java 21+
  • Spring Boot (Spring Data Redis)

Приклад для Gradle:

plugins {  
 id 'java'  
 id 'org.springframework.boot' version '3.4.1'  
 id 'io.spring.dependency-management' version '1.1.7'  
}  

group = 'br.com.ldf.medium'  
version = '0.0.1-SNAPSHOT'  

java {  
 toolchain {  
 languageVersion = JavaLanguageVersion.of(21)  
 }  
}  

configurations {  
 compileOnly {  
 extendsFrom annotationProcessor  
 }  
}  

repositories {  
 mavenCentral()  
}  

dependencies {  
 implementation 'org.springframework.boot:spring-boot-starter-web'  
 implementation 'org.springframework.boot:spring-boot-starter-data-jpa'  
 implementation 'org.springframework.boot:spring-boot-starter-validation'  
 implementation 'org.springframework.boot:spring-boot-starter-data-redis'  
 implementation 'org.springframework.boot:spring-boot-starter-cache'  
 implementation 'org.springdoc:springdoc-openapi-starter-webmvc-ui:2.2.0'  
 implementation 'org.flywaydb:flyway-core'  
 compileOnly 'org.projectlombok:lombok'  
 runtimeOnly 'com.h2database:h2'  
 annotationProcessor 'org.projectlombok:lombok'  
 implementation 'org.mapstruct:mapstruct:1.5.5.Final'  
 annotationProcessor 'org.mapstruct:mapstruct-processor:1.5.5.Final'  
 testImplementation 'org.springframework.boot:spring-boot-starter-test'  
 testRuntimeOnly 'org.junit.platform:junit-platform-launcher'  
}  

tasks.named('test') {  
 useJUnitPlatform()  
}

Налаштування Spring для Redis

Налаштуйте ваш application.yml

Application.yml:

spring:  
 application:  
 name: events-from-redis  
 threads:  
 virtual:  
 enabled: true  
 datasource:  
 url: jdbc:h2:mem:testdb;MODE=PostgreSQL  
 driverClassName: org.h2.Driver  
 username: sa  
 password:  
 jpa:  
 show-sql: true  
 database-platform: org.hibernate.dialect.H2Dialect  
 hibernate:  
 ddl-auto: create-drop  
 cache:  
 type: redis  
 redis:  
 time-to-live: 60000 # 1 хвилина   
 data:  
 redis:  
 host: localhost  
 port: 6379

Створіть клас конфігурації для налаштування Redis і увімкнення Pub/Sub.

RedisConfig

@EnableCaching  
@Configuration  
@EnableRedisRepositories(enableKeyspaceEvents = RedisKeyValueAdapter.EnableKeyspaceEvents.ON_STARTUP)  
public class RedisConfig {  

 @Bean  
 public RedisConnectionFactory redisConnectionFactory() {  
 return new LettuceConnectionFactory();  
 }  

 @Bean  
 public MessageListenerAdapter listenerAdapter(@Lazy MessageListener listener) {  
 return new MessageListenerAdapter(listener);  
 }  

 @Bean  
 public RedisMessageListenerContainer container(  
 RedisConnectionFactory connectionFactory,  
 MessageListenerAdapter listenerAdapter  
 ) {  
 RedisMessageListenerContainer container = new RedisMessageListenerContainer();  
 container.setConnectionFactory(connectionFactory);  
 // Додаємо слухачів для всіх тем  
 Arrays.stream(RedisPatternTopic.values())  
 .forEach(  
 topic -> container.addMessageListener(listenerAdapter, new PatternTopic(topic.getTopic()))  
 );  
 return container;  
 }  

 @Bean  
 public RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {  
 RedisTemplate template = new RedisTemplate<>();  
 template.setConnectionFactory(connectionFactory);  

 RedisSerializer stringSerializer = new StringRedisSerializer();  
 RedisSerializer

RedisConfig клас є класом конфігурації Spring, який налаштовує Redis-пов'язані біні та конфігурації для вашого додатку Spring Boot.
Ось розбір його компонентів:

**Анотації:**

- [@EnableCaching](http://twitter.com/EnableCaching): Вмикає можливість керування кешем, керовану анотаціями Spring
- [@Configuration](http://twitter.com/Configuration): Позначає, що клас може бути використаний контейнером Spring IoC як джерело визначень бінів
- [@EnableRedisRepositor](http://twitter.com/EnableRedisRepositor)ies: Вмикає репозиторії Redis і події ключового простору

**Біни**

- redisConnectionFactory(): Створює бін LettuceConnectionFactory, який надає з'єднання з сервером Redis
- listenerAdapter(MessageListener listener): Створює бін MessageListenerAdapter, який адаптує MessageListener до інфраструктури прослуховувача повідомлень Redis
- container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter): Створює бін RedisMessageListenerContainer, який керує прослуховувачами повідомлень Redis та їх підписками на теми
- redisTemplate(RedisConnectionFactory connectionFactory): Створює бін RedisTemplate, який надає високорівневі абстракції для взаємодії з Redis, включаючи налаштування серіалізації для ключів і значень.

**RedisPatternTopic**

/**
* Перелічення для представлення шаблонів тем Redis.
/
@Getter
public enum RedisPatternTopic {
EXPIREDEVENT("_keyevent@
:expired", "expired"),
SET_EVENT("
keyevent@:set", "set"),
DELETE_EVENT("
keyevent@
:del", "del"),
EVICT_EVENT("
keyevent@*__:evict", "evict");

private final String topic;
private final String operation;

RedisPatternTopic(String topic, String operation) {
this.topic = topic;
this.operation = operation;
}

public static RedisPatternTopic from(String topic) {
for (RedisPatternTopic redisPatternTopic : values()) {
if (redisPatternTopic.operation.equals(topic.split(":")[1])) {
return redisPatternTopic;
}
}
throw new IllegalArgumentException("Invalid topic: " + topic);
}
}
```

RedisEventListener

/**  
 * Прослуховувач для обробки подій "вичерпано" з Redis.  
 */  
@Slf4j  
@Component  
@RequiredArgsConstructor  
public class RedisEventListener implements MessageListener {  

 private final Set handlers;  

 @Override  
 public void onMessage(Message message, byte[] pattern) {  
 var eventTopic = RedisPatternTopic.from(new String(message.getChannel()));  
 log.info("message:{}, event-topic={}", message, eventTopic);  

 handlers.stream()  
 .filter(handler -> handler.accept(eventTopic))  
 .findFirst()  
 .ifPresent(handler -> handler.handle(new String(message.getBody())));  
 }  
}

Пояснення:

Клас RedisEventListener є компонентом, який слухає події ключового простору Redis і обробляє їх за допомогою набору стратегій. Ось розбір його компонентів:

  • handlers: Набір екземплярів HandlerRedisEventStrategy, які визначають, як обробляти різні події Redis
  • onMessage(Message message, byte[] pattern): Цей метод викликається, коли з Redis надходить повідомлення.
    Перетворює канал повідомлення на RedisPatternTopic і доставляє до відповідного обробника

HandlerRedisEventStrategy:

public interface HandlerRedisEventStrategy {  

 boolean accept(RedisPatternTopic redisPatternTopic);  

 void handle(String key);  
}

HandlerRedisEventStrategySet:

@Slf4j  
@Component  
public class HandlerRedisEventStrategySet implements HandlerRedisEventStrategy {  

 @Override  
 public boolean accept(RedisPatternTopic redisPatternTopic) {  
 return RedisPatternTopic.SET_EVENT.equals(redisPatternTopic);  
 }  

 @Override  
 public void handle(String key) {  
 log.info("Обробка встановленого ключа: {}", key);  
 }  
}

HandlerRedisEventStrategyDel:

@Slf4j  
@Component  
public class HandlerRedisEventStrategyDel implements HandlerRedisEventStrategy {  

 @Override  
 public boolean accept(RedisPatternTopic redisPatternTopic) {  
 return RedisPatternTopic.DELETE_EVENT.equals(redisPatternTopic);  
 }  

 @Override  
 public void handle(String key) {  
 log.info("Обробка видаленого ключа: {}", key);  
 }  
}

HandlerRedisEventStrategyExpired:

@Slf4j  
@Component  
public class HandlerRedisEventStrategyExpired implements HandlerRedisEventStrategy {  

 @Override  
 public boolean accept(RedisPatternTopic redisPatternTopic) {  
 return RedisPatternTopic.EXPIRED_EVENT.equals(redisPatternTopic);  
 }  

 @Override  
 public void handle(String key) {  
 log.info("Обробка вичерпаного ключа: {}", key);  
 }  
}

Тестування

Для тестування інтеграції Redis pub/sub ми можемо створити простий EmployeeController для керування операціями CRUD.
Цей контролер дозволяє виконувати основні операції, одночасно демонструючи, як публікувати події на каналах Redis.

EmployeeController:

@RestController  
@RequestMapping(EmployeeController.EMPLOYEES_API_PATH)  
@RequiredArgsConstructor  
public class EmployeeController {  

 public static final String EMPLOYEES_API_PATH = "/api/employees";  

 private final EmployeeChangeUseCase employeeChangeUseCase;  
 private final EmployeeSearchUseCase employeeSearchUseCase;  
 private final EmployeeApplicationMapper mapper;  

 @GetMapping(value = "/{id}", produces = MediaType.APPLICATION_JSON_VALUE)  
 public ResponseEntity getById(@PathVariable Long id) {  
 return ResponseEntity.ok(employeeSearchUseCase.getById(id));  
 }  

 @PostMapping(consumes = MediaType.APPLICATION_JSON_VALUE)  
 public ResponseEntity create(@RequestBody @Validated EmployeeRequest request) {  
 var employee = employeeChangeUseCase.create(mapper.mapToModel(request));  
 URI location = ServletUriComponentsBuilder.fromCurrentRequest()  
 .path("/{id}")  
 .buildAndExpand(employee.getId())  
 .toUri();  
 return ResponseEntity.created(location).build();  
 }  

 @PutMapping(value = "/{id}", consumes = MediaType.APPLICATION_JSON_VALUE)  
 public ResponseEntity update(@PathVariable Long id, @RequestBody @Validated EmployeeRequest request) {  
 employeeChangeUseCase.update(id, mapper.mapToModel(request));  
 return ResponseEntity.noContent().build();  
 }  

 @DeleteMapping(value = "/{id}")  
 public ResponseEntity delete(@PathVariable Long id) {  
 employeeChangeUseCase.delete(id);  
 return ResponseEntity.noContent().build();  
 }  
}

В середині наших useCases, ми маємо операції кешування:

EmployeeSearchUseCaseImpl:

@Service  
@RequiredArgsConstructor  
@FieldDefaults(level = lombok.AccessLevel.PRIVATE, makeFinal = true)  
public class EmployeeSearchUseCaseImpl implements EmployeeSearchUseCase {  

 EmployeeProvider employeeProvider;  

 @Cacheable(cacheNames = "employeeById", key = "#id")  
 public Employee getById(Long id) {  
 return employeeProvider.getById(id);  
 }  

}

EmployeeChangeUseCaseImpl:

@Service  
@RequiredArgsConstructor  
@FieldDefaults(level = lombok.AccessLevel.PRIVATE, makeFinal = true)  
public class EmployeeChangeUseCaseImpl implements EmployeeChangeUseCase {  

 EmployeeProvider employeeProvider;  

 public Employee create(Employee employee) {  
 return employeeProvider.save(employee);  
 }  

 @CachePut(cacheNames = "employeeById", key = "#id")  
 public Employee update(Long id, Employee employee) {  
 return employeeProvider.update(id, employee);  
 }  

 @CacheEvict(cacheNames = "employeeById", key = "#id")  
 public void delete(Long id) {  
 employeeProvider.delete(id);  
 }  
}

Запуск

Давайте симулюємо наступні операції:

  1. Створення нового співробітника
  2. Пошук за ID
  3. Видалення за ID
  4. Створення нового співробітника
  5. Пошук за ID
    6.
    Подія Redis Expired, видалення ключа в кеші

pic

Створення нового співробітника

pic

Пошук за ID

pic

Консоль додатку

pic

Видалення співробітника

pic

Консоль додатку

pic

Створення нового співробітника

pic

Пошук за ID

pic

Консоль додатку

посилання на проект: https://github.com/Medium-Artigos/events-from-redis

Висновок

Redis — це набагато більше, ніж просто сховище типу "ключ-значення"; його багатий набір функцій дозволяє розробникам створювати високо реактивні, масштабовані та ефективні додатки. Серед цих функцій Keyspace Notifications виділяються як потужний механізм для архітектур, заснованих на подіях в реальному часі. Використовуючи ці сповіщення, додатки можуть миттєво реагувати на зміни в ключах Redis — такі як закінчення терміну дії, видалення чи оновлення — і виконувати налаштовану бізнес-логіку.

У цій статті ми розглянули:

  • Основи сповіщень Keyspace Redis і їх практичні варіанти використання.
  • Як налаштувати Redis для включення цих сповіщень.
  • Приклад інтеграції сповіщень Keyspace Redis у Spring Boot додаток.

Сповіщення Keyspace особливо корисні в таких сценаріях, як інвалідизація кешу, архітектури, орієнтовані на події, та моніторинг змін у системі. Однак важливо ретельно спроектувати ваше додаток, щоб уникнути зайвого навантаження на сервер Redis, особливо при роботі з великими даними або високочастотними подіями.

Не соромтеся залишати коментарі або пропозиції, а також ділитися з іншими розробниками, яким це рішення може бути корисним!

Слідкуйте за мною для ще більш цікавого контенту

➡️ Medium

➡️ LinkedIn

➡️ Substack

☕ Купити каву ☕

Перекладено з: Listening to Events From Redis in Your Spring Boot Application

Leave a Reply

Your email address will not be published. Required fields are marked *