Ілюстрація регулювання трафіку після втручання
Резюме
У цій статті розглядається безпечна для форків реалізація алгоритму Token Bucket на Python, з акцентом на сценарій, де "кошик токенів" спільно використовують різні працівники через центральну базу даних. Більш конкретно, для ілюстрації цього підходу буде використана база даних MongoDB, з використанням не безпечного для форків підключення PyMongo. Форкування керується за допомогою пулу працівників Celery, які можуть бути запущені на (різних) машинах.
Вступ
При роботі з виробничими системами штучного інтелекту одним із ключових викликів є управління обмеженнями швидкості, що накладаються різними AI API. Ці обмеження є необхідними для забезпечення справедливого використання та підтримки стабільності сервісів, але вони також можуть стати вузьким місцем для вашого застосунку, якщо не керувати ними належним чином.
Розглянемо такий сценарій: ваш застосунок на основі AI був спочатку створений як малий прототип і тепер успішно пройшов перевірку усіх відповідних зацікавлених осіб, готовий до масштабування від команди проєкту до більшої організації. Застосунок здобуває популярність серед колег, і більше користувачів взаємодіють з його функціями. Як тільки використання збільшується, ви починаєте стикатися з обмеженнями швидкості вашого AI API, наприклад, ендпоінту для Великої Мовної Моделі (LLM). Запити починають обмежуватися, продуктивність страждає, і ваші користувачі стикаються з затримками або помилками. Це не просто гіпотетична ситуація — це звичайна проблема, з якою стикаються розробники, що інтегрують AI API.
Або розгляньте реалізацію, де кілька агентів AI співпрацюють для досягнення спільної мети. Якщо ці агенти перевантажать AI ендпоінти, весь набір може зазнати невдачі, що призведе до відсутності значущих результатів.
Щоб ефективно керувати цими обмеженнями швидкості та забезпечити плавний досвід для користувачів, вам потрібне надійне рішення. Алгоритм Token Bucket — один з таких підходів, який забезпечує контрольований і передбачуваний спосіб керування швидкістю запитів. У цій статті ми розглянемо, як реалізувати алгоритм Token Bucket за допомогою Python Celery і MongoDB.
Ми детально розглянемо патерн Token Bucket, пояснимо, чому він підходить для обмеження швидкості AI API, і надамо покрокову інструкцію щодо його впровадження у ваш застосунок. До кінця цієї статті ви зможете добре зрозуміти, як підтримувати надійність і продуктивність вашого застосунку навіть під високим навантаженням.
Алгоритм Token Bucket
Алгоритм Token Bucket — це широко використовувана техніка обмеження швидкості, яка допомагає контролювати потік запитів до API, забезпечуючи стабільність системи і запобігаючи перевантаженню. У цьому алгоритмі токени додаються до кошика з фіксованою швидкістю, що представляє здатність здійснювати API виклики. Кожен вихідний API виклик споживає один токен з кошика. Якщо токени є, виклик API здійснюється негайно; якщо кошик порожній, виклик повинен чекати, поки нові токени не будуть додані. Цей метод дозволяє робити сплески вхідних API викликів, при цьому зберігаючи контрольовану середню вихідну швидкість, що робить його ідеальним для управління змінними навантаженнями та забезпечення надійної продуктивності в умовах високого трафіку. Для більш детального пояснення ви можете звернутися до цієї статті на Wikipedia.
Технічна налаштування
Технічна налаштування для цього проєкту побудовано навколо API на Flask, який використовується для обслуговування кінцевого продукту, наприклад, веб-застосунку. Python Flask є гарним вибором для розробки API завдяки своїй легковажності та гнучкості.
Асинхронні виклики API керуються через пул працівників Celery, налаштований з потенційно довільною кількістю працівників для забезпечення масштабованості та ефективної обробки завдань.
Завдання ставляться в чергу за допомогою RabbitMQ, надійного брокера повідомлень, який надійно зберігає та розподіляє завдання серед пулу працівників. Оброблені результати зберігаються в базі даних MongoDB, вибраній за її здатність до масштабування та ефективного управління великими наборами даних. Ця архітектура створена для забезпечення відгуку та стійкості системи, здатної легко обробляти великий трафік і складні операції.
Виклик
Flask додаток прив'язаний до MongoDB, але PyMongo не є безпечним для форків. Оскільки всі працівники працюють незалежно, потенційно на різних машинах, Python-базований патерн Singleton для спільного ресурсу мало корисний. Замість цього ми використаємо кошик на екземплярі MongoDB, який можна ділити між усіма працівниками.
Але це також створює проблему, оскільки цей підхід за своєю суттю схильний до умов гонки. Отже, написання коду на Python для перевірки, чи є токен у кошику, а потім оновлення кошика, не працюватиме тут. Нам потрібно виконувати код атомарно в Mongo, не дозволяючи жодному іншому процесу втрутитися між перевіркою та оновленням.
Рішення
В основі рішення лежить спостереження, що виклик find
і update
повинен бути виконаний атомарно — тому документ (наш Token Bucket) має бути заблокований, коли він перевіряється та оновлюється одним процесом. Це можна досягти за допомогою Aggregation Pipeline.
Модель даних у колекції Mongo досить проста, ось приклад:
{
"_id": {
"$oid": "650460f1cab8f432f119c6a4"
},
"bucket_name": "my_task",
"token": 0,
"last_time": {
"$date": "2025–01–20T09:18:09.241Z"
}
}
Одна колекція може містити багато різних кошиків для різних завдань. Тому "bucketname" використовується як ідентифікатор.
“token” — це поточна кількість дозволених запитів або доступних токенів у кошику, а “lasttime” — це мітка часу останнього додавання токенів у кошик.
Основою реалізації TokenBucket є така формулювання оновлення в пайплайні:
def get_token(self):
# два варіанти для розрізнення:
# 1) У кошику є хоча б один токен:
# отримуємо його → зменшуємо кількість токенів на один, і додаємо токени, які повинні бути додані з моменту
# останньої операції
# 2) У кошику немає токенів, тоді не віднімаємо, не реєструємо оновлення
pipeline = [{
"$set": {
"token":
{"$cond": [
{"$gte": ["$token", 1]},
{"$min": [self.capacity,
{"$add": [
"$token",
{"$floor": {
"$multiply": [{"$dateDiff": {"startDate":"$last_time","endDate":"$$NOW", "unit":"millisecond"}}, self.rate_per_millisec]}},
-1
]}]},
{"$min": [self.capacity, {"$add": [
"$token",
{"$floor": {
"$multiply": [{"$dateDiff": {"startDate":"$last_time","endDate":"$$NOW", "unit":"millisecond"}}, self.rate_per_millisec]}}
]}]}
]},
# останній час (мітка часу поповнення) оновлюється лише, якщо токени були додані:
"last_time": {"$cond": [{"$gte": [{"$add": [
"$token",
{"$floor": {
"$multiply": [{"$dateDiff": {"startDate":"$last_time","endDate":"$$NOW", "unit":"millisecond"}}, self.rate_per_millisec]}}
]}, 1]}, "$$NOW", "$last_time"]}
}
}]
# Знайти та оновити документ
document = self.bucket_collection.find_one_and_update(
{
'bucket_name': self.bucket_name
},
pipeline,
return_document=pymongo.ReturnDocument.BEFORE
)
is_allowed = document['token'] >= 1
logging.info("Token-bucket з " + str(document['token']) + " токенами повертає " +str(is_allowed) + " для " + self.bucket_name + " на " + str(datetime.datetime.utcnow()))
return is_allowed
Отже, ми перевіряємо, чи є хоча б один токен у кошику, якщо так, ми отримуємо його та оновлюємо кількість токенів, множачи час, що пройшов, на швидкість поповнення, але не більше за ємність. Ви можете встановити цю ємність на 1, щоб запобігти малим сплескам, або, якщо API, до якого здійснюється запит, встановлює ліміт, наприклад, 4 запити на секунду і перевіряє трафік лише на рівні секунди, можна встановити її на 4, дозволяючи малий сплеск з 4 запитів на секунду.
Уся реалізація є класом Python, де метод get_token() є серцем цього процесу:
import datetime
import logging
import pymongo
class TokenBucket:
"""
Клас TokenBucket є допоміжним для реалізації глобальних лімітів швидкості, що
дотримуються навіть у розподіленому середовищі
"""
def __init__(self, refill_rate_per_sec, capacity, db_con, bucket_name, init_new=False):
"""
Клас TokenBucket реалізує TokenBucket для вихідних API запитів на спільному екземплярі Mongo
:param refill_rate_per_sec: Як часто поповнюються токени? Наприклад, 1, якщо це кожну секунду, 1/30 кожні 30 секунд
:param capacity: Яка ємність у кошика.
:param db_con: З'єднання з базою даних
:param bucket_name: Назва кошика токенів, яка повинна відповідати назві в колекції
:param init_new: Булеве значення, що вказує, чи потрібно створити новий кошик
"""
self.refill_rate_per_sec = refill_rate_per_sec
self.rate_per_millisec = refill_rate_per_sec / 1000.0
self.capacity = capacity
self.db_con = db_con
self.bucket_name = bucket_name
self.bucket_collection = db_con.token_buckets
if init_new:
self.bucket_collection.update_one(
{'bucket_name' : bucket_name},
{"$set":
{'bucket_name' : bucket_name, 'token': capacity, 'last_time': "$clusterTime"}
}, upsert=True)
def get_token(self):
…
Використання
Цей Token Bucket можна інтегрувати з інфраструктурою Celery, здійснивши виклик до Python класу в межах shared_task
:
import time, os
from celery import shared_task
from pymongo import MongoClient
@shared_task(queue=os.environ.get('CELERY_QUEUE'), soft_time_limit=10)
def get_response_from_api(key=value):
# відкриваємо клієнт бази даних для працівника:
db_client = MongoClient(
MONGO_SCHEME+'://' +MONGO_USER + ':' + MONGO_PW +
'@'+MONGO_HOST+'/' + MONGO_DB + '?retryWrites=true&w=majority')
db_con = db_client.get_database()
# тепер ініціалізуємо кошик токенів:
token_bucket = TokenBucket(refill_rate_per_sec=4, capacity=1, db_con=db_con, bucket_name='my_task')
while not token_bucket.get_token():
time.sleep(0.1) # Чекаємо 100 мілісекунд перед наступною спробою
result = your_api_call(params)
Загальне завдання потім можна виконати, наприклад, у групі Celery, і ви можете перевірити файл журналу на правильність — з наведеними налаштуваннями тільки 4 виклики на секунду будуть проходити. Всі інші завдання повинні будуть чекати своєї черги.
Не забудьте відповідно налаштувати параметр softtimelimit. Якщо у вас дуже різкий трафік і багато розподілених працівників, які намагаються отримати доступ до дуже повільного API, що не дозволяє багато запитів на секунду, працівники будуть чекати в черзі деякий час і можуть активувати обмежувач softtimelimit.
Тест симуляції
Щоб підтвердити, що цей підхід працює, як очікується, ми можемо провести невеликий тест. Встановіть швидкість поповнення, наприклад, на ½, тоді вихідний запит буде дозволений кожні 2 секунди.
Якщо ми додамо оператор виведення в журналі для працівника, який намагається виконати вихідні запити і відобразимо мітку часу та інформацію про те, чи було запит заблоковано (не дозволено) або пройшов (дозволено), ви побачите наступне у вашому файлі журналу:
2025–01–21 09:40:09.929536: Не дозволено
2025–01–21 09:40:10.061105: Дозволено
2025–01–21 09:40:10.741350: Не дозволено
2025–01–21 09:40:10.861117: Не дозволено
2025–01–21 09:40:10.991974: Не дозволено
2025–01–21 09:40:11.114126: Не дозволено
2025–01–21 09:40:11.240675: Не дозволено
2025–01–21 09:40:11.362580: Не дозволено
2025–01–21 09:40:11.481074: Не дозволено
2025–01–21 09:40:11.604872: Не дозволено
2025–01–21 09:40:11.735626: Не дозволено
2025–01–21 09:40:11.869136: Не дозволено
2025–01–21 09:40:11.998463: Не дозволено
2025–01–21 09:40:12.131688: Не дозволено
2025–01–21 09:40:12.262276: Дозволено
2025–01–21 09:40:12.604045: Не дозволено
2025–01–21 09:40:12.729667: Не дозволено
2025–01–21 09:40:12.854335: Не дозволено
2025–01–21 09:40:12.999993: Не дозволено
2025–01–21 09:40:13.128204: Не дозволено
2025–01–21 09:40:13.252679: Не дозволено
2025–01–21 09:40:13.378298: Не дозволено
2025–01–21 09:40:13.508852: Не дозволено
2025–01–21 09:40:13.633653: Не дозволено
2025–01–21 09:40:13.756551: Не дозволено
2025–01–21 09:40:13.887134: Не дозволено
2025–01–21 09:40:14.017464: Не дозволено
2025–01–21 09:40:14.143564: Не дозволено
2025–01–21 09:40:14.281994: Не дозволено
2025–01–21 09:40:14.416012: Дозволено
2025–01–21 09:40:14.812309: Не дозволено
2025–01–21 09:40:14.933719: Не дозволено
2025–01–21 09:40:15.056074: Не дозволено
2025–01–21 09:40:15.179546: Не дозволено
…
З міток часу видно, що наш сплеск трафіку тепер регулюється до одного запиту кожні дві секунди.
Висновок
У цій статті ми описали реалізацію безпечної для форків версії алгоритму Token Bucket на Python, спеціально адаптованої для управління вихідними API запитами в розподіленому середовищі. Використовуючи базу даних MongoDB та пул працівників Celery, ми продемонстрували, як ефективно обробляти обмеження швидкості, накладені AI API, забезпечуючи стабільність системи та оптимальну продуктивність.
Алгоритм Token Bucket є надійним рішенням для контролю потоку вихідних API запитів. Реалізуючи цей алгоритм, ми можемо керувати змінними навантаженнями та запобігати перевантаженню системи, таким чином зберігаючи плавний досвід користувача навіть за умов високого трафіку.
Використання MongoDB для зберігання кошика токенів дозволяє створити централізований та послідовний механізм обмеження швидкості, який можна використовувати серед кількох працівників Celery, що працюють незалежно на різних машинах.
Ми вирішили проблему небезпечного для форків характеру PyMongo та потенційних умов гонки, застосувавши атомарний пайплайн оновлення. Це забезпечує послідовність і надійність стану кошика токенів навіть при одночасному доступі з кількох працівників.
Дотримуючись кроків, описаних у цій статті, ви зможете впровадити подібне рішення у своїх застосунках, забезпечивши ефективне масштабування ваших AI-систем при дотриманні обмежень швидкості зовнішніх API. Цей підхід не лише покращує надійність та продуктивність вашого застосунку, але й надає масштабоване рішення, яке може рости разом з вашою базою користувачів.
Підсумовуючи, ми бачимо комбінацію Flask, Celery, RabbitMQ і MongoDB разом з алгоритмом Token Bucket як потужну та гнучку архітектуру для роботи з AI-системами виробничого рівня.
Перекладено з: Working with AI Models in a distributed compute infrastructure