Привіт усім,
Армандо тут 👋 Технічний лідер @ Gusto, керуючи організацією AI Platform. Я розробляю наступне покоління застосунків Gen AI в Gusto, керуючи трансформацією від традиційної до AI-активованої інженерії та будуючи нашу першу когнітивну архітектуру. TLDR: Я використовую основні LLM для прискорення процесів, функцій продуктів і досвідів.
Стримінг виводів великих мовних моделей (LLM) в реальному часі є критично важливим для багатьох застосунків, особливо тих, що потребують інтерактивних або безперервних користувацьких досвідів. Flask, популярний веб-фреймворк, традиційно не підтримує асинхронні парадигми повною мірою, що може ускладнити обробку стрімінгових відповідей з асинхронними генераторами. Хоча Flask і надає асинхронну підтримку для представлень і генераторів, інтеграція асинхронного стрімінгу все ж вимагає деякої ручної настройки. У цьому дописі я покажу вам два елегантні варіанти реалізації для обробки стрімінгових відповідей від LLM, використовуючи функції-генератори, які усувають розрив між асинхронними взаємодіями з LLM і синхронними основами Flask. Реалізація генеруватиме відповіді, сумісні з протоколом Assistant UI.
Що таке Assistant UI?
Assistant UI — це фреймворк і протокол, що стандартизує комунікацію та взаємодію між користувачами та AI-системами. Він орієнтований на структуроване повідомлення, стрімінг відповідей у реальному часі та безшовну інтеграцію з сучасними платформами для чат-ботів і AI, такими як OpenAI та LangChain. Ключові функції включають підтримку відповідей у частинах, модульний дизайн та сумісність з існуючими протоколами комунікації AI.
- Assistant UI спрощує розробку застосунків для спілкування, обробляючи структуровані повідомлення та надаючи повторно використовуваний фреймворк. Стрімінг у реальному часі забезпечує швидші й більш інтерактивні користувацькі досвіди, особливо для чат-ботів або віртуальних помічників.
- Його протокол ефективно керує використанням ресурсів, дозволяючи створювати масштабовані застосунки, які здатні витримувати високі навантаження. Модульний дизайн дозволяє легко інтегрувати або змінювати постачальників AI без порушення роботи інтерфейсу користувача.
Ключові цілі
- Ефективно передавати відповіді AI частинами до клієнта.
- Підтримувати чисту і повторно використовувану абстракцію для обробки стрімінгових завдань.
- Безшовно інтегрувати з Flask, використовуючи патерн Assistant UI.
Основні компоненти реалізації
Логіка інференсу
Функція інференсу організовує взаємодію з LLM, використовуючи LangChain’s ChatPromptTemplate та ChatOpenAI:
async def inference(query, conversation):
"""
Головна функція для генерування відповідей.
"""
llm_prompt = ChatPromptTemplate.from_template(
"Ти корисний помічник. Тобі надано тему, і ти маєш згенерувати відповідь. "
"Запит користувача:\n{query}\n\n"
"Історія розмови:\n{conversation}\n\n"
)
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=1,
openai_api_key=Secrets.OPENAI_CLIENT_API_KEY.value,
)
runnable = llm_prompt | llm
async for event in runnable.astream_events(
{"query": query, "conversation": conversation[:-1]},
return_only_outputs=True,
version="v1",
):
if event["event"] == "on_chat_model_stream":
yield event["data"]["chunk"].content.replace("\n", "\\n")
if event["event"] == "on_chat_model_end":
yield ""
Ось що відбувається:
- Шаблон
llm_prompt
формує запит для моделі, використовуючи історію розмови та запит користувача. astream_events
від LangChain асинхронно передає події, згенеровані LLM.- Частини вмісту відповіді передаються по мірі отримання від моделі.
Генератор загального призначення для стрімінгу
Функція generate_stream є серцем цієї реалізації.
Ця функція діє як обгортка для обробки асинхронних частин даних від LLM і форматування їх для стрімінгових відповідей.
def generate_stream(callable_fn, chunk_fnc) -> Generator[str, None, None]:
"""
Генерує відповіді. Перемикає між асинхронними і синхронними парадигмами, частини можуть оброблятися по мірі їх надходження
@param callable_fn [Callable] Функція, що повертає асинхронний ітератор частин.
@param chunk_fnc [Callable] Функція, що форматує кожну частину.
@return [Generator] Генератор, що повертає відформатовані частини тексту відповіді.
"""
async def run_inference():
async for chunk in callable_fn():
yield chunk
async def collect_chunks():
chunks = []
async for chunk in run_inference():
# Клієнт може діяти з частиною тут для паралельної обробки!
chunks.append(chunk)
return chunks
chunks = asyncio.run(collect_chunks())
for chunk in chunks:
yield chunk_fnc(chunk)
Ось що відбувається:
- Приймає асинхронну функцію (
callable_fn
), яка передає дані, та функцію форматування (chunk_fnc
). - Використовує
asyncio.run
, щоб керувати асинхронними операціями в синхронному середовищі Flask. - Ітерує через частини, застосовуючи форматування перед поверненням.
Уявіть цю функцію як міст між асинхронними і синхронними парадигмами: використовуючи generate_stream
, бажану асинхронну природу відповідей LLM можна перетворити в синхронний генератор, сумісний з Flask. Потім ми можемо безперервно обробляти дані частинами по мірі їх надходження.
Хочете передавати вміст браузеру по мірі його отримання? Для цього потрібно повернути синхронний генераторний міст, який буде передавати частини по мірі їх надходження.
def __iter_over_async(async_generator) -> Generator[str, None, None]:
"""
Ітерує по асинхронному ітератору і повертає відформатовані частини.
Повертає:
str: Відформатована частина тексту відповіді.
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
iterator = async_generator.__aiter__()
async def get_next() -> tuple[bool, Any]:
"""
Отримує наступну частину з ітератора.
Повертає:
tuple[bool, Any]: Кортеж, що містить булевий результат, чи завершено ітерацію, і частину.
"""
try:
obj = await iterator.__anext__()
return False, obj
except StopAsyncIteration:
return True, None
except Exception as e:
print(f"Error in get_next: {e}")
# Обробка помилок від callable_fn або chunk_fnc
return True, None
try:
while True:
done, obj = loop.run_until_complete(get_next())
if done:
break
yield obj
finally:
loop.close()
def generate_stream(callable_fn, chunk_fnc) -> Generator[str, None, None]:
# Адаптуємо collect_chunks, щоб приймати аргументи і застосовувати їх
return __iter_over_async(collect_chunks(callable_fn, chunk_fnc))
API-ендпоінт Flask
Маршрут стрімінгу інтегрує всі компоненти, надаючи ендпоінт для стрімінгу відповідей AI:
def send_chunk(chunk, newline=False):
"""
Надсилає частину клієнту, використовуючи патерн Assistant UI.
"""
extra_char = "\\n" if newline else ""
chunk = chunk.replace("\n", "\\n").replace('"', '\\"')
return f'0:"{chunk}{extra_char}"\n'
@ExternalAPINameSpaces.EXTERNAL_API_EXAMPLES.value.route("/stream", methods=["POST"])
def stream():
"""
Приклад стрімінгу, використовуючи протокол Assistant UI.
"""
payload = request.get_json(force=True).get("messages", [])
query = get_last_user_message(payload)
conversation = get_all_messages(payload)
return Response(
generate_stream(lambda: inference(query, conversation), send_chunk),
mimetype="application/json",
)
Ось що відбувається:
- Видобуває запит користувача та історію розмови з тіла POST-запиту.
- Викликає
generate_stream
зinference
як викликається функцією іsend_chunk
як функцією форматування. - Повертає об'єкт
Response
Flask з синхронним генератором (тобто...
контент не надсилається браузеру до завершення інференції).
Хочете передавати контент клієнту по мірі його отримання? Для підтримки цього випадку, нам потрібно повернути асинхронний генераторний міст до Flask Response
.
Переваги цього підходу
- Повторне використання: Функція
generate_stream
абстрагує логіку стрімінгу для будь-якого постачальника LLM. - Сумісність з протоколом: Забезпечує, що частини, які передаються, відповідають патерну Assistant UI.
- Простота: Залишає ендпоінт Flask простим, делегуючи складність у повторно використовувані функції.
- Масштабованість: Підтримує одночасні з'єднання клієнтів без блокування інших запитів.
Остаточні думки
Ця реалізація демонструє, як адаптувати асинхронне стрімінгове передавання для Flask за допомогою комбінації повторно використовуваних абстракцій і конкретного форматування протоколу. Незалежно від того, чи працюєте ви з OpenAI, LangChain або іншим постачальником LLM, цей підхід гарантує безшовну інтеграцію та ефективне стрімінгове передавання відповідей.
Якщо ви досліджуєте схожі випадки використання стрімінгу або маєте відгуки щодо оптимізації цього рішення, дайте знати.
Дякую за увагу! Ви можете знайти мене на Armando. Завжди радий поспілкуватися 👋.
Перекладено з: Streaming AI Responses with Flask: A Practical Guide