Пакетування даних у NodeJS

Пакетна обробка в Node.js — це техніка ефективної обробки великих обсягів даних шляхом розбиття їх на групи або пакети. Замість обробки елементів окремо, пакетна обробка дозволяє оптимізувати продуктивність, керуючи паралельними та асинхронними задачами. Типові випадки використання включають трансформацію даних, обробку файлів та оновлення баз даних. Ця техніка може значно підвищити ефективність системи та зменшити використання пам'яті.

Загальний робочий процес

Типовий робочий процес для пакетної обробки в Node.js включає наступні етапи:

  1. Збір даних: Збір даних для обробки в пакетах, таких як масиви, файли чи записи в базі даних.
  2. Функції для обробки пакетів: Створення функцій для обробки окремих елементів та пакетної обробки.
  3. Визначення розміру пакету: Визначення оптимального розміру пакету залежно від можливостей системи та характеристик даних.
  4. Ітерація по пакетах: Ітерація по кожному пакету, вилучення необхідних даних та їх обробка за допомогою визначених функцій.
  5. Обробка помилок і логування: Реалізація надійної обробки помилок і логування для зручності виконання та налагодження.
  6. Завершення: Завершення обробки з підсумковим або повідомленням про завершення.

Методи пакетної обробки в Node.js

1. Синхронна обробка

Цей підхід обробляє пакети один за одним послідовно, забезпечуючи простоту, але може призвести до недоцільного використання ресурсів системи.

Приклад

function processSynchronously(data, batchSize, processingFunction) {  
 for (let i = 0; i < data.length; i += batchSize) {  
 const batch = data.slice(i, i + batchSize);  
 batch.forEach(item => processingFunction(item));  
 }  
}

2. Використання промісів (Promises)

Проміси дозволяють чисто обробляти асинхронні завдання, особливо для операцій, пов'язаних з мережею чи базами даних.

Приклад

async function processWithPromises(data, batchSize, processingFunction) {  
 for (let i = 0; i < data.length; i += batchSize) {  
 const batch = data.slice(i, i + batchSize);  
 await Promise.all(batch.map(async (item) => processingFunction(item)));  
 }  
}

3. Використання стрімів (Streams)

Стріми забезпечують ефективну обробку великих даних з мінімальним використанням пам'яті, що підходить для обробки великих файлів або безперервного потоку даних.

Приклад: Обробка CSV файлу

const fs = require('fs');  
const readline = require('readline');  

const fileStream = fs.createReadStream('largeDataset.csv');  

const rl = readline.createInterface({  
 input: fileStream,  
});  

rl.on('line', (line) => {  
 // Обробка кожного рядка як частини пакету;console.log(`Обробка: ${line}`);  
})  

rl.on('close', () => {  
 console.log('Пакетна обробка завершена.');  
});

4. Паралельна пакетна обробка

Паралельна обробка повністю використовує ресурси системи, виконуючи кілька пакетів одночасно. Бібліотеки, такі як worker_threads, можуть додатково оптимізувати цей підхід.

Приклад

const { Worker } = require('worker_threads');  

function runWorker(dataBatch) {  
 return new Promise((resolve, reject) => {  
 const worker = new Worker('./batchProcessor.js', { workerData: dataBatch });  
 worker.on('message', resolve);  
 worker.on('error', reject);  
 worker.on('exit', (code) => {  
 if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));  
 });  
 });  
}  

async function processInParallel(data, batchSize) {  
 const batches = [];  
 for (let i = 0; i < data.length; i += batchSize) {  
 batches.push(data.slice(i, i + batchSize));  
 }  
 await Promise.all(batches.map(batch => runWorker(batch)));  
}

Приклад — Додатковий

Розглянемо приклад обробки замовлень клієнтів у системі електронної комерції. Припустимо, що у нас є набір даних про замовлення клієнтів, і нам потрібно оновити ціни замовлень відповідно до нової стратегії ціноутворення. Ми будемо оновлювати ціни за допомогою пакетної обробки Node.js.
Тут пакетна обробка здійснюється за допомогою простого синхронного підходу, який використовує базові цикли та виклики функцій для обробки пакетів.

const ordersData = [  
 { orderId: 1, product: 'Phone', quantity: 2, price: 15 },  
 { orderId: 2, product: 'Tablet', quantity: 1, price: 25 },  
 { orderId: 3, product: 'Macbook', quantity: 3, price: 30 },  
 { orderId: 4, product: 'Laptop', quantity: 1, price: 20 },  
 { orderId: 5, product: 'Microphone', quantity: 4, price: 18 },  
 { orderId: 6, product: 'Earphones', quantity: 2, price: 35 },  
 { orderId: 7, product: 'Jacket', quantity: 1, price: 22 },  
 { orderId: 8, product: 'Shirt', quantity: 2, price: 28 },  
 { orderId: 9, product: 'Shorts', quantity: 3, price: 12 },  
 { orderId: 10, product: 'Pants', quantity: 2, price: 40 },  
 { orderId: 11, product: 'Gazebo', quantity: 1, price: 25 },  
 { orderId: 12, product: 'Dressing Table', quantity: 3, price: 20 },  
 { orderId: 13, product: 'Charger', quantity: 1, price: 32 },  
 { orderId: 14, product: 'Television', quantity: 2, price: 14 },  
 { orderId: 15, product: 'Sweater', quantity: 4, price: 26 },  
];  

const batchSize = 5;  

function updateOrderPrice(order) {  
 const newPrice = parseFloat((order.price * 1.1).toFixed(1));  
 return { ...order, price: newPrice };  
}  

function processABatch(batch, processingFunction) {  
 for (const order of batch) {  
 const updatedOrder = processingFunction(order);  
 console.log(`Order ${updatedOrder.orderId} - Updated Price: $${updatedOrder.price}`);  
 }  
}  

const numOfBatches = Math.ceil(ordersData.length / batchSize);  

for (let batchIndex = 0; batchIndex < numOfBatches; batchIndex++) {  
 const start = batchIndex * batchSize;  
 const end = Math.min(start + batchSize, ordersData.length);  
 const batch = ordersData.slice(start, end);  
 processABatch(batch, updateOrderPrice);  
 console.log(`Batch ${batchIndex + 1} processed.`);  
 console.log("");  
}  

console.log('Batch processing of order prices is complete.');

Пояснення коду

  • Визначається масив, який містить дані про замовлення клієнтів, що потребують обробки, та розмір пакету.
  • Описується функція updateOrderPrice для обробки окремого замовлення шляхом збільшення ціни замовлення на 10%, округленої до 1 десяткового знака.
  • Описується функція processABatch для обробки кожного пакету замовлень. Вона буде ітерувати по кожному замовленню в пакеті та викликати функцію обробки, якою в цьому випадку є функція updateOrderPrice.
  • Обчислюється кількість пакетів на основі даних та визначеного розміру пакету.
  • Цикл ітерує по кожному пакету, і в кожній ітерації ми витягуємо пакет даних за допомогою методу slice.
  • Викликається функція processABatch для обробки поточного пакету даних.

Масштабованість і оптимізація продуктивності

1. Динамічне визначення розміру пакету

Коригування розміру пакету в реальному часі залежно від навантаження на систему або часу обробки. Наприклад, зменшити розмір пакету, якщо використання пам'яті перевищує певний поріг.

Динамічне визначення розміру пакету регулює розмір оброблюваних даних на основі поточних умов системи. Це забезпечує ефективність системи та запобігає її перевантаженню.

Як визначити навантаження на систему

• Метрики системи: Моніторинг використання процесора та пам'яті за допомогою вбудованих модулів Node.js, таких як os, або сторонніх бібліотек.

const os = require('os');  

function getSystemMetrics() {  
 const loadAvg = os.loadavg()[0]; // Середнє навантаження за 1 хвилину  
 const freeMemory = os.freemem();  
 const totalMemory = os.totalmem();  
 const memoryUsage = ((totalMemory - freeMemory) / totalMemory) * 100;  

 return { loadAvg, memoryUsage };  
}

• Затримка в циклі подій (Event Loop Lag): Використовуйте модуль perf_hooks для вимірювання затримок у циклі подій. Затримка в циклі подій безпосередньо вимірює час, коли цикл подій Node.js затримується в обробці наступного завдання через перевантаження або блокуючі завдання.
Це час, який проходить між тим, як завдання ставиться в чергу на виконання (через щось на зразок setImmediate), і коли воно фактично виконується.

const { performance } = require('perf_hooks');  

async function measureEventLoopLag() {  
 const start = performance.now();  

 return new Promise((resolve) => {  
 setImmediate(() => {  
 const lag = performance.now() - start;  
 resolve(lag);  
 });  
 });  
}
  • Затримка в циклі подій (Event Loop Lag) — Цикл подій в Node.js управляє такими завданнями, як таймери, операції введення/виведення та виконання коду, визначеного користувачем. Затримка в циклі подій виникає, коли цикл затримується в обробці завдання через те, що він зайнятий виконанням інших операцій. Висока затримка в циклі подій може вказувати на проблеми з продуктивністю, такі як ресурсоємні завдання або неоптимізований код. Мета цієї функції — виміряти час, який цикл подій витрачає на обробку завдання (setImmediate у цьому випадку). Різниця між фактичним часом (performance.now()) і очікуваним часом представляє затримку.
  • performance — Об'єкт performance є частиною модуля perf_hooks в Node.js і надає високоточні часові позначки. performance.now() дає поточний час у мілісекундах з підмілісекундною точністю. Отже, тут ми фіксуємо час початку завдання циклу подій.
  • setImmediate — Функція setImmediate є частиною Node.js і доступна в глобальній області видимості. Вона дозволяє нам запланувати виконання функції відразу після поточної фази циклу подій. Вона подібна до setTimeout з 0 затримкою, але працює по-іншому, плануючи виклик в фазі "check" циклу подій. У цьому прикладі зворотний виклик обчислює різницю між початковим часом і часом, коли цей зворотний виклик виконується.
    Отже, значення (lag), що було отримано в результаті, представляє затримку в мілісекундах.

• Динамічне регулювання розміру пакетів — Створіть логіку для збільшення чи зменшення розміру пакету залежно від навантаження на систему та затримки в циклі подій.

let batchSize = 100; // Початковий розмір пакету  
const MIN_BATCH_SIZE = 10;  
const MAX_BATCH_SIZE = 1000;  

function adjustBatchSize(metrics, lag, currentBatchSize) {  
 const { loadAvg, memoryUsage } = metrics;  

 if (loadAvg > 2 || memoryUsage > 80 || lag > 100) {  
 // Зменшити розмір пакету при високому навантаженні або затримці  
 return Math.max(MIN_BATCH_SIZE, currentBatchSize / 2);  
 } else if (loadAvg < 1 && memoryUsage < 50 && lag < 50) {  
 // Збільшити розмір пакету при легкому навантаженні  
 return Math.min(MAX_BATCH_SIZE, currentBatchSize * 2);  
 }  

 return currentBatchSize; // Без змін  
}

• Логіка обробки пакетів — Інтегруйте моніторинг метрик та регулювання розміру пакету в цикл обробки.

async function processBatch(batch, processingFunction) {  
 for (const item of batch) {  
 await processingFunction(item); // Симуляція асинхронної обробки  
 }  
}  

async function processDataset(dataset, processingFunction) {  
 let startIndex = 0; // Ініціалізація початкового індексу  

 while (startIndex < dataset.length) {  
 // Збір метрик системи  
 const metrics = getSystemMetrics();  
 const lag = await measureEventLoopLag();  

 // Динамічне коригування розміру пакету  
 batchSize = adjustBatchSize(metrics, lag, batchSize);  

 // Витягнення та обробка поточного пакету  
 const currentBatch = dataset.slice(startIndex, startIndex + batchSize);  
 await processBatch(currentBatch, processingFunction);  

 // Оновлення початкового індексу;  
 startIndex += batchSize;  
 }  
}

• Приклад набору даних та функції обробки

const transactions = Array.from({ length: 10000 }, (_, i) => ({ id: i + 1 }));  

async function processTransaction(transaction) {  
 // Симуляція затримки обробки  
 await new Promise((resolve) => setTimeout(resolve, 10));  
 console.log(`Оброблено транзакцію ${transaction.id}`);  
}  

// Початок обробки  
processDataset(transactions, processTransaction)  
 .then(() => console.log('Усі транзакції успішно оброблено.'))  
 .catch((error) => console.error('Помилка під час обробки:', error));nsactions = Array.from({ length: 10000 }, (_, i) => ({ id: i + 1 }));

Очікувана поведінка

1. Нормальне навантаження:

  • Розмір пакету збільшується, що максимізує пропускну здатність.
  • Система залишається чутливою.

2. Високе навантаження:

  • Розмір пакету зменшується, запобігаючи перевантаженню пам'яті або процесора.
  • Застосунок залишається стабільним.

Переваги в реальному світі

  • Оптимізована продуктивність: Динамічне коригування розміру пакету дозволяє уникати перевантаження ресурсів.
  • Кращий користувацький досвід: Зменшена затримка в циклі подій забезпечує залишкову чутливість навіть під час інтенсивної обробки.
  • Стійкий дизайн: Ефективно обробляє неочікувані сплески навантаження на систему.

2. Управління зворотним тиском (Backpressure)

Для потоків реалізуйте механізми ефективного керування потоком даних і запобігання переповненню пам'яті.

Що таке зворотний тиск (Backpressure)?

Зворотний тиск (Backpressure) виникає, коли виробник даних (наприклад, зчитуваний потік) генерує дані швидше, ніж споживач (наприклад, записуваний потік) може їх обробити. Без керування зворотним тиском надлишкові дані можуть призвести до переповнення пам'яті або погіршення продуктивності.

Як працює зворотний тиск

Потоки Node.js нативно обробляють зворотний тиск за допомогою:

  1. Призупинення потоку даних, коли споживач не може обробити їх достатньо швидко.
    2.
    Відновлення потоку даних, коли споживач готовий.

Як управляти надмірним навантаженням (Backpressure)

• Використання потоків (Streams): Використовуйте методи pause() та resume().

const fs = require('fs');  

const readable = fs.createReadStream('largeFile.txt');  
const writable = fs.createWriteStream('output.txt');  

readable.on('data', (chunk) => {  
 if (!writable.write(chunk)) {  
 readable.pause();  
 }  
});  

writable.on('drain', () => {  
 readable.resume();  
});  

readable.on('end', () => {  
 writable.end();  
 console.log('Обробка завершена');  
});

• Налаштування розміру буфера (Buffer Size Tuning): Налаштуйте параметр highWaterMark для потоків, щоб точно контролювати розмір буфера та потік.

const readable = fs.createReadStream('largeFile.txt', { highWaterMark: 16 * 1024 }); // 16 KB буфер

• Бібліотеки третьих сторін (Third-Party Libraries): Використовуйте інструменти, такі як stream.pipeline, для складніших випадків. Вона автоматично керує надмірним навантаженням через кілька потоків.

const { pipeline } = require('stream');  
const fs = require('fs');  
const zlib = require('zlib');  

pipeline(  
 fs.createReadStream('largeFile.txt'),  
 zlib.createGzip(),  
 fs.createWriteStream('largeFile.txt.gz'),  
 (err) => {  
 if (err) console.error('Не вдалося виконати pipeline', err);  
 else console.log('Pipeline успішно завершено');  
 }  
);

3. Використання бібліотек

Використовуйте бібліотеки, такі як Bull або Agenda, для розподіленої та запланованої пакетної обробки.

Що таке Bull

Bull — це бібліотека Node.js для керування чергами та обробки завдань. Вона використовує Redis як бекенд і оптимізована для швидкої та розподіленої обробки завдань.

Ключові особливості:

  • Пріоритезація завдань.
  • Затримані та повторювані завдання.
  • Контроль паралельних завдань.
  • Повтор спроб та відступи для завдань.
  • Підтримка кластеризації та масштабування.

Випадок використання: Найкраще підходить для реальних додатків, які потребують розподіленого керування чергами, таких як чат-системи, сповіщення по електронній пошті або кодування відео.

Що таке Agenda

Agenda — це легка бібліотека для планування завдань в Node.js, побудована на основі MongoDB. Вона орієнтована на визначення та планування завдань, які можуть виконуватись у конкретний час або інтервали.

Ключові особливості:

  • Планування завдань за допомогою стилю cron.
  • Персистентність завдань (використовуючи колекції MongoDB).
  • Гнучкі API для керування життєвим циклом завдань.
  • Легка інтеграція з існуючими налаштуваннями MongoDB.

Випадок використання: Ідеально підходить для додатків з планованими завданнями, такими як виконання щоденних звітів, надсилання нагадувань по електронній пошті або очищення застарілих даних.

Різниця між Bull та Agenda

Коли використовувати

Використовуйте Bull:

  • Для систем з високим пропуском.
  • Коли потрібне складне керування чергами (наприклад, пріоритет, повтори).
  • Якщо ви вже використовуєте Redis або потребуєте розподіленого масштабування.

Використовуйте Agenda:

  • Коли основною вимогою є планування (наприклад, cron-завдання).
  • Якщо ви вже використовуєте MongoDB і не хочете додавати Redis до стеку.
  • Для простіших випадків використання з нижчими вимогами до продуктивності.

Уникнення непорозумінь

Щоб узагальнити ці останні теми, бо це може бути досить заплутано, давайте роз'яснимо їх простими словами 🙏✊

  1. Що таке динамічне пакетування (Dynamic Batching)?

Динамічне пакетування означає обробку завдань групами (пакетами) і налаштування розміру пакета залежно від продуктивності нашої системи. Чому? Щоб уникнути перевантаження системи, коли вона під сильним навантаженням, і максимізувати пропускну здатність, коли вона вільна.

Коли використовувати:

  • Ми керуємо завданнями вручну (не використовуємо бібліотеки на зразок Bull або Agenda).
  • Ми маємо контроль над тим, коли завдання виконуються, їхнім розміром і тим, як метрики системи спрямовують рішення (наприклад, навантаження на процесор або використання пам'яті).
  1. Що таке Bull?

Bull — це як менеджер для наших завдань. Він:

  • Ставить завдання в чергу: Ми говоримо Bull, які завдання потрібно виконати, і він ставить їх у чергу.
  • Масштабується автоматично: Він обробляє багато завдань одночасно (паралельність).
  • Ограничення швидкості: Ми можемо уповільнити процес, якщо потрібно, щоб уникнути перевантаження системи.
  • Повторює невдалі завдання: Якщо щось не вдалося, Bull спробує знову.

Приклад — У нас є 1,000 зображень для зміни розміру.
Замість того, щоб змінювати розмір усіх зображень одночасно (що може призвести до збою системи), Bull обробляє їх у менших пакетах.

  1. Що таке Agenda?

Agenda — це ще один менеджер для завдань, але він простіший за Bull. Він:

  • Планує завдання: Ми можемо запускати завдання у певний час (наприклад, «Обробляти це кожну хвилину»).
  • Зберігає завдання в MongoDB: Це чудово, якщо наш додаток уже використовує MongoDB.

Приклад — Ми хочемо надсилати електронні листи користувачам щодня о 9 ранку. Agenda ідеально підходить для цього.

  1. Чи вирішують Bull і Agenda проблеми масштабованості?

Так, але частково. Вони справляються з:

  • Чергою завдань: Розбиття завдань на менші пакети (як пакетування).
  • Паралельністю: Виконання кількох завдань одночасно без збою системи.

Однак:

  • Вони не налаштовують розмір пакетів динамічно для нас.
  • Якщо наша система перевантажена, ми все одно повинні реалізувати логіку для адаптації (наприклад, зменшити кількість завдань у кожному пакеті).
  1. Чи слід використовувати тільки одну з бібліотек?

Так, виберіть одну залежно від наших потреб:

  • Використовуйте Bull: Для додатків, що вимагають високої продуктивності (наприклад, зміна розміру зображень, обробка платежів).
  • Використовуйте Agenda: Для простого планування завдань (наприклад, щоденні нагадування по електронній пошті).
  1. Чи потрібне динамічне пакетування з Bull чи Agenda?

Залежить від ситуації:

  • Bull: Ми можемо уникнути ручного пакетування, оскільки він має функції паралельності та обмеження швидкості. Але якщо ми хочемо динамічно адаптувати розмір пакету (залежно від навантаження на процесор/пам'ять), нам все одно знадобиться власна логіка.
  • Agenda: Нам доведеться реалізувати пакетування самостійно, оскільки вона орієнтована на планування.
  1. Як прийняти рішення

Запитайте себе:

1. Чи хочу я керувати пакетами вручну?

  • Так → Використовуйте логіку динамічного пакетування (як приклади, які ми розглядали раніше).
  • Ні → Використовуйте Bull або Agenda.

2. Чи потрібен мені Redis або MongoDB?

  • Redis → Використовуйте Bull.
  • MongoDB → Використовуйте Agenda.

3. Чи є продуктивність завдань критично важливою?

  • Так → Використовуйте Bull.
  • Ні, тільки планування → Використовуйте Agenda.

Резюме TL;DR

  • Динамічне пакетування підходить для точного контролю, коли ми обробляємо завдання вручну.
  • Bull та Agenda керують чергами завдань для нас, вирішуючи багато проблем масштабованості (наприклад, паралельність, повтори).
  • Якщо ми використовуємо Bull/Agenda, можливо, нам не потрібно динамічне пакетування, якщо ми не хочемо розширеного масштабування, орієнтованого на систему.

Вибирайте Bull, якщо потрібно масштабувати важкі завдання, і Agenda для простіших розкладів.

Висновок

В реальних сценаріях ми можемо виконувати більш складні завдання за допомогою пакетної обробки, наприклад, обробка великих обсягів даних з CSV файлу, оновлення бази даних або трансформація даних. Налаштовуючи код під конкретний випадок, ми можемо найкращим чином використовувати Node.js для пакетної обробки. Це може заощадити час, знизити використання пам'яті та підвищити ефективність при роботі з великими обсягами даних.

Перекладено з: Data batching in NodeJS

Leave a Reply

Your email address will not be published. Required fields are marked *