Як не перевантажити API?
Створено за допомогою Lexica
JavaScript надає зручні методи для роботи з масивами, такі як map, forEach та reduce, але вони не мають прямого способу контролювати, скільки асинхронних завдань виконується одночасно. Якщо спробувати використовувати ці методи для асинхронних завдань, ви або запустите їх усі одночасно, або виконуватимете їх по черзі, і немає вбудованої можливості розподіляти їх на менші групи. Зазвичай потрібно обробляти список елементів, кожен з яких може вимагати виклику API або іншої асинхронної операції. Якщо ваш список невеликий, можна запускати всі виклики одночасно за допомогою map і чекати їх за допомогою Promise.all. Однак, якщо у вас є понад 1000 елементів, ви не хочете перевантажувати API або випадково викликати DDoS-атаку. Ось тут обмеження кількості одночасних операцій стає корисним.
У цьому пості я покажу один із простих способів обмеження конкуренції. Звісно, є й інші методи, але цей є простим і зрозумілим.
Повний код, якщо ви хочете пропустити решту посту
type Item = {
id: number
}
async function callApi(item: Item): Promise {
return new Promise(resolve => {
setTimeout(() => {
console.log(`Завершено виклик API для елемента: ${item.id}`)
resolve(`Відповідь для елемента ${item.id}`)
}, 1000)
})
}
async function processItemsWithConcurrency(items: Item[], concurrency: number): Promise {
const results: string[] = new Array(items.length)
let currentIndex = 0
async function worker() {
while (true) {
const index = currentIndex
currentIndex++
if (index >= items.length) break
results[index] = await callApi(items[index])
}
}
const workers = []
for (let i = 0; i < concurrency; i++) {
workers.push(worker())
}
await Promise.all(workers)
return results
}
(async () => {
const itemsToProcess: Item[] = Array.from({ length: 25 }, (_, i) => ({
id: i + 1
}))
const concurrency = 10
console.log(`Починаємо обробку ${itemsToProcess.length} елементів з конкуренцією = ${concurrency}`)
const responses = await processItemsWithConcurrency(itemsToProcess, concurrency)
console.log('Усі елементи оброблено:', responses)
})()
Як це працює "під капотом"?
Тип Item
Цей тип визначає структуру даних, з якими ми працюємо. У цьому прикладі, Item
— це просто об'єкт, що має властивість id
типу number. Ви можете додавати інші властивості, якщо ваш випадок використання цього вимагає. Розглядайте це як тип масиву, по якому ви перебираєте елементи.
callApi(item: Item)
Ця функція імітує асинхронний виклик API. Вона приймає один об'єкт Item
, а потім повертає Promise
. Всередині вона використовує setTimeout
, щоб імітувати затримку (у цьому випадку 1000 мілісекунд). Після затримки вона виводить повідомлення, що вказує на ID елемента, та вирішує проміс, що містить рядок з ID елемента. У реальному світі ви замінили б це на фактичний виклик API.
processItemsWithConcurrency(items: Item[], concurrency: number)
Ця функція приймає масив елементів і число concurrency
. Вона повертає список відповідей, кожна з яких відповідає одному з елементів:
results
: Масив (такої ж довжини, як іitems
), в якому зберігаються відповіді, отримані відcallApi
.currentIndex
: Лічильник, що відслідковує, який елемент потрібно обробити наступним.worker
: Внутрішня асинхронна функція. Кожен робітник виконується до тих пір, поки не будуть оброблені всі елементи. Він зчитує поточний індекс, збільшує його, перевіряє, чи не перевищує він довжину масиву, і якщо не перевищує, викликаєcallApi
для цього елемента та зберігає результат у масивіresults
.- Створення робітників: Створюється стільки робітників, скільки вказано у значенні
concurrency
, і вони зберігаються в масиві. - Очікування всіх робітників: Використовуємо
Promise.all
, щоб чекати завершення всіх робітників.
Якщо всі робітники завершать свою роботу, ми повертаємо масивresults
.
Як ми можемо бути впевнені, що робітники не заберуть один і той самий елемент більше ніж один раз?
Це дуже цікава частина Node.js, яка стає в нагоді, коли ви працюєте з подібними ситуаціями. Коли ви вперше читаєте код, можливо, виникає питання, як ми можемо бути впевнені, що два робітники не виберуть один і той самий елемент одночасно. Коротка відповідь: Node.js працює в однонитковому циклі подій (event loop), що означає, що кожна функція робітника завжди звертається до змінної currentIndex
і оновлює її по черзі. Зокрема:
- Робітник зчитує поточне значення
currentIndex
- Відразу збільшує
currentIndex
- Якщо новий
index
виходить за межі масивуitems
, робітник зупиняється
Через таку послідовність жоден з робітників не зможе одночасно прочитати одне й те саме значення currentIndex
, перш ніж воно буде збільшене. Коли робітник забирає значення currentIndex
, це число миттєво збільшується, тому наступний робітник завжди бачить нове значення. Саме тому немає жодного шансу для перекриття, коли один і той самий елемент обробляється двома різними робітниками.
Перекладено з: Limiting Concurrency in Node.js