Як створювати масштабовані реальні додатки в Node.js з кластеризацією сокетів та Redis Pub/Sub

Уявіть собі додаток для доставки їжі, де покупці розміщують замовлення, а продавці отримують сповіщення в реальному часі, навіть якщо вони залогінені на кількох пристроях. Тут Redis Pub/Sub виступає в ролі посланця, забезпечуючи безперебійну комунікацію між покупцями та продавцями. Кластери Node.js використовують кілька ядер процесора для обробки великого трафіку, тоді як Socket.IO керує підключеннями в реальному часі. Redis гарантує, що всі сервери в кластері залишаються синхронізованими, транслюючи повідомлення (наприклад, оновлення замовлень) на всі підключені пристрої.
Ця архітектура забезпечує швидкі, масштабовані та надійні сповіщення, незалежно від того, скільки користувачів онлайн.

Ми маємо лише два файли: один це (server.js), а інший це (socket.js)

код: src/server.js

const express = require('express');  
const app = express();  
const cluster = require('cluster');  
const os = require('os');  
const process = require('process');  
const http = require('http');  
const socket = require('socket.io');  

// Імпортуємо кастомний модуль для socket  
const { socketModule } = require('./socket');   

/*   
 Кількість ядер процесора для кластеризації, що допомагає додатку   
 використовувати всі доступні ядра процесора  
*/  
const numCPUs = os.availableParallelism();  

// Перевіряємо, чи є поточний процес основним (master) процесом  
if (cluster.isPrimary) {  
 console.log(`Основний процес ${process.pid} працює`);  

 // Створення робочих процесів для балансування навантаження  
 for (let i = 0; i < numCPUs; i++) {  
 cluster.fork(); // Створює новий робочий процес  
 }  

 // Прослуховування подій виходу робочих процесів для обробки помилок або непередбачених завершень  
 cluster.on('exit', (code, signal) => {  
 if (signal) {  
 console.log(`робочий процес був вбитий сигналом: ${signal}`);  
 } else if (code !== 0) {  
 console.log(`робочий процес вийшов з кодом помилки: ${code}`);  
 } else {  
 console.log('робочий процес успішно завершений!');  
 }  
 });  

 // Прослуховування подій відключення робочих процесів  
 cluster.on('disconnect', (worker) => {  
 console.log(`Робочий процес #${worker.id} відключився`);  
 });  
} else {  
 // Створюємо HTTP сервер та прикріплюємо його до додатку Express  
 const server = http.createServer(app);  

 // Маршрут для обробки створення замовлення  
 app.get('/create-order', (req, res) => {  
 // TODO: Додати логіку для створення замовлення та збереження його в базі даних  
 // Відправляємо подію продавцю через socket після створення замовлення  
 res.send(`Замовлення успішно розміщене.`);  
 })  

 // Маршрут для обробки прийняття замовлення  
 app.get('/order-accept', (req, res) => {  
 // TODO: Додати логіку для прийняття замовлення та оновлення в базі даних  
 // Відправляємо подію покупцю через socket після прийняття замовлення  
 res.send(`Замовлення прийнято.`);  
 })  

 // Ініціалізація Socket.IO з увімкненим CORS для дозволу запитів з будь-якого джерела  
 const io = socket(server, { cors: { origin: "*" } });  

 // Передаємо інстанс Socket.IO в кастомний модуль socket для обробки подій  
 socketModule(io)  

 // Стартуємо сервер і слухаємо на порту 3030  
 server.listen(3030, () => {  
 console.log(`Сервер працює на порту 3030, Process ID: ${process.pid}`);  
 })  
}

Код: src/socket.js

const redis = require('redis');  
const { createAdapter } = require('@socket.io/redis-adapter');  

// Описуємо модуль socket для обробки підключень і подій socket  
function socketModule(io) {  
 const pubClient = redis.createClient({  
 host: 'localhost', // Хост Redis (використовуйте ім'я контейнера, якщо в Docker)  
 port: 6379, // Стандартний порт Redis  
 });  
 pubClient.connect(); // Підключаємо клієнт публікації до Redis  

 // Створюємо дубльований клієнт для підписки на події  
 const subClient = pubClient.duplicate();  

 // Прослуховувачі подій для клієнта публікації  
 pubClient.on('connect', () => console.log('PubClient підключено до Redis'));  
 pubClient.on('error', (err) => console.error('Помилка PubClient Redis:', err));  

 // Прослуховувачі подій для клієнта підписки  
 subClient.on('connect', () => console.log('SubClient підключено до Redis'));  
 subClient.on('error', (err) => console.error('Помилка SubClient Redis:', err));  

 // Прикріплюємо Redis адаптер до Socket.IO для увімкнення pub-sub комунікації між інстанціями  
 io.adapter(createAdapter(pubClient, subClient));  
 // console.log('Підключення до Redis для socket адаптера...');  

 // Створюємо окремий Redis клієнт для керування socket ID користувачів  
 const redisClient = redis.createClient({  
 host: 'localhost',  
 port: 6379,  
 });  
 redisClient.connect();

redisClient.on('connect', () => console.log('Клієнт Redis підключений'));  
redisClient.on('error', (err) => console.error('Помилка клієнта Redis:', err));  

io.use((socket, next) => {  
 console.log({ socket });  

 // Отримуємо токен від клієнта (з `auth` під час handshake)  
 // const token = socket.handshake.auth?.token;   

 // Отримуємо токен від клієнта (з `authorization` під час handshake)  
 const token = socket.handshake.headers.authorization;  

 if (!token) {  
 return next(new Error('Помилка автентифікації: Токен відсутній'));  
 }  

 try {  
 // Замість цього використовуйте свій секретний ключ  
 const secretKey = 'M5^rs$_r39@!lGsjrmdXjaov7fDqT';   

 // Перевіряємо та декодуємо JWT токен  
 const decoded = jwt.verify(token, secretKey);  

 // Додаємо логіку на основі ролі  
 const { role_name } = decoded; // Припускаємо, що токен містить поле `role`  

 if (role_name === 'buyer') {  
 buyers.push({ id: socket.id, role_name });  
 } else if (role_name === 'seller') {  
 sellers.push({ id: socket.id, role_name });  
 } else {  
 return next(new Error('Помилка автентифікації: Невірна роль'));  
 }  

 // Додаємо декодовану інформацію про користувача до об'єкта socket для подальшого використання  
 socket.user = decoded;  
 next(); // Дозволяємо підключення  
 } catch (err) {  
 return next(new Error('Помилка автентифікації: Невірний токен'));  
 }  
 });  

 io.on('connection', (socket) => {  
 console.log(`Socket підключено: ${socket.id}`);  

 socket.on('register', async ({ userType, userId }) => {  
 console.log({ userType, userId });  
 const redisKey = `${userType}_${userId}`;  
 try {  
 await redisClient.lPush(redisKey, socket.id);  
 console.log(`Додано socket ${socket.id} до ${redisKey}`);  
 } catch (err) {  
 console.log('Помилка додавання в Redis:', err);  
 }  

 socket.on('disconnect', async () => {  
 try {  
 // Видаляємо socket ID з Redis, коли socket відключається  
 await redisClient.lRem(redisKey, 0, socket.id);  
 console.log(`Видалено socket ${socket.id} з ${redisKey}`);  
 } catch (err) {  
 console.log('Помилка видалення з Redis:', err);  
 }  
 })  
 });  

 // Обробляємо подію 'sendOrder', щоб сповістити продавців про нове замовлення  
 socket.on('sendOrder', async ({ buyerId, sellerId }) => {  
 console.log({ buyerId, sellerId });  

 // Створюємо Redis ключ для продавця  
 const sellerKey = `seller_${sellerId}`;  

 try {  

 // Отримуємо всі socket ID для продавця з Redis  
 const sellerSockets = await redisClient.lRange(sellerKey, 0, -1);  
 console.log({ sellerSockets });  

 // Відправляємо подію на всі пристрої продавця  
 sellerSockets.forEach((socketId) => {  
 io.to(socketId).emit('newOrder', {  
 buyerId,  
 message: 'У вас нове замовлення'  
 })  
 });  
 // socket.emit('newOrder', { buyerId, message: 'You have new order' })  
 console.log(`Socket надіслано до ${socket.id} з ${sellerKey}`);  
 } catch (err) {  
 console.log('Помилка видалення з Redis:', err);  
 }  
 })  

 })  
}  

module.exports = { socketModule }

Запустіть цю команду

node src/server.js




Перекладено з: [How to Build Scalable Real-Time Apps in Node.js with Socket Clustering and Redis Pub/Sub](https://medium.com/@saikirand/how-to-build-scalable-real-time-apps-in-node-js-with-socket-clustering-and-redis-pub-sub-774e8d4654fc)

Leave a Reply

Your email address will not be published. Required fields are marked *