Оскільки Python є інтерпретованою мовою, при використанні для бекенд-розробки, наприклад, у комбінації Python + Django, у порівнянні з Java + Spring, час відповіді буде трохи більшим. Проте, якщо код правильно оптимізовано, різниця не буде занадто значною. Навіть коли Django використовує багатопроцесорний режим, його здатність до обробки одночасних запитів залишатиметься значно слабшою. Проте у Python є кілька рішень для покращення можливостей обробки одночасних запитів. Наприклад, використовуючи асинхронний фреймворк FastAPI, завдяки його асинхронним можливостям, здатність обробляти I/O-інтенсивні задачі значно покращується.
FastAPI є одним з найшвидших фреймворків для Python.
FastAPI як приклад
Давайте спочатку коротко ознайомимося з тим, як використовувати FastAPI.
Приклад 1: Асинхронний I/O для мережі за замовчуванням
Інсталяція:
pip install fastapi
Простий серверний код:
# app.py
from typing import Union
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def read_root():
return {"Hello": "World"}
Запуск:
uvicorn app:app --reload
Ми можемо побачити, що в порівнянні з іншими фреймворками, інтерфейс FastAPI має лише додаткове ключове слово async
. Ключове слово async
визначає інтерфейс як асинхронний. Зі самої відповіді ми не можемо помітити різницю між FastAPI та іншими фреймворками Python. Різниця полягає в обробці одночасних запитів. Коли серверні потоки FastAPI обробляють запити за маршрутом, наприклад, http://127.0.0.1:8000/, якщо вони стикаються з мережею I/O, вони більше не чекають на її завершення, а натомість обробляють інші запити.
Коли мережеве I/O завершиться, виконання відновиться. Ця асинхронна здатність покращує обробку I/O-інтенсивних завдань.
Приклад 2: Явне мережеве асинхронне I/O
Розглянемо ще один приклад. У бізнес-логіці ініціюється явний асинхронний мережевий запит.
Для цього мережевого I/O, як і для обробки запитів до маршрутів, FastAPI також буде обробляти їх асинхронно.
# app.py
from fastapi import FastAPI, HTTPException
import httpx
app = FastAPI()
# Приклад асинхронного GET запиту
@app.get("/external-api")
async def call_external_api():
url = "https://leapcell.io"
async with httpx.AsyncClient() as client:
response = await client.get(url)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Не вдалося отримати дані")
return response.json()
Якщо ви хочете, щоб I/O з базою даних було асинхронним, вам потрібно, щоб драйвер бази даних або ORM підтримували асинхронні операції.
Асинхронне I/O
Основною реалізацією асинхронності в FastAPI є асинхронне I/O
.
Ми можемо запустити сервер з можливостями асинхронної обробки безпосередньо, використовуючи асинхронне I/O без FastAPI.
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(1) # Імітація операції I/O
return web.Response(text='{"Hello": "World"}', content_type='application/json')
async def init(loop):
# Використовуємо event loop для моніторингу веб-запитів
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
# Запускаємо сервер, а event loop моніторить та обробляє веб-запити
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Сервер запущено на http://127.0.0.1:8000...')
return srv
# Явно отримуємо event loop
loop = asyncio.get_event_loop()
# Запускаємо event loop
loop.run_until_complete(init(loop))
loop.run_forever()
Коли цей приклад запускається, результат повернення за адресою
буде таким же, як у Прикладі 1.
Основний принцип реалізації асинхронного I/O полягає в використанні "корутин" та "event loop" (подієвий цикл).
Корутини
async def index(request):
await asyncio.sleep(1) # Імітація операції I/O
return web.Response(text='{"Hello": "World"}', content_type='application/json')
Функція index
визначена за допомогою async def
, що означає, що це корутина. Ключове слово await
використовується перед операцією I/O, щоб вказати потоку виконання не чекати на завершення цієї операції I/O. Виклики звичайних функцій реалізуються через стек, і функції можуть викликатись та виконуватись лише одна за одною. Однак корутина є особливим типом функції (не кооперативним потоком). Вона дозволяє потоку призупиняти виконання на мітці await
і переключатись для виконання інших завдань.
Коли операція I/O завершується, виконання продовжується.
Давайте подивимось на ефект виконання кількох корутин одночасно.
import asyncio
from datetime import datetime
async def coroutine3():
print(f"Корутина 3 почалась о {datetime.now()}")
await asyncio.sleep(1) # Імітація операції I/O
print(f"Корутина 3 завершилась о {datetime.now()}")
async def coroutine2():
print(f"Корутина 2 почалась о {datetime.now()}")
await asyncio.sleep(1) # Імітація операції I/O
print(f"Корутина 2 завершилась о {datetime.now()}")
async def coroutine1():
print(f"Корутина 1 почалась о {datetime.now()}")
await asyncio.sleep(1) # Імітація операції I/O
print(f"Корутина 1 завершилась о {datetime.now()}")
async def main():
print("Головна корутина почалась")
# Створення задач для одночасного виконання корутин
task1 = asyncio.create_task(coroutine1())
task2 = asyncio.create_task(coroutine2())
task3 = asyncio.create_task(coroutine3())
# Очікуємо завершення всіх задач
await task1
await task2
await task3
print("Головна корутина завершилась")
# Запуск головної корутини
asyncio.run(main())
Вихід:
Головна корутина почалась
Корутина 1 почалась о 2024-12-27 12:28:01.661251
Корутина 2 почалась о 2024-12-27 12:28:01.661276
Корутина 3 почалась о 2024-12-27 12:28:01.665012
Корутина 1 завершилась о 2024-12-27 12:28:02.665125
Корутина 2 завершилась о 2024-12-27 12:28:02.665120
Корутина 3 завершилась о 2024-12-27 12:28:02.665120
Головна корутина завершилась
Ми можемо побачити, що потік не виконує три задачі одну за одною.
Коли потік стикається з операцією I/O, він переключається на виконання інших задач. Після завершення операції I/O виконання продовжується. Також можна побачити, що три корутини фактично починають очікувати операцію I/O одночасно, тому кінцеві часи завершення виконання в основному однакові. Хоча тут не використовується явно цикл подій, asyncio.run
використовує його неявно.
Генератори
Корутини реалізовані через генератори.
Генератори можуть призупиняти виконання функцій, а також відновлювати його, що є характеристиками корутин.
def simple_generator():
print("Перше значення")
yield 1
print("Друге значення")
yield 2
print("Третє значення")
yield 3
# simple_generator — це генераторна функція, gen — це генератор
gen = simple_generator()
print(next(gen)) # Вивід: Перше значення \n 1n 2
print(next(gen)) # Вивід: Третє значення \n 3
При виконанні генератора з використанням next()
, коли він зустрічає yield
, він призупиняється. Коли next()
виконується знову, він продовжує виконання з того місця, де був призупинений останній раз. До Python 3.5 корутини також писалися за допомогою "анотацій" + yield
.
Починаючи з Python 3.5, використовуються async def
+ await
.
import asyncio
from datetime import datetime
@asyncio.coroutine
def my_coroutine():
print("Запуск корутини", datetime.now())
# Асинхронний виклик asyncio.sleep(1):
yield from asyncio.sleep(1)
print("Кінець корутини", datetime.now())
# Отримуємо EventLoop
loop = asyncio.get_event_loop()
# Виконуємо корутину
loop.run_until_complete(my_coroutine())
loop.close()
Можливості призупинення та відновлення виконання генераторів можна використовувати для багатьох завдань, окрім корутин. Наприклад, це можна використовувати для обчислень під час циклів і зберігання алгоритмів.
Наприклад, реалізація трикутника Паскаля (по обидва боки кожного рядка стоїть 1, а числа на інших позиціях є сумою двох чисел, що знаходяться безпосередньо над ним).
def pascal_triangle():
row = [1]
while True:
yield row
new_row = [1] # Перший елемент кожного рядка завжди 1
for i in range(1, len(row)):
new_row.append(row[i - 1] + row[i])
new_row.append(1) # Останній елемент кожного рядка завжди 1
row = new_row
# Генеруємо та виводимо перші 5 рядків трикутника Паскаля
triangle = pascal_triangle()
for _ in range(5):
print(next(triangle))
Вивід:
[1]
[1, 1]
[1, 2, 1]
[1, 3, 3, 1]
[1, 4, 6, 4, 1]
Цикли подій (Event Loops)
Оскільки виконання корутин може бути призупинене, коли корутина відновить своє виконання? Для цього необхідно використати цикл подій (event loop), щоб повідомити потік виконання.
# Отримуємо EventLoop
loop = asyncio.get_event_loop()
# Цикл подій виконує корутину
loop.run_until_complete(my_coroutine())
loop.close()
Цикл подій використовує технологію мультиплексування вводу/виводу, постійно циклюючи, щоб моніторити події, де корутини можуть продовжити виконання.
Коли вони можуть бути виконані, потік продовжить виконувати корутини.
Технологія мультиплексування вводу/виводу (I/O Multiplexing)
Щоб простіше зрозуміти мультиплексування вводу/виводу: я — начальник кур'єрської станції. Мені не потрібно активно запитувати кожного кур'єра про завершення їхніх завдань. Замість цього кур'єри самі приходять до мене після виконання своїх завдань. Це підвищує мої можливості обробки завдань, і я можу робити більше речей.
select
, poll
та epoll
можуть усі забезпечити мультиплексування вводу/виводу. У порівнянні з select
та poll
, epoll
має кращу продуктивність.
Linux зазвичай використовує epoll
за замовчуванням, а macOS використовує kqueue
, що схоже на epoll
і має подібну продуктивність.
Сервер сокетів із використанням циклів подій
import selectors
import socket
# Створення об'єкта selectors, еквівалентного реалізації epoll, коли запускається на Linux
sel = selectors.DefaultSelector()
# Функція обробки події прийому запиту. Приймає нові з'єднання та реєструє події для читання
def accept(sock, mask):
conn, addr = sock.accept() # Приймає з'єднання
print('Прийнято з'єднання від', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read) # Реєстрація події для читання
# Функція обробки події читання запиту. Читає дані запиту і надсилає HTTP-відповідь, після чого закриває з'єднання.
def read(conn, mask):
data = conn.recv(100) # Читання даних із з'єднання
print('відповідь на')
response = "HTTP/1.1 200 OK\r\n" \
"Content-Type: application/json\r\n" \
"Content-Length: 18\r\n
"\r\n" \
"{\"Hello": \"World\"}"
conn.send(response.encode()) # Відправка відповіді
print('Закриваю з\'єднання')
sel.unregister(conn) # Відміна реєстрації події
conn.close() # Закриття з\'єднання
# Створення серверного сокету
sock = socket.socket()
sock.bind(('localhost', 8000))
sock.listen()
sock.setblocking(False)
# Реєстрація події прийому з'єднання
sel.register(sock, selectors.EVENT_READ, accept)
print("Сервер працює на порту 8000...")
# Цикл подій
while True:
# Це буде блокуватися, коли немає запитів
events = sel.select() # Вибір файлових дескрипторів (подій), які готові
print("довжина подій: ", len(events))
for key, mask in events:
callback = key.data # Отримання функції обробки події
print("handler_name:", callback.__name__)
callback(key.fileobj, mask) # Виклик функції обробки події
Запустіть серверний сокет для моніторингу вказаного порту.
Якщо програма виконується на системі Linux, то за замовчуванням бібліотека selectors
використовує epoll
як свою реалізацію. Код використовує epoll
для реєстрації події прийому запиту (подія прийому). Коли надходить новий запит, epoll
спрацьовує і виконує функцію обробки події, одночасно реєструючи подію читання (подія читання), щоб обробити і відповісти на запит. Коли до сервера звертаються через веб-сторінку за адресою http://127.0.0.1:8000/, результат повернення буде таким же, як у Прикладі 1. Лог роботи сервера:
Сервер працює на порту 8000...
довжина подій: 1
handler_name: accept
Прийнято з\'єднання від ('127.0.0.1', 60941)
довжина подій: 1
handler_name: read
відповідь на
Закриваю з\'єднання
Сервер на сокетах
Прямо використовуйте Socket для запуску сервера.
Коли до сервера звертаються через браузер за адресою http://127.0.0.1:8080/ або за допомогою curl http://127.0.0.1:8080/
, він поверне {"Hello": "World"}
import socket
from datetime import datetime
# Створення TCP-сокета
server_socket = socket.socket()
# Прив'язка сокета до вказаної IP-адреси та порту
server_socket.bind(('127.0.0.1', 8001))
# Початок прослуховування вхідних з\'єднань
server_socket.listen(5)
# Цикл для прийому з\'єднань від клієнтів
while True:
print("%s Очікування з\'єднання..." % datetime.now())
client_socket, addr = server_socket.accept() # Це буде блокувати, чекаючи на з\'єднання клієнта
print(f"{datetime.now()} Отримано з\'єднання від {addr}")
# Отримання даних від клієнта
data = client_socket.recv(1024)
print(f"Отримано: {data.decode()}")
# Відправка відповіді
"Content-Type: application/json\r\n" \
"Content-Length: 18\r\n" \
"Connection: close\r\n" \
"\r\n" \
"{\"Hello": \"World\"}"
client_socket.sendall(response.encode())
# Закриття сокета клієнта
client_socket.close()
Коли звертаються за допомогою curl http://127.0.0.1:8001/
, лог роботи сервера:
2024-12-27 12:53:36.711732 Очікування з\'єднання...
2024-12-27 12:54:30.715928 Отримано з\'єднання від ('127.0.0.1', 64361)
Отримано: GET / HTTP/1.1
Host: 127.0.0.1:8001
User-Agent: curl/8.4.0
Accept: */*
Підсумок
Асинхронний ввід/вивід реалізується на нижчому рівні за допомогою "корутин" (coroutines) та "циклів подій" (event loops). "Корутині" (coroutines) забезпечують те, що коли потік зустрічає позначені операції вводу/виводу під час виконання, йому не потрібно чекати завершення вводу/виводу, він може поставити виконання на паузу і дозволити потоку виконувати інші завдання без блокування. "Цикли подій" (event loops) використовують технологію мультиплексування вводу/виводу, постійно циклюючи для моніторингу подій вводу/виводу.
Коли певна подія вводу/виводу завершена, відповідний зворотний виклик (callback) активується, дозволяючи корутині (coroutine) продовжити виконання.
Leapcell: Ідеальна платформа для FastAPI та інших Python-додатків:
Нарешті, дозвольте представити ідеальну платформу для розгортання Flask/FastAPI: Leapcell.
Leapcell — це платформа хмарних обчислень, спеціально розроблена для сучасних розподілених додатків. Її модель оплати за використання гарантує відсутність витрат на простій, що означає, що користувачі платять лише за ресурси, які вони реально використовують.
Унікальні переваги Leapcell для WSGI/ASGI додатків:
1. Підтримка кількох мов
- Підтримує розробку на JavaScript, Python, Go або Rust.
2. Безкоштовне розгортання необмеженої кількості проєктів
- Плата стягується лише за використання. Безкоштовно, коли немає запитів.
3.
Неймовірна рентабельність
- Оплата за використання, без витрат на простій.
- Наприклад, $25 може підтримувати 6,94 мільйона запитів із середнім часом відповіді 60 мілісекунд.
4. Спрощений досвід для розробників
- Інтуїтивно зрозумілий інтерфейс для легкого налаштування.
- Повністю автоматизовані CI/CD пайплайни та інтеграція з GitOps.
- Метрики та журнали в реальному часі, що надають корисні відомості.
5. Беззусильне масштабування та висока продуктивність
- Автоматичне масштабування для легкого оброблення високої конкуренції.
- Відсутність витрат на обслуговування, що дозволяє розробникам зосередитись на розробці.
Дізнайтеся більше в документації!
Twitter Leapcell: https://x.com/LeapcellHQ
Перекладено з: Mastering Python Async IO with FastAPI