Проста демонстрація можливого рішення для побудови чат-сервера на основі WebSocket з Kafka.
Як розробник, зацікавлений в екосистемі Kafka, завжди цікаво експериментувати з Kafka в поєднанні з іншими технологіями. У цьому блозі я продемонструю поєднання WebSocket і Kafka для створення простого чат-сервера.
Використання Kafka в чат-сервері має сенс
- Коли трафік збільшується, повідомлення можуть споживатися з іншою швидкістю, ніж виробляються.
- Змінюйте стратегію розподілу повідомлень на стороні споживача без перезавантаження WebSocket сервера і вимкнення всіх існуючих WebSocket.
- Повідомлення можуть бути транслювані на кілька цілей, наприклад, WebSocket іншого активного користувача, базу даних для зберігання історії користувача або сховище даних.
Поєднання Kafka і WebSocket може бути складним
WebSocket можна уявити як стан, збережений в пам'яті, і це не природно сумісно з Kafka. Ось деякі проблеми, які потрібно вирішити:
- WebSocket потребує збереження в пам'яті, його неможливо серіалізувати і передавати.
- На стороні споживача, розділи теми повинні бути призначені відповідно до WebSocket, наприклад, якщо ServerA має WebSocket користувача UserA, то ServerA як споживач має заявити про розділ(и), де знаходяться всі повідомлення для UserA. Тому або потрібно створити кастомний PartitionAssignor, або споживачам потрібно використовувати
assign
замістьsubscribe
, алеassign
обов'язково вимагає більшої ручної настройки. - Враховуючи перший і другий пункти, зусилля для налаштування будуть великими, наприклад, при збільшенні трафіку потрібно масштабувати кількість розділів.
Відокремлення виробництва та споживання повідомлень може вирішити проблему
Натхненний OpenIM, потенційне рішення, яке поєднує WebSocket і Kafka, може бути додавання механізму переспрямування повідомлень між ними. OpenIM надає граф з детальною схемою з брокером повідомлень, відправником повідомлень, передачею повідомлень тощо. Щоб витягнути основні компоненти, я абстрагував основні компоненти та логіку на графіку нижче, який імітує відправку повідомлення від UserA до UserB.
Імітація потоку повідомлень від UserA до UserB
З цією спрощеною схемою головною проблемою є те, як «Знайти сервер». Насправді є кілька рішень, наприклад, реєструвати userId
і serverId
в Zookeeper або Consul, або використовувати який-небудь алгоритм узгодженого хешування для розподілу WebSocket і повідомлень вздовж userId
.
Реалізація демонстрації може допомогти краще зрозуміти це
Джерело коду можна знайти тут, цей приклад складається з:
- WebSocket сервера (ChatServer), заснованого на Ktor.
- HTTP сервера на базі Ktor (SessionRegistry), який працює як Consul/Zookeeper, що зберігає всі URL ChatServer для сесій користувачів, наприклад, у K8s, в якому поді знаходиться поточна сесія WebSocket.
- Простой споживач Kafka (MessageDispatcher), який споживає повідомлення та, маючи зареєстровану інформацію про сесії WebSocket, розподіляє повідомлення на цільовий ChatServer.
Коли UserA хоче відправити повідомлення UserB, основна логіка описана нижче:
- UserA починає з'єднання WebSocket з ChatServer, ChatServer реєструє свій URL у SessionRegistry для UserA. (Такий самий стартовий процес для всіх інших користувачів, і один користувач може мати кілька активних сесій WebSocket). Коли WebSocket закривається, ChatServer відреєстровує себе для цього користувача в SessionRegistry.
2.
У активній сесії WebSocket, кожне повідомлення (або кілька повідомлень, об'єднаних разом) від UserA, яке надходить на ChatServer, буде безпосередньо відправлено до теми повідомлень Kafka. Кожне повідомлення містить інформацію про senderId, receiverId і самі дані повідомлення. - MessageDispatcher продовжить споживати повідомлення з теми Kafka. Для кожного повідомлення (або кількох повідомлень для одного й того ж отримувача) MessageDispatcher спочатку знаходить усі активні сесії отримувача в SessionRegistry, а потім відправляє повідомлення на ChatServer.
- На кроку 3, якщо інформація в реєстрі сесій застаріла, спроба відправити повідомлення на ChatServer повинна бути неуспішною, і MessageDispatcher несе відповідальність за відреєстрування цієї конкретної сесії в SessionRegistry.
- Коли повідомлення відправлено на ChatServer, який утримує WebSocket сесію отримувача, ChatServer надішле повідомлення отримувачу через сесію. Якщо цільова WebSocket сесія не існує, ChatServer повинен надіслати помилку HTTP (наприклад, 404) під час відправки повідомлення, щоб MessageDispatcher міг обробити цей випадок, описаний на кроці 4.
- Якщо отримувач не має активної WebSocket сесії, MessageDispatcher ігнорує його під час відправки повідомлень. Це не означає, що повідомлення ігнорується. Розробник може зберігати повідомлення в темі повідомлень у будь-яке персистентне сховище і надалі обробляти офлайн повідомлення за потреби.
Цей дизайн є дуже загальним, і багато деталей для продукційної системи не розглянуті, наприклад:
- Як відновити або обробити офлайн повідомлення або повідомлення, які не вдалося доставити.
-
Надсилання повідомлень групам користувачів.
-
…
Але оскільки цей блог більше є брейнстормінгом, ви як читач можете самостійно досліджувати варіанти розширення системи 😃.
Перекладено з: Building WebSocket Based Chat Server With Kafka