“Створити сигнал, коли з кредитної картки за кордоном протягом останньої години було виконано більше ніж 2 транзакції в одній компанії, а загальна сума перевищує 5000 TL.”
“Створити сигнал, коли з кредитної картки протягом останньої години було знято більше ніж 2 аванси готівкою.”
Ви хочете перевіряти фінансові або нефінансові операції в реальному часі (або майже в реальному часі), дотримуючись певних правил і шаблонів. Як це зробити?
Це питання можна вирішити різними способами. Відповідь може змінюватися в залежності від контексту. Як і в більшості інженерних задач, немає єдино правильного рішення. Ідеальне рішення, яке не залежить від контексту, практично неможливе. Якщо хтось говорить про ідеальне і досконале рішення, варто насторожитися — це означає, що вони дивляться на проблему з дуже вузької перспективи. Як сказав Ніл Форд, замість ідеального рішення потрібно шукати "найменш погане" (найбільш прийнятне) рішення в межах нашого контексту. Чи ви ревізуєте існуючий додаток або пишете новий з нуля? Який обсяг транзакцій (100 TPS, 1K TPS, 10K TPS, більше)? Яка допустима затримка — реальний час, майже реальний час або пакетна обробка? Скільки буде сценаріїв? Хто визначатиме ці сценарії? Як гнучким буде це визначення? Де буде працювати додаток? Хто його запускатиме? Які витрати на розробку та експлуатацію? Яка складність? Які кваліфікації та досвід технічної команди? Скільки часу на розробку? І так далі.
Я хочу поділитися історією, як ми шукали відповідь на це питання в нашому контексті.
Перший метод, який спадає на думку — написати для цього додаток. Створити інтерфейс для визначення правил. Зберігати всі транзакції в базі даних. Для кожної нової транзакції виконувати перевірку на основі історії транзакцій і застосовувати правила. Транзакцію, що порушує правила, позначати як ризикову та вживати відповідні заходи. Оскільки додаток ви пишете самі, ви маєте повну гнучкість, але при використанні бази даних для запитів, коли обсяг транзакцій перевищує 10K TPS, ви зіткнетеся з фізичними обмеженнями реляційних баз даних. Як відомо, реляційні бази даних, навіть якщо ви використовуєте Exadata, все одно мають фізичні обмеження. Вони не можуть безмежно масштабуватися вгору. Розподілені бази даних NOSQL, здатні масштабуватися горизонтально, не мають такої гнучкості та можливостей для запитів, як реляційні БД. Тому додаток, що працює за допомогою запитів до реляційної бази даних, після певної точки перестає бути дійсним варіантом.
Другий метод — використання платформ для потокової обробки даних, що здатні зберігати історичні дані. Ці платформи можуть акумулювати статистичні дані про клієнтів або підприємства, наприклад, за останню годину, день, тиждень тощо, і виконувати їх обробку в режимі реального часу. Це дозволяє уникнути необхідності запитувати базу даних, знижуючи навантаження на неї. Тепер обробка даних відбувається на платформі, що працює з потоками даних. Платформи для обробки потокових даних працюють швидко завдяки тому, що вони зберігають дані в пам'яті та працюють у розподіленій архітектурі. Хоча ці платформи не такі гнучкі, як спеціально написані додатки, вони пропонують потужні рішення завдяки високорівневим структурам даних, API для обробки даних, управлінню складними розподіленими системами та масштабованості. Існує безліч рішень для цієї проблеми.
“Створити сигнал, коли з кредитної картки за кордоном протягом останньої години було виконано більше ніж 2 транзакції в одній компанії, а загальна сума перевищує 5000 TL.”
“Створити сигнал, коли з кредитної картки протягом останньої години було знято більше ніж 2 аванси готівкою.”
Ви хочете перевіряти фінансові або нефінансові операції в реальному часі (або майже в реальному часі), дотримуючись певних правил і шаблонів. Як це зробити?
Це питання можна вирішити різними способами. Відповідь може змінюватися в залежності від контексту. Як і в більшості інженерних задач, немає єдино правильного рішення. Ідеальне рішення, яке не залежить від контексту, практично неможливе. Якщо хтось говорить про ідеальне і досконале рішення, варто насторожитися — це означає, що вони дивляться на проблему з дуже вузької перспективи. Як сказав Ніл Форд, замість ідеального рішення потрібно шукати "найменш погане" (найбільш прийнятне) рішення в межах нашого контексту. Чи ви ревізуєте існуючий додаток або пишете новий з нуля? Який обсяг транзакцій (100 TPS, 1K TPS, 10K TPS, більше)? Яка допустима затримка — реальний час, майже реальний час або пакетна обробка? Скільки буде сценаріїв? Хто визначатиме ці сценарії? Як гнучким буде це визначення? Де буде працювати додаток? Хто його запускатиме? Які витрати на розробку та експлуатацію? Яка складність? Які кваліфікації та досвід технічної команди? Скільки часу на розробку? І так далі.
Я хочу поділитися історією, як ми шукали відповідь на це питання в нашому контексті.
Перший метод, який спадає на думку — написати для цього додаток. Створити інтерфейс для визначення правил. Зберігати всі транзакції в базі даних. Для кожної нової транзакції виконувати перевірку на основі історії транзакцій і застосовувати правила. Транзакцію, що порушує правила, позначати як ризикову та вживати відповідні заходи. Оскільки додаток ви пишете самі, ви маєте повну гнучкість, але при використанні бази даних для запитів, коли обсяг транзакцій перевищує 10K TPS, ви зіткнетеся з фізичними обмеженнями реляційних баз даних. Як відомо, реляційні бази даних, навіть якщо ви використовуєте Exadata, все одно мають фізичні обмеження. Вони не можуть безмежно масштабуватися вгору. Розподілені бази даних NOSQL, здатні масштабуватися горизонтально, не мають такої гнучкості та можливостей для запитів, як реляційні БД. Тому додаток, що працює за допомогою запитів до реляційної бази даних, після певної точки перестає бути дійсним варіантом.
Другий метод — використання платформ для потокової обробки даних, що здатні зберігати історичні дані. Ці платформи можуть акумулювати статистичні дані про клієнтів або підприємства, наприклад, за останню годину, день, тиждень тощо, і виконувати їх обробку в режимі реального часу. Це дозволяє уникнути необхідності запитувати базу даних, знижуючи навантаження на неї. Тепер обробка даних відбувається на платформі, що працює з потоками даних. Платформи для обробки потокових даних працюють швидко завдяки тому, що вони зберігають дані в пам'яті та працюють у розподіленій архітектурі. Хоча ці платформи не такі гнучкі, як спеціально написані додатки, вони пропонують потужні рішення завдяки високорівневим структурам даних, API для обробки даних, управлінню складними розподіленими системами та масштабованості. Існує безліч рішень для цієї проблеми.
Потокова Обробка Даних (Stream Processing)
У галузі потокової обробки даних існує багато інструментів. Ці інструменти відрізняються за можливостями, сильними та слабкими сторонами, рівнем зрілості, рівнем прийняття в галузі та поширеністю. Це абсолютно окрема область спеціалізації. Нижче наведено актуальний огляд цієї екосистеми.
Джерело: The Data Streaming Landscape 2025 — Kai Waehner
Основні Поняття Флінк
Apache Flink — це платформа для обробки даних з відкритим вихідним кодом. Вона об'єднує обробку потокових і пакетних даних в одному розподіленому середовищі. Перша версія була випущена в 2011 році. Вона написана на Java і Scala. Для обробки даних можна писати додатки на Java, Scala, Python та SQL. Спільнота організовує щорічну конференцію Flink Forward з високою участю.
Flink визначає себе як інструмент для “Stateful Computations over Data Streams” — це означає, що він здатний зберігати стан (історичну інформацію, стан) в процесі обробки потокових даних.
На одній платформі Flink може обробляти як потокові (stream), так і пакетні (batch) дані. Потокові дані — це невизначені (unbounded) дані, а пакетні — обмежені (bounded) дані.
Flink пропонує API, яке складається з 4 рівнів.
Flink APIs
Архітектура Flink виглядає наступним чином:
Flink Task Manager виконує роль робітника. Flink Job працюють на Task Manager.
Flink Job Manager керує Flink Job.
Він відповідає за такі завдання, як запуск, зупинка, відслідковування статусів, автоматичне створення Task Manager, надання REST API тощо.
Для того, щоб врахувати запізнілі транзакції та відсортувати їх, передбачено механізм Watermark.
Для внутрішньої узгодженості та автоматичного управління помилками з можливістю відновлення процесу після помилки, Flink надає механізми checkpoint та savepoint. Через Checkpoint моди доступні опції для узгодженості, як "точно один раз" та "хоча б один раз" (exactly once, at least once).
Flink дозволяє обробляти дані паралельно. Під час запуску Flink Job достатньо налаштувати параметр parallelism, щоб забезпечити горизонтальне масштабування.
Можна визначити часові вікна, наприклад, "за останню годину", за допомогою структури _window. Існують різні види часових вікон: tumbling, sliding тощо.
Flink працює як з часом обробки даних (processing time), так і з часом події (event time).
Установку на платформі Kubernetes можна здійснити за допомогою оператора, і також здійснюється автоматичне управління ресурсами.
Flink SQL
Flink SQL є найвищим рівнем API Flink. Наприклад, сценарій, згаданий на початку цієї статті, можна реалізувати дуже просто за допомогою Flink SQL. Коли ви сказали цей сценарій ChatGPT, він надасть вам наступну відповідь.
Сценарій:
Коли з кредитної картки за кордоном на тій самій компанії
протягом останньої години здійснено більше 2 операцій,
і загальна сума перевищує 5000 TL, створити сигнал.
Відповідь на Flink SQL:
CREATE TABLE alerts (
card_id STRING,
firm_id STRING,
total_amount DOUBLE,
transaction_count INT,
alert_time TIMESTAMP(3)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/your_database',
'table-name' = 'alerts',
'username' = 'your_username',
'password' = 'your_password'
);
INSERT INTO alerts
SELECT
card_id,
firm_id,
SUM(amount) AS total_amount,
COUNT(*) AS transaction_count,
CURRENT_TIMESTAMP AS alert_time
FROM transactions
WHERE amount > 0
GROUP BY
card_id,
firm_id,
TUMBLE(transaction_time, INTERVAL '1' HOUR)
HAVING COUNT(*) > 2
AND SUM(amount) > 5000;
Інтеграція з Flink REST API
Інтеграція Flink REST API з додатком дуже проста. Ви можете отримати API документацію для відповідної версії за цим посиланням.
Документ написаний з використанням OpenAPI 3, ми можемо експортувати його до Postman як колекцію API, також за допомогою openapi-generator ми можемо автоматично генерувати код для клієнтської частини.
Експортування колекції Flink API до Postman
Ми можемо вставити посилання на API документацію в Postman і створити колекцію API, як показано нижче.
Експортування документації Flink API до Postman
Після створення колекції Flink API ми можемо отримати доступ до Flink кластеру через Flink API, створювати нові роботи (jobs), ставити їх на паузу, виконувати запити статусу та інші дії.
Використання Flink API через Postman API колекцію
Створення клієнта для Flink API
Для інтеграції Flink API в наше додаток, ми можемо автоматично генерувати код для клієнтської частини (стаби, API клієнти тощо) через інструменти, засновані на специфікації OpenAPI.
Для цього ми завантажуємо та використовуємо npm пакет openapi-generator.
npm install @openapitools/openapi-generator-cli -g
Створюємо новий проект для API клієнта і в директорії проекту виконуємо наступний скрипт для створення бібліотеки інтеграції API.
openapi-generator-cli generate \
-i https://nightlies.apache.org/flink/flink-docs-release-1.16/generated/rest_v1_dispatcher.yml \
--api-package org.apache.flink.apiclient.api \
--model-package org.apache.flink.apiclient.model \
--invoker-package org.apache.flink.apiclient.invoker \
--group-id org.apache.flink.apiclient \
--artifact-id flink-api-client \
--artifact-version 1.16.0 \
-g java \
-p java8=true \
--library apache-httpclient \
--global-property skipFormModel=false \
--additional-properties useRuntimeException=true
Після виконання скрипту створюється проект, який ми можемо опублікувати в нашому репозиторії Maven і використовувати в додатку.
Доступ до Flink API з додатку
public class FlinkApiPlayground {
private static final Logger logger = LogManager.getLogger(FlinkApiPlayground.class);
private static final String FLINK_API_BASE_PATH = "http://xxxx";
private static final FlinkApi flinkApi;
private static final FlinkApiConfig flinkApiConfig;
public static class FlinkApi extends DefaultApi {
public FlinkApi(FlinkApiConfig config) {
ApiClient apiClient = Configuration.getDefaultApiClient();
apiClient.setBasePath(config.getBasePath());
setApiClient(apiClient);
}
}
public static class FlinkApiConfig {
private String basePath;
// ...
// getters & setters
}
static {
flinkApiConfig = new FlinkApiConfig();
flinkApiConfig.setBasePath(FLINK_API_BASE_PATH);
flinkApi = new FlinkApi(flinkApiConfig);
}
public static void main(String[] args) {
logger.info("Welcome to Flink Api Playground...");
try {
ClusterOverviewWithVersion overview = flinkApi.getClusterOverview();
logger.info("Flink cluster overview: {}", overview);
JarListInfo jarListInfo = flinkApi.getJarList();
logger.info("Flink jar list: {}", jarListInfo);
String jarId = jarListInfo.getFiles().get(0).getId();
logger.info("Jar id: {}", jarId);
JarRunResponseBody jobResponse = flinkApi.submitJobFromJar(jarId, false, null, null, null, null, null, null);
logger.info("Job response: {}", jobResponse);
} catch (Exception e) {
logger.error(e);
}
logger.info("Enough playing...");
:)");
}
## Тести інтеграції Flink
Для тестування інтеграції Flink є **різні стратегії**, залежно від потреб.
Для тестів інтеграції Flink пропонується **мінімальна кластерна інфраструктура**, де ми можемо використовувати плагіни для JUnit5 та створювати **вбудований** кластер для виконання інтеграційних тестів. Цю інфраструктуру також можна використовувати для тестування API інтеграцій.
Для виконання Flink Job не потрібно створювати мінімальний кластер. За допомогою TestContainers можна підняти Kafka і DB разом, а потім виконати Flink Job у тестовому методі та перевірити результати.
## Тести інтеграції API з мінімальним кластером Flink
class FlinkClientIntegrationTest {
@RegisterExtension
public final static MiniClusterExtension FLINK_CLUSTER = new MiniClusterExtension(getClusterConfig());
// приклад: https://github.com/apache/flink/blob/master/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
private static FlinkApi flinkApi;
private static MiniClusterResourceConfiguration getClusterConfig() {
return new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.build();
}
@BeforeAll
static void beforeAll(@InjectClusterRESTAddress URI restAddress) {
logger.info("Flink mini test cluster configured...");
logger.info("Flink mini cluster url: {}", restAddress);
FlinkApiConfig flinkApiConfig = new FlinkApiConfig();
flinkApiConfig.setBasePath(restAddress.toString());
flinkApi = new FlinkApi(flinkApiConfig);
}
@Test
@DisplayName("should fetch cluster overview successfully")
void shouldfetchclusteroverviewsuccessfully(@InjectClusterRESTAddress URI restAddress) {
ClusterOverviewWithVersion clusterOverview = flinkApi.getClusterOverview();
logger.info("Cluster overview: {}", clusterOverview);
Assertions.assertNotNull(clusterOverview);
}
}
```
Тести інтеграції Flink Job
У цьому розділі я надам приклад Flink Job та його тесту. Код Flink Job в основному був створений за допомогою ChatGPT, оскільки нам потрібен був тестовий код.
Використовувані структури даних:
- Transaction: Моделює транзакції.
- TransactionDeserializer: Використовується на стороні Flink для десеріалізації. Створено за допомогою ChatGPT.
- SimpleFlinkJob: Простий клас Flink Job, написаний за допомогою Flink SQL. У ньому сценарій для транзакцій, сума яких більша за 100, генерує попередження та записує їх у таблицю
alerts
в базі даних. Створено за допомогою ChatGPT. - AbstractKafkaIntegrationTest: Базовий клас, що піднімає як Kafka, так і контейнер PostgreSQL через бібліотеку TestContainers. Деталі реалізації відсутні. Для використання TestContainers дивіться цю статтю.
- FlinkJobIntegrationTest: Клас для інтеграційних тестів. Використовує інфраструктуру AbstractKafkaIntegrationTest. Включає тест інтеграції, написаний у стилі BDD.
Основне, що робить SimpleFlinkJob — запускає обробку транзакції та перевіряє, що запис у таблиці alerts про сигнал тривоги був створений.
Клас Transaction:
class Transaction {
private Integer amount;
private String customerNo;
// getters & setters
}
TransactionDeserializer:
public class TransactionDeserializer implements KafkaRecordDeserializationSchema {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void deserialize(ConsumerRecord record, Collector out) throws IOException {
String json = new String(record.value());
Transaction transaction = objectMapper.readValue(json, Transaction.class);
out.collect(transaction);
}
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(Transaction.class);
}
}
SimpleFlinkJob:
public class SimpleFlinkJob {
public static void run(String kafkaBootstrapServers, String topic, String dbUrl, String username, String pass) throws Exception {
// 1. Створюємо середовище виконання потоку Flink і середовище таблиць
Configuration conf = new Configuration();
conf.setInteger(RestOptions.PORT, 8888);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(1);
env.enableCheckpointing(Duration.ofMillis(100).toMillis());
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. Створюємо KafkaSource і використовуємо кастомний десеріалізатор
KafkaSource kafkaSource = KafkaSource.builder()
.setBootstrapServers(kafkaBootstrapServers)
.setTopics(topic)
.setGroupId("flink-transactions-consumer")
.setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(new TransactionDeserializer())
.build();
// 3. Отримуємо дані з Kafka
DataStream kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 4. Створюємо view для обробки даних через Flink SQL
tableEnv.createTemporaryView("transactions", kafkaStream);
// 5. Створюємо структуру для JDBC sink в PostgreSQL
String jdbcSink = "CREATE TABLE alerts ("
+ "id SERIAL PRIMARY KEY,"
+ "customer_no VARCHAR,"
+ "amount DOUBLE PRECISION"
+ ") WITH ("
+ "'connector' = 'jdbc',"
+ "'url' = '" + dbUrl + "',"
+ "'username' = '" + username + "',"
+ "'password' = '" + pass + "',"
+ "'driver' = 'org.postgresql.Driver',"
+ "'table-name' = 'alerts'"
+ ")";
tableEnv.executeSql(jdbcSink);
// 5. Виконуємо запис у базу даних і фільтрацію в одному SQL запиті
String flinkJobSql = "INSERT INTO alerts "
+ "SELECT ROW_NUMBER() OVER () AS id, customerNo AS customer_no, amount "
+ "FROM transactions "
+ "WHERE amount > 100";
tableEnv.executeSql(flinkJobSql);
// 8.
Запускаємо процес
env.execute("Flink Kafka to DB with Alerts");
}
}
FlinkJobIntegrationTest:
class FlinkJobIntegrationTest extends AbstractKafkaIntegrationTest {
static Producer producer;
Connection conn;
Statement stmt;
static String bootstrapServers;
static String jdbcUrl;
static String username = "xxx";
static String pass = "xxx";
static String topic = "transactions";
static {
createKafkaTopic(topic);
initKafkaProducer();
bootstrapServers = kafkaContainer.getBootstrapServers();
jdbcUrl = "jdbc:postgresql://localhost:" + postgresqlContainer.getFirstMappedPort() + "/app-schema";
}
@BeforeEach
void beforeEach() throws Exception {
Connection conn = DriverManager.getConnection(jdbcUrl, username, pass);
stmt = conn.createStatement();
}
@AfterEach
void afterEach() throws SQLException {
stmt.close();
conn.close();
}
@Test
@DisplayName("should create alerts for transactions fed to kafka ")
void should_create_alerts_for_transactions_fed_to_kafka() throws Exception {
CountDownLatch jobStartedLatch = new CountDownLatch(1);
Runnable simpleFlinkJob = new Runnable() {
@Override
public void run() {
try {
String jdbcUrl = postgresqlContainer.getFirstMappedPort() + "/app-schema";
SimpleFlinkJob.run(bootstrapServers, topic, jdbcUrl, username, pass);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
Runnable clusterReadinessCheck = new Runnable() {
@Override
public void run() {
while (true) {
try {
RestTemplate restTemplate = new RestTemplate();
String result = restTemplate.getForObject("http://localhost:8888/jobs/overview", String.class);
boolean isJobRunning = result.contains("\"state\":\"RUNNING\"");
boolean isTaskRunning = result.contains("\"running\":1");
if (isTaskRunning & isJobRunning) {
jobStartedLatch.countDown();
}
Thread.sleep(1000);
} catch (Exception ex) {
logger.error(ex);
}
}
}
};
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(simpleFlinkJob); // запускаємо Flink job
executorService.execute(clusterReadinessCheck);// перевіряємо наявність готового кластера в окремому потоці
jobStartedLatch.await(1, TimeUnit.MINUTES); // чекаємо на старт job, максимум 1 хвилина, але зазвичай це займає менше часу
Thread.sleep(2000);
feedTransactionToKafkaTopic(topic); // надсилаємо транзакцію в Kafka
Thread.sleep(2000); // взагалі не рекомендується використовувати sleep :)
// Перевіряємо, чи є один запис у таблиці alerts в базі даних
ResultSet alertCountResultSet = stmt.executeQuery("select count(*) from alerts");
alertCountResultSet.next();
Assertions.assertEquals(1, alertCountResultSet.getInt(1), "expected 1 alert");
}
void feedTransactionToKafkaTopic(String topic) {
Transaction trx = new Transaction();
trx.setCustomerNo("123");
trx.setAmount(100);
producer.send(new ProducerRecord<>(topic, trx.getCustomerNo(), trx));
}
static void initKafkaProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
}
Налаштування контейнера Flink
Контейнер Flink надається як відкрите джерело.
Для того, щоб записувати Checkpoint та Savepoint у S3, можна активувати плагін S3 та включити необхідні бібліотеки для DB-драйверів та інших залежностей у зображення контейнера.
FROM flink:1.16.1
ENV FLINK_LIB_DIR=/opt/flink/lib
ENV OPTIONAL_LIB_DIR=/opt/flink/opt
ENV S3_PLUGIN_DIR=/opt/flink/plugins/flink-s3-fs-hadoop
WORKDIR ${S3_PLUGIN_DIR}
RUN cp ${OPTIONAL_LIB_DIR}/flink-s3-fs-hadoop-1.16.1.jar ./
WORKDIR ${FLINK_LIB_DIR}
COPY shared-job-libs ./
Flink Cluster — Flink Kubernetes Operator
Flink надає оператор Kubernetes для створення кластерів Flink на платформі Kubernetes, базуючись на Helm chart. Архітектура та загальна робота оператора продемонстровані на наступних зображеннях.
Джерело: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/
Джерело: Architecture | Apache Flink Kubernetes Operator
Оператор пропонує два CRD (Custom Resource Definition) — FlinkDeployment та FlinkSessionJob. Ми використовуватимемо FlinkDeployment.
Оператор і його FlinkDeployment CRD дозволяють створювати і налаштовувати середовище кластеру (Job Manager, Task Manager). Ми налаштовуємо Job Manager для роботи в режимі HA (висока доступність) або з одною копією, а Task Manager налаштовується так, щоб автоматично створювати нові копії за потреби.
Оскільки операції відбуваються на рівні кластера Kubernetes, важливим є налаштування авторизації. Якщо ви надаєте послуги корпоративним клієнтам, то важливо, щоб продукт не мав повного доступу до всього кластеру, тому ролі та дозволи мають бути налаштовані правильно.
Нижче наведено два приклади ролей flink-operator та flink, а також дозволи, які вони використовують.
Джерело: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/docs/operations/rbac/
OpenShift Authorisation: Ролі та прив'язка ролей
У OpenShift ролі можна визначати як для локальних простірів імен (namespace), так і для кластеру в цілому. Локальні ролі можуть бути прив'язані до локальних просторів імен (RoleBinding), а ролі на рівні кластера — як до кластеру (ClusterRoleBinding), так і до локальних просторів імен.
Джерело: Using RBAC to define and apply permissions | Authentication and authorization | OpenShift Container Platform 4.11
Для автоматизації процесу налаштування та оновлення кластера за допомогою Helm chart визначаємо обліковий запис сервісу з назвою flink-deployer. Далі ми налаштовуємо відповідні дозволи, як описано в наступних кроках.
Роль crd-admin та надання прав для flink-deployer
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: crd-admin
rules:
- verbs:
- '*'
apiGroups:
- apiextensions.k8s.io
resources:
- customresourcedefinitions
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: flink-deployer-crd-admin
subjects:
- kind: ServiceAccount
name: flink-deployer
namespace: flink-cluster
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: crd-admin
Ми зв'язуємо роль crd-admin з сервіс-обліковим записом flink-deployer на рівні кластера, оскільки для додавання CRD (Custom Resource Definition) необхідно мати відповідні права на рівні кластера.
В описі ролі видно, що ми надаємо дозвіл на доступ лише до необхідних API (customresourcedefinitions).
Роль flink-operator та надання прав для flink-deployer
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: flink-operator
rules:
- verbs:
- '*'
apiGroups:
- ''
resources:
- pods
- services
- events
- configmaps
- secrets
- verbs:
- list
apiGroups:
- ''
resources:
- nodes
- verbs:
- '*'
apiGroups:
- apps
resources:
- deployments
- deployments/finalizers
- replicasets
- verbs:
- '*'
apiGroups:
- extensions
resources:
- deployments
- ingresses
- verbs:
- '*'
apiGroups:
- flink.apache.org
resources:
- flinkdeployments
- flinkdeployments/status
- flinksessionjobs
- flinksessionjobs/status
- verbs:
- '*'
apiGroups:
- networking.k8s.io
resources:
- ingresses
- verbs:
- '*'
apiGroups:
- coordination.k8s.io
resources:
- leases
Оператор має роль на рівні кластера з широкими правами доступу. Оскільки він буде створювати весь кластер, цей рол є дуже потужним. Тому ми зв'язуємо його з обліковим записом сервісу flink-deployer для локального простору імен flink-cluster.
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: flink-deployer-flink-operator
namespace: flink-cluster
subjects:
- kind: ServiceAccount
name: flink-deployer
namespace: flink-cluster
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: flink-operator
Роль webhook-admin та надання прав для flink-deployer
Хоча ми безпосередньо не використовуємо цю функцію, оператор Helm chart потребує цієї ролі для роботи. Ми також надаємо роль на рівні кластера для обмеження доступу до лише API webhook.
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: webhook-admin
rules:
- verbs:
- '*'
apiGroups:
- admissionregistration.k8s.io
resources:
- mutatingwebhookconfigurations
- validatingwebhookconfigurations
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: flink-deployer-webhook-admin
subjects:
- kind: ServiceAccount
name: flink-deployer
namespace: flink-cluster
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: webhook-admin
Надання прав адміністратора проекту для flink-deployer
Останнім кроком ми надаємо обліковому запису flink-deployer адміністративні права для проекту flink-cluster.
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: flink-deployer-flink-cluster-admin
namespace: flink-cluster
subjects:
- kind: ServiceAccount
name: flink-deployer
namespace: flink-cluster
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: admin
Налаштування Helm Chart для Flink Operator та визначення FlinkDeployment
Про Helm chart та його налаштування можна дізнатися більше в цій статті. Структура проекту для Helm Chart виглядає наступним чином:
У каталозі charts знаходиться оригінальний Helm chart для оператора. В каталозі templates знаходяться наші додаткові налаштування. У файлі values.yaml зберігаються налаштування для кастомізації. Під час налаштування можна змінювати значення values.yaml залежно від середовища.
Нижче наведено вміст файлу deployment.yaml.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: flink-deployment
labels:
{{- include "flink-operator.labels" .
Основні налаштування в файлі values.yaml виглядають наступним чином:
flink-kubernetes-operator:
defaultConfiguration:
create: true
# Встановіть append в false для заміни конфігураційних файлів
append: true
flink-conf.yaml: |+
# Налаштування Flink
s3.access-key: xxx
s3.secret-key: xxx
s3.endpoint: https://s3.xxx.amazonaws.com
s3.path.style.access: true
podSecurityContext:
# помилка helm, не можна перевизначити security context для типу map у підчарті
# чекаємо виправлення, issue: https://github.com/helm/helm/issues/9027 , https://github.com/helm/helm/pull/11440
runAsUser: 1000759999
operatorSecurityContext: {}
webhookSecurityContext: {}
watchNamespaces: ["flink-cluster"]
configMaps:
ref: flink-deployment-config
values:
dbUrl: "jdbc:oracle:thin:@xxx:xxx/xxx?rewriteBatchedStatements=true"
dbUsername: "xxx"
dbPassword: "xxx"
kafkaConsumerGroupId: "xxx"
kafkaTopic: "xxx"
kafkaBrokersIpList: "kafka.flink-cluster.svc.cluster.local:9092"
image:
flinkImage: "registry.xxx.com/our-app-images/flink:1.16.1"
# Коли потрібно завантажувати образ.
# Опція 'Always' перевіряє хеш образу та завантажує його, якщо він змінений. За замовчуванням 'IfNotPresent'.
pullPolicy: Always
flinkVersion: v1_16
ingressHost: flink.apps.flink-cluster.xxx.com
flinkConfiguration:
taskmanager.numberOfTaskSlots: '5'
jobmanager.memory.jvm-metaspace.size: "536870912"
taskmanager.memory.jvm-metaspace.size: "536870912"
kubernetes.operator.cluster.health-check.checkpoint-progress.enabled: "true"
kubernetes.operator.cluster.health-check.enabled: "true"
state.savepoints.dir: s3://flink-data/savepoints
state.checkpoints.dir: s3://flink-data/checkpoints
kubernetes.operator.periodic.savepoint.interval: 5h
kubernetes.operator.job.autoscaler.enabled: "true"
high-availability: kubernetes
high-availability.storageDir: s3://flink-data/recovery
parallelism.default: "1"
pipeline.operator-chaining: "false"
rest.flamegraph.enabled: "true"
taskManagerResource:
memory: 1024m
cpu: 1
jobManagerReplicaCount: 1
jobManagerResource:
memory: 2048m
cpu: 1
imagePullSecrets:
- name: xxx-registry
```
Кожен з цих параметрів є важливим для налаштування середовища Flink. Тут не буду вдаватися в деталі кожного налаштування, так як іменування досить зрозуміле. Для тих, хто хоче дізнатися більше, можна ознайомитися з документацією Flink і Flink Operator.
Тести продуктивності та оптимізація
Якщо ви починаєте використовувати нову технологію, і ваша архітектура зазнає значних змін, то тестування продуктивності є критично важливим. Не можна рухатись за припущеннями. Необхідно спостерігати за кожним налаштуванням і його впливом, а потім проводити оптимізацію в залежності від отриманих результатів, оскільки закон великих чисел має велике значення. Хоча, чи є цей закон насправді, не знаю 🙂
Нижче наведено приклад тестової топології.
Тестова топологія Flink кластера
Підготувавши тестове середовище, ми почали з малих навантажень і поступово збільшували їх до великих TPS.
Пропускна здатність, використання ЦП, використання пам'яті, використання пропускної здатності — ці метрики ми спостерігали через метрики OpenShift. Спочатку ми проводили тести з одним сценарієм (роботою). Потім, множачи сценарії, ми вивчали використання ресурсів.
Основні результати та наші рішення виглядали наступним чином:
- Спочатку дані були у форматі JSON, і коли ми прибрали непотрібні поля, передаючи лише заповнені, ми побачили покращення продуктивності.
- Аналізуючи використання ЦП за допомогою Flame графіків, ми побачили, що саме операції десеріалізації створюють вузькі місця.
- Коли ми стиснули Kafka повідомлення (compression), ми побачили приблизно 4-кратне покращення пропускної здатності.
- Використовуючи Protobuf для серіалізації замість JSON, ми досягли 5-кратного покращення пропускної здатності. Використання ЦП знизилось майже в 8 разів. Щоб отримати загальний огляд серіалізації, можна ознайомитися з статтею "1001 облич серіалізації".
- Оскільки ми динамічно розгортаємо Job, метаспростір JVM в якийсь момент став недостатнім, і ми збільшили значення за замовчуванням з 250 МБ до 512 МБ.
- Одного разу, попри те, що використання ЦП було низьким, ми помітили, що не можемо досягти певного значення TPS. При детальнішому вивченні з'ясувалося, що проблема була в пропускній здатності. Працюючи з одним партиціоном Kafka, ми збільшили кількість партицій до 4. Одночасно ми встановили паралелізм Flink Job на 4. Це дозволило збільшити горизонтальну паралельність до 4. Після цих налаштувань проблема з пропускною здатністю була вирішена частково. Однак, коли кількість сценаріїв досягла 800, проблема з використанням ЦП і пропускною здатністю залишалася серйозною. Кожен Flink Job працював незалежно, отримуючи дані з Kafka topic самостійно. Це означало, що одне завдання отримувалось 800 разів, що було серйозною проблемою. Щоб вирішити це, ми спробували виконати кілька сценаріїв в одному Job. Хоча управління життєвим циклом Job стало складним і вимагало спеціального налаштування (наприклад, динамічне призупинення одного сценарію в межах того ж пакета), управління ресурсами покращилось. Коли ми запустили 4 сценарії в одному Job, замість того, щоб мати 4 окремих Job (застосунки), це підвищило продуктивність майже в 4 рази. Але тут також виникло насичення, і ми намагались знайти оптимальну кількість у межах нашої конфігурації.
- Використання Watermark збільшило навантаження на ЦП на 50%. Ми також виявили, що період Watermark не мав впливу на ці показники. У разі, якщо це не було необхідно, ми вирішили не використовувати Watermark.
- Перетворення даних, як-от дати, збільшувало використання ЦП, і ми вирішили, що замість перетворення всіх даних буде правильніше виконувати локальні перетворення в сценаріях лише за необхідністю.
Завершення
Акан обробка даних (stream processing) є надзвичайно широкою темою. Це окрема галузь спеціалізації. Сучасні додатки дуже складні. Вони об'єднують в собі багато технологій і підтримують їх. Бази даних, інструменти для бізнес-аналітики та звітності, машинне навчання, технології великих даних, фреймворки та інші інструменти і платформи ми інтегруємо у наші продукти. Це означає, що нам, інженерам, доводиться досліджувати і розуміти багато технологій, навіть якщо ми не досягаємо в них глибокої спеціалізації. Сучасні інженери мають велику сферу діяльності. Це може призводити до того, що ми часто залишаємося на поверхні багатьох тем, або навіть коли ми стаємо експертами в одній темі, змінюючи напрямок, забуваємо більшість деталей. Світ такий, і єдиний вихід — грати за його правилами, навіть якщо нам доводиться оцінювати все через вузький погляд, базуючись не на величезному досвіді, а на доступних ресурсах. У світі, де все оцінюється через доступні матеріали, хоч і є певна сила, але змінити ситуацію непросто.
Отже, мабуть, найкраще намагатися насолоджуватися моментом та роботою, яку ми виконуюємо.
Сподіваюся, що ця стаття буде корисною для тих, хто цікавиться темою обробки потокових даних. Я отримав задоволення від дослідження, роботи та написання цієї статті, і сподіваюся, що ви також отримаєте задоволення від її читання. 🙂
Джерела
- Apache Flink® — Stateful Computations over Data Streams | Apache Flink
- Overview | Apache Flink
- Stateful Stream Processing | Apache Flink
- Timely Stream Processing | Apache Flink
- Flink Architecture | Apache Flink
- The Data Streaming Landscape 2025 — Kai Waehner
- Quick Start | Apache Flink Kubernetes Operator
- Helm Chart, Helmfile ve Ötesi. Çok sıkmamayı planlıyor bu yazı… | by Süleyman Fazıl Yeşil | Medium
- 1001 Yüzlü Veri. S:“Annenizin kızlık soyadının 2., 3. ve… | by Süleyman Fazıl Yeşil | Medium
- nightlies.apache.org/flink/flink-docs-release-1.16/generated/restv1dispatcher.yml
- GitHub — OpenAPITools/openapi-generator: OpenAPI Generator allows generation of API client libraries (SDK generation), server stubs, documentation and configuration automatically given an OpenAPI Spec (v2, v3)
- Glossary | Apache Flink
- Testcontainers
- Modern Entegrasyon Testleri: Testcontainers ve Docker | by Süleyman Fazıl Yeşil | Medium
Перекладено з: Bir Flink Hikayesi…