Проєктування API для стрімінгу за допомогою Python та Javascript

“Дивно, як щось таке просте, як ‘плавно завантажити дані’, може перетворитися на боротьбу на рівні протоколу.”

Цю цитату я почув від розробника, з яким працював нещодавно. Ми обговорювали пагінацію даних із бази для завантаження в таблицю, що є досить поширеною темою в моїй роботі — як передавати дані плавно через мережу, зменшувати транзакційні витрати, зберігати пропускну здатність та робити інтерфейс чуйним.

Не важливо, чи я допомагаю з реальним часом для аналітики, панелями керування для IoT або класичними CRUD-додатками — всі хочуть, щоб результати були плавними, і всі намагаються цього досягти.

Незалежно від сфери, зазвичай ми стикаємося з необхідністю отримувати дані пакетами — часто пагінованими з бази — і передавати їх на UI або до рівня додатка. І часто ці пакети перетворюються на величезні обсяги, які повільно завантажуються, змушують браузери зависати або блокують потоки без необхідності.

Це змусило мене подумати про різні способи, якими я допомагав впроваджувати стрімінг-розв'язки — не через складні системи на кшталт Kafka, RabbitMQ чи Kinesis, а через простий, безпосередній стрімінг: передача результатів з API споживачеві, щоб це виглядало чуйно.

У цій статті ми розглянемо чотири практичні підходи до стрімінгу даних з бекенду на фронтенд, які дозволяють зробити пагінацію більш плавною та чуйною:

  1. Server-Sent Events (SSE): Простий та надійний спосіб передачі даних від бекенду до фронтенду.
  2. WebSockets: Повнодуплексний канал для реального часу, що дозволяє двостороннє спілкування.
  3. Chunked HTTP Streaming: Гнучкий шаблон для обробки текстових стрімів за допомогою fetch() та ReadableStream.
  4. HTTP/2 Streaming (з і без gRPC): Сучасний та високопродуктивний підхід з мультиплексованими стрімами і сильно типізованими інтерфейсами gRPC.

Кожен з цих підходів показує, як можна передавати дані споживачу по мірі їх зчитування з бази даних, а не тільки після того, як вони будуть отримані повністю. Хоча кожен метод має свої плюси і мінуси, всі вони роблять ваш додаток швидшим, легшим і зручнішим для користувача.

0. Передумови

Перед тим як ми зможемо стрімити результати, нам потрібно щось для запитів. У цьому прикладі ми будемо використовувати PostgreSQL як базу даних і Python для генерації та вставки тестових даних.

Ми створимо просту таблицю з такими полями:

  • pk — Первинний ключ (автоінкремент)
  • sk — Ключ сортування (випадкове або патерноване ціле число)
  • fname — Ім'я
  • lname — Прізвище
  • dob — Дата народження
  • zipcode — 5-значний ZIP код

CREATE TABLE people (
pk SERIAL PRIMARY KEY,
sk INTEGER NOT NULL,
fname TEXT NOT NULL,
lname TEXT NOT NULL,
dob DATE NOT NULL,
zipcode TEXT NOT NULL
);

Тепер заповнимо таблицю 1 мільйоном рядків тестових даних за допомогою Python і бібліотеки faker.

import psycopg2
from faker import Faker
from random import randint
from datetime import datetime
import time

fake = Faker()

conn = psycopg2.connect("dbname=yourdb user=youruser password=your_password host=localhost")
cursor = conn.cursor()

start = time.time()
batchsize = 10000
total = 1
000_000

print("Inserting data...")

for i in range(0, total, batchsize):
batch = [
(
randint(1, 500), # sk
fake.first
name(), # fname
fake.lastname(), # lname
fake.date
ofbirth(minimumage=18, maximumage=90),
fake.zipcode()
)
for _ in range(batch
size)
]

argsstr = ",".join(cursor.mogrify("(%s, %s, %s, %s, %s)", row).decode("utf-8") for row in batch)
cursor.execute("INSERT INTO people (sk, fname, lname, dob, zipcode) VALUES " + args
str)
conn.commit()
print(f"Inserted {i + batch_size} rows...")

end = time.time()
print(f"Finished in {end - start:.2f} seconds.")

cursor.close()
conn.close()

Зачекайте кілька хвилин, і ми отримаємо базу даних з мільйоном записів, готових для нашої розробки.

1. Стрімінг з Server-Sent Events (SSE)

Server-Sent Events (SSE) — це простий спосіб передачі даних від сервера до браузера через HTTP. Браузер відкриває односпрямоване з'єднання через EventSource API, і сервер надсилає дані в форматі text/event-stream.

На відміну від WebSockets, SSE є односпрямованим (сервер до клієнта), але він вбудований у сучасні браузери, не потребує додаткових бібліотек і чудово підходить для легких випадків реального часу. Це може бути ідеальним рішенням для стрімінгу даних в UI.

Для нашого випадку, коли ми пагінуємо дані з бази та передаємо їх як тільки вони зчитуються, SSE — ідеальний вибір.

  • Підтримує довготривале, поетапне передавання даних.
  • Працює через стандартний HTTP.
  • Чудово підходить для поступової передачі даних в UI.

Ми можемо поєднати SSE з логікою пагінації на сервері, де сервер пагінує через базу даних і передає кожен рядок клієнту по мірі його отримання.

from flask import Flask, Response, streamwithcontext
import psycopg2
import json

app = Flask(name)

def getdbconnection():
return psycopg2.connect("dbname=yourdb user=youruser password=your_password host=localhost")

@app.route('/stream')
def streampeople():
def generate():
conn = get
dbconnection()
cursor = conn.cursor()
batch
size = 100
offset = 0
total_sent = 0

while True:
cursor.execute("""
SELECT pk, sk, fname, lname, dob, zipcode
FROM people
ORDER BY pk
LIMIT %s OFFSET %s
""", (batch_size, offset))

rows = cursor.fetchall()
if not rows:
break

for row in rows:
person = {
"pk": row[0],
"sk": row[1],
"fname": row[2],
"lname": row[3],
"dob": str(row[4]),
"zipcode": row[5],
}
yield f"data: {json.dumps(person)}\n\n"
total_sent += 1

offset += batch_size

cursor.close()
conn.close()

return Response(streamwithcontext(generate()), mimetype='text/event-stream')

Цей підхід забезпечує плавний процес отримання і рендерингу даних для користувачів.

2. Стрімінг за допомогою WebSockets

WebSockets забезпечують повнодуплексний канал зв'язку з низькою затримкою через одне TCP з'єднання. Вони підтримують двосторонній зв'язок, що дозволяє клієнтам ініціювати запити або отримувати оновлення в реальному часі.

WebSockets ідеально підходять для складніших застосунків, де потрібно двостороннє спілкування, а також для інтерактивних UI.

from flask import Flask
from flask_socketio import SocketIO, emit
import psycopg2
import json

app = Flask(name)
socketio = SocketIO(app, corsallowedorigins="*")

def getdbconnection():
return psycopg2.connect("dbname=yourdb user=youruser password=your_password host=localhost")

@socketio.on('startstream')
def handle
startstream():
conn = get
dbconnection()
cursor = conn.cursor()
batch
size = 100
offset = 0
total_sent = 0

while True:
cursor.execute("""
SELECT pk, sk, fname, lname, dob, zipcode
FROM people
ORDER BY pk
LIMIT %s OFFSET %s
""", (batch_size, offset))

rows = cursor.fetchall()
if not rows:
break

for row in rows:
person = {
"pk": row[0],
"sk": row[1],
"fname": row[2],
"lname": row[3],
"dob": str(row[4]),
"zipcode": row[5],
}
emit('row', person)
total_sent += 1

offset += batch_size

cursor.close()
conn.close()

WebSockets дозволяють реалізувати більш інтерактивні додатки з двостороннім зв'язком і гнучким управлінням потоком.

3. Стрімінг за допомогою чанкових HTTP-відповідей

Чанковий HTTP стрімінг використовує стандартний HTTP, щоб передавати дані поступово. Це підходить, коли вам потрібен контроль над передачею даних,

“Дивно, як щось таке просте, як ‘плавно завантажити дані’, може перетворитися на боротьбу на рівні протоколу.”

Я почув цю цитату від розробника, з яким нещодавно працював. Ми обговорювали пагінацію даних з бази даних для завантаження в таблицю. Це розмова, яка виникає щонайменше раз на місяць у моїй ролі — як плавно передавати дані через мережу, як зменшити транзакційні витрати, як зберегти пропускну здатність і як зробити інтерфейс більш чутливим.

Не має значення, чи я допомагаю з випадками реального часу для аналітики, панелями керування для IoT, або просто класичними CRUD-додатками — всі хочуть плавні результати, і всі намагаються їх досягти.

Незалежно від галузі, майже завжди ми опиняємося в ситуації, коли дані потрібно отримувати пакетами — зазвичай пагінованими з бази даних — і передавати їх в UI або на рівень додатка. І занадто часто ці пакети перетворюються на величезні частини, які повільно завантажуються, змушують браузери висіти або блокують потоки без потреби.

Це змусило мене задуматися про всі різні способи, якими я допомагав людям впроваджувати рішення для стрімінгу — не Kafka, RabbitMQ, або Kinesis — а стрімінг в простому, більш безпосередньому сенсі: стрімінг результатів з API до споживача так, щоб це виглядало чуйно.

У цій статті ми розглянемо, як зробити пагінацію більш плавною і чуйною, досліджуючи чотири практичні підходи до стрімінгу з бекенду на фронтенд:

  1. Server-Sent Events (SSE): Простий, нативний для браузера спосіб передачі даних від бекенду до фронтенду.
  2. WebSockets: Канал повного двостороннього зв'язку в реальному часі для надсилання та отримання повідомлень.
  3. Chunked HTTP Streaming: Гнучкий шаблон, що використовує fetch() та ReadableStream для обробки текстових стрімів.
  4. HTTP/2 Streaming (з і без gRPC): Сучасний, високопродуктивний підхід, що використовує мультиплексовані стріми та сильно типізовані інтерфейси gRPC.

Кожен з цих підходів покаже, як можна стрімити результати з бекенду, який пагінує через базу даних — передаючи дані споживачу під час їх читання, а не тільки коли вони будуть повністю отримані. Хоча кожен метод має свої переваги та недоліки, всі вони роблять ваше додаток швидшим, легшим і набагато зручнішим для користувача.

0. Передумови

Перш ніж ми зможемо стрімити будь-які результати, нам потрібно щось для запитів.
Для цього прикладу ми використовуватимемо PostgreSQL як нашу базу даних та Python для генерації та вставки тестових даних.

Ми створимо просту таблицю з таким описом:

  • pk — Первинний ключ (автоінкремент)
  • sk — Ключ сортування (випадкове або патерноване ціле число)
  • fname — Ім'я
  • lname — Прізвище
  • dob — Дата народження
  • zipcode — 5-значний ZIP код
CREATE TABLE people (  
 pk SERIAL PRIMARY KEY,  
 sk INTEGER NOT NULL,  
 fname TEXT NOT NULL,  
 lname TEXT NOT NULL,  
 dob DATE NOT NULL,  
 zipcode TEXT NOT NULL  
);

Тепер давайте заповнимо її 1 мільйоном рядків реалістичних даних за допомогою Python та бібліотеки faker.

import psycopg2  
from faker import Faker  
from random import randint  
from datetime import datetime  
import time  

fake = Faker()  

conn = psycopg2.connect("dbname=your_db user=your_user password=your_password host=localhost")  
cursor = conn.cursor()  

start = time.time()  
batch_size = 10000  
total = 1_000_000  

print("Inserting data...")  

for i in range(0, total, batch_size):  
 batch = [  
 (  
 randint(1, 500), # sk  
 fake.first_name(), # fname  
 fake.last_name(), # lname  
 fake.date_of_birth(minimum_age=18, maximum_age=90),  
 fake.zipcode()  
 )  
 for _ in range(batch_size)  
 ]  

 args_str = ",".join(cursor.mogrify("(%s, %s, %s, %s, %s)", row).decode("utf-8") for row in batch)  
 cursor.execute("INSERT INTO people (sk, fname, lname, dob, zipcode) VALUES " + args_str)  
 conn.commit()  
 print(f"Inserted {i + batch_size} rows...")  

end = time.time()  
print(f"Finished in {end - start:.2f} seconds.")  

cursor.close()  
conn.close()

Дайте цьому кілька хвилин, і ми матимемо базу даних з 1 мільйоном записів, готових до використання в нашій розробці.

1. Стрімінг з Server-Sent Events (SSE)

Server-Sent Events (SSE) — це простий спосіб для сервера відправляти оновлення в браузер через HTTP. Браузер відкриває однонаправлене з'єднання за допомогою API EventSource, і сервер відправляє дані через мережу у спеціальному форматі: text/event-stream.

На відміну від WebSockets, SSE є строго односпрямованим (сервер до клієнта), але він вбудований в сучасні браузери, не потребує додаткових бібліотек і чудово підходить для легких випадків реального часу. Це може бути корисно для таких застосувань, як стрімінг рядків даних в інтерфейс користувача, коли вони читаються з бази даних.

Для нашого випадку — пагінація даних з таблиці Postgres та стрімінг результатів під час їх отримання — SSE є природним вибором:

  • Він підтримує довготривале, поетапне доставлення даних.
  • Працює через стандартний HTTP (без необхідності оновлення або рукопожаття).
  • Чудово підходить для поступового передавання даних в UI без очікування на повний набір результатів.

Ми можемо поєднати SSE з бекенд-логікою пагінації: сервер пагінує через базу даних пакетами (наприклад, 100 рядків за раз) і стрімить кожен рядок клієнту, коли він читається.
Клієнт отримує рядки в реальному часі та одразу їх рендерить — без очікування на завершення повного запиту.

Для API ми використаємо Flask, оскільки він надає дуже простий і зручний фреймворк для розробки API.

from flask import Flask, Response, stream_with_context  
import psycopg2  
import json  

app = Flask(__name__)  

def get_db_connection():  
 return psycopg2.connect("dbname=your_db user=your_user password=your_password host=localhost")  

@app.route('/stream')  
def stream_people():  
 def generate():  
 conn = get_db_connection()  
 cursor = conn.cursor()  
 batch_size = 100  
 offset = 0  
 total_sent = 0  

 while True:  
 cursor.execute("""  
 SELECT pk, sk, fname, lname, dob, zipcode  
 FROM people  
 ORDER BY pk  
 LIMIT %s OFFSET %s  
 """, (batch_size, offset))  

 rows = cursor.fetchall()  
 if not rows:  
 break  

 for row in rows:  
 person = {  
 "pk": row[0],  
 "sk": row[1],  
 "fname": row[2],  
 "lname": row[3],  
 "dob": str(row[4]),  
 "zipcode": row[5],  
 }  
 yield f"data: {json.dumps(person)}\n\n"  
 total_sent += 1  

 offset += batch_size  

 cursor.close()  
 conn.close()  

 return Response(stream_with_context(generate()), mimetype='text/event-stream')

Ключовий формат тут — data: …\n\n, що відповідає специфікації SSE.

Зверніть увагу на структуру генератора з функцією yield. Ця функція в поєднанні з функцією streamwithcontext, імпортованою з Flask, дозволяє передавати дані, як тільки вони запитуються.

Щоб представити дані на фронтенді, ми можемо використати JavaScript для того, щоб це було корисно для користувачів.




Ключовим моментом у цьому JavaScript є EventSource (прослуховувач подій), який вказує браузеру на необхідність стрімити ці дані. SSE ідеально підходить для:

  • Стрімінгу односпрямованих оновлень даних
  • Спрощених рішень (без WebSocket сервера, без додаткових налаштувань протоколу)
  • Легких панелей управління, переглядачів журналів, рендерингу рядок по рядку

Але він має обмеження:

  • Підтримує лише комунікацію сервер → клієнт
  • Не підходить для бінарних даних (тільки текстовий протокол)
  • Без вбудованого механізму зворотного тиску або управління потоком
  • Браузери підтримують його тільки через HTTP/1.1 (не через HTTP/2)

Проте для нашого випадку — стрімінгу пагінованих результатів запитів для того, щоб зробити UI чуйним — це працює чудово. З’єднання залишається відкритим, рядки приходять, як тільки вони зчитуються, і користувач починає бачити дані негайно.

2. Стрімінг за допомогою WebSockets

WebSockets забезпечують повнодуплексний канал комунікації з низькою затримкою між браузером і вашим бекендом через одне постійне TCP з’єднання. На відміну від SSE, який є односпрямованим, WebSockets дозволяють двосторонню комунікацію. Це означає, що клієнти можуть ініціювати, паузувати, запускати, зупиняти і виконувати багато інших операцій через це з’єднання.

Для нашого випадку ми все одно будемо пагінувати через базу даних пакетами на бекенді і стрімити кожен рядок одразу, коли він зчитується.
Але цього разу з'єднання управляється за допомогою WebSocket, а не стандартного HTTP.

WebSockets ідеально підходять, коли вам потрібно:

  • Двостороння комунікація (наприклад, клієнт ініціює стрім, бекенд надсилає дані)
  • Низька накладна вартість після початкового рукопожаття
  • Підтримка більш інтерактивних або подієвих робочих процесів

Вони більш гнучкі за SSE і працюють у більшій кількості середовищ (включаючи бінарний транспорт і HTTP/2 на певних стека). Для стрімінгу пагінованих результатів запитів вони надають кращий контроль та можливість реалізувати управління потоком, підтвердження або паузу/продовження, якщо це необхідно.

Ми використаємо Flask-SocketIO, який надає підтримку WebSocket з API, знайомим з Flask.

from flask import Flask  
from flask_socketio import SocketIO, emit  
import psycopg2  
import json  

app = Flask(__name__)  
socketio = SocketIO(app, cors_allowed_origins="*")  

def get_db_connection():  
 return psycopg2.connect("dbname=your_db user=your_user password=your_password host=localhost")  

@socketio.on('start_stream')  
def handle_start_stream():  
 conn = get_db_connection()  
 cursor = conn.cursor()  
 batch_size = 100  
 offset = 0  
 total_sent = 0  

 while True:  
 cursor.execute("""  
 SELECT pk, sk, fname, lname, dob, zipcode  
 FROM people  
 ORDER BY pk  
 LIMIT %s OFFSET %s  
 """, (batch_size, offset))  

 rows = cursor.fetchall()  
 if not rows:  
 break  

 for row in rows:  
 person = {  
 "pk": row[0],  
 "sk": row[1],  
 "fname": row[2],  
 "lname": row[3],  
 "dob": str(row[4]),  
 "zipcode": row[5],  
 }  
 emit('row', person)  
 total_sent += 1  

 offset += batch_size  

 cursor.close()  
 conn.close()

Зверніть увагу, що структура схожа на структуру API Flask. Замість використання декоратора функції через app.route, тут використовується @socketio.on(‘start_stream’).

Для фронтенду ми використаємо Socket.IO клієнт, щоб отримувати стрімовані рядки.



Зверніть увагу на імпорт, який потрібно зробити, щоб підключити необхідні JavaScript бібліотеки для підтримки WebSockets. Команда start_stream є тією, яка ініціює API Flask для виконання запиту та початку стрімінгу результатів. Цей приклад показує лише маленьку частину того, що можна зробити з WebSockets.

WebSockets ідеально підходять для:

  • Повнодуплексного зв'язку (клієнт і сервер можуть спілкуватися між собою)
  • Чудово підходить для реальних UI та панелей управління
  • Працює через HTTP/1.1 і HTTP/2
  • Більш гнучкий протокол (можна передавати бінарні дані, спеціальні події)

Але має обмеження, такі як:

  • Потребує бібліотек для WebSocket сервера та клієнта (наприклад, Socket.IO)
  • Трохи складніше, ніж SSE для односпрямованого стрімінгу
  • Не так зручний для використання через файрволи або проксі, як звичайний HTTP/SSE
  • Немає нативного API браузера (як у EventSource (прослуховувач подій)), тому потрібен зовнішній JavaScript

Ви хочете використовувати WebSockets, коли:

  • Вам потрібен більший контроль над тим, як і коли надсилаються дані
  • Ви очікуєте підтримку повідомлень від клієнта до сервера (наприклад, фільтри, терміни пошуку)
  • Ви будуєте інтерактивні UI, яким можуть знадобитися підтвердження, скасування стрімів або перемикання оновлень у реальному часі

3. Стрімінг за допомогою чанкових HTTP-відповідей

На перший погляд, чанковий HTTP стрімінг виглядає дуже схоже на SSE: ми повертаємо стрімінгову відповідь з бекенду за допомогою yield, і браузер отримує її поступово. Але насправді все зовсім інше.

SSE — це протокол, який використовує специфічний MIME тип (text/event-stream) і форматування (data: …\n\n), і він прив'язаний до API браузера EventSource (прослуховувач подій).
З іншого боку, чанковий стрімінг — це просто чистий HTTP без очікувань на рівні протоколу. Ви надсилаєте текст (або бінарні дані), і клієнт читає їх, як тільки вони надходять. Це робить його більш гнучким — але також більш ручним.

Чанковий стрімінг чудово підходить, коли:

  • Ви хочете стрімити довільний текст (JSON, CSV, журнали) без обмежень формату SSE.
  • Ви хочете використовувати API fetch() замість EventSource.
  • Вам не потрібне (або ви не хочете) постійне, довготривале з’єднання, як у WebSockets.

Це особливо корисно, коли ви хочете залишатися в межах HTTP, але не хочете прихилятися до повного протоколу, як SSE або WebSockets.

Ось приклад коду:

from flask import Flask, Response, stream_with_context  
import psycopg2  
import json  

app = Flask(__name__)  

def get_db_connection():  
 return psycopg2.connect("dbname=your_db user=your_user password=your_password host=localhost")  

@app.route('/chunked-stream')  
def chunked_stream():  
 def generate():  
 conn = get_db_connection()  
 cursor = conn.cursor()  
 batch_size = 100  
 offset = 0  

 while True:  
 cursor.execute("""  
 SELECT pk, sk, fname, lname, dob, zipcode  
 FROM people  
 ORDER BY pk  
 LIMIT %s OFFSET %s  
 """, (batch_size, offset))  

 rows = cursor.fetchall()  
 if not rows:  
 break  

 for row in rows:  
 person = {  
 "pk": row[0],  
 "sk": row[1],  
 "fname": row[2],  
 "lname": row[3],  
 "dob": str(row[4]),  
 "zipcode": row[5],  
 }  
 yield json.dumps(person) + "\n"  

 offset += batch_size  

 cursor.close()  
 conn.close()  

 return Response(stream_with_context(generate()), content_type='text/plain')

Зверніть увагу, що немає спеціального форматування даних, жодного text/event-stream, жодної магії SSE — лише JSON, розділений новими рядками.

Цього разу ми використовуємо сучасний API fetch() та низькорівневий інтерфейс ReadableStream для обробки стріму:



Це читає стрім покроково, кожен рядок розбирається як JSON і додається на сторінку, як тільки він стає доступним.

Переваги чанкового стрімінгу:

  • Гнучкість — надсилайте що завгодно (JSON, CSV, бінарні дані тощо)
  • Працює через звичайний HTTP без необхідності в бібліотеках клієнта
  • Уникає обмежень SSE або WebSockets
  • Повністю контролюється бекендом — легко реалізувати у Flask

Але має й обмеження:

  • Більш ручне (немає вбудованих механізмів для повторного підключення, форматування чи подій)
  • Потребує логіки парсингу на фронтенді
  • Не так широко зрозумілий/документований, як SSE/WebSockets
  • Може бути складним з буферизацією в проксі/CDN (можливо, потрібно використовувати заголовки, як: Cache-Control: no-transform)

Використовуйте чанковий HTTP стрімінг, коли:

  • Вам потрібен детальний контроль над форматом стріму
  • Ви хочете використовувати fetch(), але все одно потребуєте поступової доставки
  • Ви створюєте панелі управління, переглядачі стрімів або завантажувачі даних, які виграють від раннього рендерингу
  • Ви хочете використовувати fallback, який виглядає більше як HTTP, аніж WebSockets

4. Стрімінг з HTTP/2 (і gRPC)

Тут починається цікаве, і це особисто моє улюблене рішення.

HTTP/2 вводить рідну підтримку мультиплексованих, постійних з'єднань.
Це означає, що ви можете надсилати кілька відповідей через одне TCP з'єднання без блокування на рівні заголовка, яке сповільнює HTTP/1.1. І разом з цим ви отримуєте можливість стрімити відповіді чистим, сучасним способом.

Ви можете використовувати HTTP/2 безпосередньо з такими фреймворками, як Quart, або повністю інтегрувати gRPC, який побудований на основі HTTP/2 і приносить з собою:

  • Застосування схеми через Protocol Buffers
  • Сильна типізація
  • Двосторонній стрімінг
  • Висока продуктивність

Використовуйте HTTP/2 Streaming, коли ви будуєте:

  • Високопродуктивні API
  • Реальні сервіси, що потребують перевірки схеми
  • Системи, де важливі типізовані контракти та продуктивність

На відміну від SSE або чанкового HTTP стрімінгу, gRPC дає вам структурований, надійний протокол, що включає в себе вбудовану підтримку стрімінгу, серіалізації та стиснення.

Однак це вимагає підвищеної складності — вам потрібно буде визначати файли .proto, генерувати код для клієнта/сервера та (для веб-клієнтів) часто розгортати проксі, як Envoy, для мосту між gRPC і gRPC-Web.

Опція A: HTTP/2 Streaming без gRPC

Якщо ви хочете отримати переваги HTTP/2 без занурення в gRPC, ви можете використовувати Quart, асинхронний фреймворк, сумісний з Flask, який підтримує HTTP/2, коли поєднаний з hypercorn.

from quart import Quart, websocket  
import psycopg2  
import json  
import asyncio  

app = Quart(__name__)  

def get_db_connection():  
 return psycopg2.connect("dbname=your_db user=your_user password=your_password host=localhost")  

@app.websocket('/ws')  
async def ws():  
 conn = get_db_connection()  
 cursor = conn.cursor()  
 batch_size = 100  
 offset = 0  

 while True:  
 cursor.execute("""  
 SELECT pk, sk, fname, lname, dob, zipcode  
 FROM people  
 ORDER BY pk  
 LIMIT %s OFFSET %s  
 """, (batch_size, offset))  

 rows = cursor.fetchall()  
 if not rows:  
 break  

 for row in rows:  
 person = {  
 "pk": row[0],  
 "sk": row[1],  
 "fname": row[2],  
 "lname": row[3],  
 "dob": str(row[4]),  
 "zipcode": row[5],  
 }  
 await websocket.send(json.dumps(person))  
 await asyncio.sleep(0) # Yield to event loop  

 offset += batch_size  

 cursor.close()  
 conn.close()

Quart підтримує як HTTP/1.1, так і HTTP/2.
Ви можете налаштувати hypercorn за допомогою alpn_protocols=[“h2”], щоб пріоритезувати стрімінг HTTP/2.

Для того, щоб прочитати дані, вам потрібно підключитися до WebSocket кінцевої точки Quart, яку ми визначили на кінцевій точці /ws.



Навіть якщо це WebSocket, якщо ваш сервер (наприклад, Quart з Hypercorn) працює через HTTP/2, ваше рукопожаття WebSocket буде оновлене через HTTP/2 — тож ви все одно отримаєте переваги мультиплексування та постійних з'єднань, особливо коли поєднуєте це з іншими ресурсами, що надаються з того ж самого джерела.

Опція B: HTTP/2 з gRPC (двосторонній і типізований)

Спочатку потрібно створити файли .proto, щоб описати наш сервіс і стрімити рядки, використовуючи тип повернення stream.

syntax = "proto3";      
service PeopleService {    
  rpc StreamPeople (PeopleRequest) returns (stream Person);   
}      
message PeopleRequest {    
  int32 limit = 1;    
  int32 batch_size = 2;   
}      
message Person {    
  int32 pk = 1;    
  int32 sk = 2;    
  string fname = 3;    
  string lname = 4;    
  string dob = 5;    
  string zipcode = 6;   
} 

Потім нам потрібен Python сервіс, щоб надавати дані через gRPC.

class PeopleServiceServicer(PeopleService_pb2_grpc.PeopleServiceServicer):    
    def StreamPeople(self, request, context):    
        conn = get_db_connection()    
        cursor = conn.cursor()       
        offset = 0    
        total_sent = 0       

        while total_sent < request.limit:    
            cursor.execute("""    
            SELECT pk, sk, fname, lname, dob, zipcode    
            FROM people    
            ORDER BY pk    
            LIMIT %s OFFSET %s    
            """, (request.batch_size, offset))       

            rows = cursor.fetchall()    
            if not rows:    
                break       

            for row in rows:    
                yield PeopleService_pb2.Person(    
                    pk=row[0],    
                    sk=row[1],    
                    fname=row[2],    
                    lname=row[3],    
                    dob=str(row[4]),    
                    zipcode=row[5],    
                )    
                total_sent += 1       

            offset += request.batch_size       
        cursor.close()    
        conn.close() 

Веб-браузери не підтримують gRPC нативно, тому вам доведеться використовувати рішення, як-от:
- gRPC-Web + Envoy Proxy, або
- gRPC через WebTransport (експериментальний), або
- Викликати з React/Vue фронтенду за допомогою @improbable-eng/grpc-web або grpc-web з Protocol Buffers.

Але всі мови, з якими я працював, підтримують gRPC, і це робить міжмовну комунікацію та підтримку надзвичайно простою.

Переваги gRPC:

  • Надзвичайно ефективний і продуктивний
  • Підтримує типізований двосторонній стрімінг
  • gRPC має вбудовану обробку помилок, повторні спроби, дедлайни
  • Мультиплексовані запити через одне з'єднання

Недоліки gRPC:

  • Складніший у налаштуванні (особливо в браузерах)
  • Потрібні інструменти (protoc, codegen, можливо, проксі Envoy)
  • Не такий дружній до HTTP, як інші рішення
  • Труднощі з налагодженням без специфічних інструментів

Використовуйте HTTP/2 або gRPC, коли:

  • Ви будуєте мікросервіси або внутрішні API, які потребують продуктивності та структури
  • Вам потрібен реальний стрімінг з управлінням потоком
  • Ви хочете клієнт-серверні контракти і бінарно закодовані повідомлення
  • Ви працюєте з не браузерними клієнтами (або у вас є рішення для проксі браузера)

Висновок: Зробіть це швидким

Не важливо, наскільки складний ваш бекенд, користувачі оцінюватимуть ваш додаток за тим, як швидко вони побачать результати.
У цій статті ми розглянули чотири різні способи стрімити дані з бекенду на фронтенд, кожен з яких починає повертати результати відразу — навіть коли база даних все ще пагінує через рядки у фоновому режимі:

1.
Server-Sent Events (SSE): Простий, надійний і чудово підходить для односпрямованого стрімінгу.
2. WebSockets: Гнучкий, реальний час, двосторонній — ідеальний для більш інтерактивних додатків.
3. Chunked HTTP Streaming: Прямий і потужний; ідеальний, коли ви хочете контролю без накладних витрат протоколу.
4. HTTP/2 + gRPC: Високопродуктивний, типізований стрімінг для сучасних API та мікросервісів.

Кожен підхід має свої переваги і недоліки. SSE та chunked streaming простіші в реалізації та інтеграції в нативні браузерні інструменти. WebSockets відкривають можливості для повнодуплексної комунікації. gRPC пропонує структуровані контракти і двосторонній стрімінг, але вимагає більш складної налаштування.

Правильне рішення залежить від вашого стеку, терпимості до затримок, потреб фронтенду — і від того, скільки контролю ви хочете над потоком даних.

Але незалежно від того, що ви оберете, патерн залишається тим самим: пагінуйте через сторінки, стріміть по мірі отримання, рендеріть відразу. Це проста зміна, але така, що перетворює повільні, громіздкі інтерфейси у швидкі, плавні досвіди.

Дякую за прочитане — якщо це допомогло, дайте 👏 або підпишіться для нових матеріалів!

Перекладено з: Streaming API Design using Python and Javascript