MQTT з Python на веб-сервері — як магія!

pic

DALL-E: Пітон з поштовим конвертом у своїх зміїних обіймах

Застереження: Стаття була написана в жовтні 2021 року, але опублікована лише в грудні 2024 року. Тому деяка інформація може бути застарілою.

MQTT — це популярний вибір протоколу, особливо для систем Інтернету речей (IoT). Він ідеально підходить для підключення обмежених в ресурсах вбудованих пристроїв до хмари завдяки своєму легкому навантаженню, моделі публікації-підписки, двосторонньому з’єднанню та підтримці безпечного з’єднання через TLS.

Загалом вбудований пристрій підключається до брокера MQTT для публікації даних — переважно зчитувань датчиків, і підписується на тему, щоб слухати команди від хмари. Клієнт користувача в веб-браузері чи додатку може надсилати ці команди безпосередньо до брокера, або сервер (від імені користувача) може надіслати їх.
Цей пост докладно описує останній метод; він є важливою частиною системи, особливо коли ви хочете, щоб клієнти, які не використовують MQTT (як додатки OAuth), мали доступ до пристрою, що підтримує IoT.

┌──────────────────────────────┐  
 │ ХМАРА │  
 ┌──────┐ │ ┌──────┐ ┌────────────┐ │  
 │ ПРИСТРІЙ│◄──┼─►│ │ │ │ │  
 │ A │ │ │ │ │ │ │  
 └──────┘ │ │ │ │ │ │  
 ┌──────┐ │ │ │ │ │ │  
 │ ПРИСТРІЙ│◄──┼─►│ MQTT |◄───►| ВЕБ-СЕРВЕР │ │  
 │ B │ │ │БРОКЕР│ │ │ │  
 └──────┘ │ │ │ │ │ │  
 ┌──────┐ │ │ │ │ │ │  
 │ ПРИСТРІЙ│◄──┼─►│ │ │ │ │  
 │ C │ │ │ │ │ │ │  
 └──────┘ │ └───▲──┘ └──────▲─────┘ │  
 └──────┼───────────────┼───────┘  
 │ │  
 ┌───▼───────────────▼───┐  
 │ КОРИСТУВАЧ │  
 ├──────┬──────┬─────────┤  
 │Мобільний│ ВЕБ │3rd PARTY│  
 │КЛІЄНТ│КЛІЄНТ│ОСНОВНИЙ APP│  
 └──────┴──────┴─────────┘

Оскільки наша хмарна програма написана на Python, всі деталі реалізації, згадані тут, відносяться до цієї мови, але такі ж підходи можуть бути застосовані й в інших мовах/стеку.
Також цей пост обмежений версіями MQTT v3.1 та v3.1.1._

Трохи про контекст

Як згадувалося раніше, ми мали подібну налаштування у нашому IoT-проекті — пристрої підключалися до MQTT брокера, а користувачі (через мобільні додатки) підключалися до нашого веб-сервера. Ми хотіли відправляти "команди" на пристрої з веб-сервера. Ці команди могли бути типу "fire-and-forget" або "request-response", виконуваними в процесі (веб-запиту) або за допомогою фонових процесів, надсиланими на одне чи кілька пристроїв одночасно.

Зважаючи на ці варіанти використання, ми змогли визначити основні утиліти, необхідні для цього:

  • проста блокуюча функція для публікації будь-якого повідомлення
  • проста блокуюча функція для підписки на теми, очікування n кількості повідомлень або до закінчення часу, повертаючи отримані повідомлення
  • проста блокуюча функція для реалізації функціональності типу запит-відповідь — це може бути обгортка для попередніх двох

Після цього ми почали шукати MQTT клієнт для підключення веб-сервера до брокера.
Ми вирішили використати paho-mqtt — Python-модуль частину проєкту Eclipse Paho, який також підтримує MQTT-бібліотеки для інших популярних мов. Модуль стабільний, багатий на функціональність і добре спроектований. Проте, нам знадобилося деякий час і кілька ітерацій, щоб ефективно з ним працювати.

Ця стаття розповідає про шлях інтеграції MQTT з нашим веб-сервером, який працює як годинник — тут ми ділимося різними ітераціями, навчанням та досвідом, який ми здобули за рік роботи над цим проєктом.

Я пишу цей пост як серію дій, подій і рішень, щоб дати повну картину.
Якщо вам потрібні деякі фрагменти коду для роботи, ви можете знайти посилання на фрагмент коду нижче.

[

Допоміжні функції та класи MQTT на Python [Фрагменти коду]

GitLab.com

gitlab.com

](https://gitlab.com/-/snippets/2183967?source=post_page-----9425dec116ef--------------------------------)

[v1] Простий обгортковий функціонал

Ми думали, що це буде просто — оскільки paho в основному бере на себе основну роботу і має блокуючі API, які нам потрібні (ну, майже, ви побачите!).

Функція публікації була простою. Вона була “обгорткою” навколо функції paho.mqtt.publish.single.

def publish_to_mqtt(  
 topic: str, payload: str, qos: int = 0, retain: bool = False  
) -> MQTTMessageInfo:  
 return publish.single(  
 ...  
 )

Аналогічно, ми знайшли paho.mqtt.subscribe.simple, але цього було недостатньо. Вона не мала жодного “тайм-ауту”, що означає, що вона блокуватиме назавжди, якщо не буде отримано повідомлення на цьому топіку. Ми вирішили скопіювати код і додати перевірку на тайм-аут (жирний шрифт нижче).
Це виконало свою задачу.

def subscribe_to_mqtt(  
 topics: Union[str, List[str]],  
 qos: int = 0,  
 msg_count: int = 1,  
 retained: bool = False,  
 timeout: int = 5,  
 userdata: dict = None  
) -> Union[MQTTMessage, List[MQTTMessage]]:  
_"""  
 змінений метод paho.mqtt.subscribe.simple(), щоб підтримувати тайм-аут  
 """_ # callback для on_connect  
 def _on_connect(..., userdata: dict, ...):  
 # викликаємо виключення, якщо не вдалося підключитися  
 ...  
 # підписуємось на топіки з userdata  
 ...  


 # callback для on_message  
 def _on_message_simple(..., userdata: dict, message: MQTTMessage):  
 # ігноруємо, якщо msg_count вже 0  
 ...  
 # ігноруємо, якщо повідомлення "залишене" (retained), а користувач його не хоче  
 ...  
 # зменшуємо msg_count і додаємо його до повідомлень у userdata  
 ...  


 # перевіряємо msg_count і qos  
 ...  
 # налаштовуємо повернення одного повідомлення, якщо msg_count == 1, або списку, якщо > 1  
 ...  
 # створюємо userdata  
 ...
User is proficient in both English and Ukrainian. 
# створити екземпляр клієнта та налаштувати зворотні виклики (callbacks) і облікові дані користувача  
 ...  
 ## підключення до брокера, відмітити час початку та запустити "поточний мережевий цикл" (threaded network loop) з клієнтом  
 ...  
```# чекати, поки не будуть отримані всі повідомлення або не вийде час  
 while True:  
 should_timeout = time.time() > start_time + timeout  
 if userdata["msg_count"] == 0 or should_timeout:  
 client.loop_stop()  
 client.disconnect()  
 break  
 time.sleep(0.2)```  
```# підняти помилку, якщо повідомлень немає або їх менше ніж потрібно  
 messages = userdata["messages"]  
 if not messages or len(messages) < msg_count:  
 raise TimeoutError(...)```  
 повернути повідомлення



Ми наївно думали, що поєднання вищезазначених двох функцій виконуватиме запит-відповідь.
Hey! How’s it going?
User is proficient in both English and Ukrainian. 
**# публікація даних на основі pub_on_connect в userdata  
 якщо "pub_on_connect" є в userdata:  
 ...** def request_response_to_mqtt(  
 topic: str,  
 payload: str,  
 response_topic: str,  
 qos: int = 0,  
 response_msg_count: int = 1,  
 response_timeout: int = 5,  
 retained: bool = False  
) -> Union[MQTTMessage, List[MQTTMessage]]:  
**# створення userdata з `pub_on_connect`  
 userdata = {  
 "pub_on_connect": {  
 "topic": topic, "payload": payload, "qos": qos  
 }  
 }**    
 # підписка з створеним userdata і  
 # очікування повідомлень на response_topic  
 return subscribe_to_mqtt(  
**userdata=userdata,**  
 ...  
 )

Повний код можна знайти у файлі mqtt_v1.pyfile in the code snippet.

>>> # простий демо  
>>> для i в діапазоні(5):  
... start=time.time(); rpc(device, "RPC.Ping"); end=time.time()  
... print(f"[{i}] Час виконання: {end - start}")  
...
**RPC: RPC.Ping відповіді: b'{"result": "pong"}'**  
[0] Час виконання: 1.220142126083374  
**RPC: RPC.Ping відповіді: b'{"result": "pong"}'**  
[1] Час виконання: 0.8168301582336426  
**RPC: RPC.Ping відповіді: b'{"result": "pong"}'**  
[2] Час виконання: 1.0094208717346191  
**RPC: RPC.Ping відповіді: b'{"result": "pong"}'**  
[3] Час виконання: 1.0069520473480225  
**RPC: RPC.Ping відповіді: b'{"result": "pong"}'**  
[4] Час виконання: 1.003695011138916

І це все. Ми запустили систему, оскільки вона пройшла базові тести, і цього було достатньо, поки наші користувачі не почали скаржитись на затримки — більше про це в наступному розділі.

[v2] Singleton, який повторно використовує з'єднання

Після того, як ми запустили систему в бета-версії з 20–25 пристроями та їх користувачами, ми отримали зворотний зв'язок від них.
Вони помітили значну затримку (більше 3 секунд), коли виконували команди для керування своїми пристроями.

Після проведення базового профілювання стало зрозуміло, що основна причина затримки — це TLS-ручне рукопотискання (handshake) та встановлення з'єднання з брокером. Кожного разу, коли сервер взаємодіє з брокером, йому потрібно встановити нове з'єднання TCP+TLS. Ми вирішили оптимізувати це так само, як це роблять з'єднання з базами даних у веб-серверах — зберігаючи з'єднання та повторно його використовуючи. Оскільки більшість веб-серверів (принаймні WSGI) повторно використовують процеси, створені для обробки веб-запитів, можна мати MQTT-з'єднання, яке буде жити довше за цикл запит-відповідь HTTP.

Щоб досягти цього, ми вирішили використати клас Singleton, який утримує клієнта та його з'єднання. Цей клас потім надає ці утиліти і в фоновому режимі використовує збережене з'єднання. Основна робота полягала в реорганізації наявного коду попередньої версії для підтримки цього патерну.
Клас singleton (ми назвали його MQTTAgent) виглядав приблизно так:

class MQTTAgent:  
 _instance: "MQTTAgent" = None  
**_client: Optional[MQTTClient] = None**  

 @staticmethod  
 def get_instance() -> "MQTTAgent":  
_""" Статичний метод доступу. """  
 ..._ def __init__(self):  
_""" Практично приватний конструктор. """  
 ..._    

 def clean_up(self):  
_""" Очищення мережевого з'єднання та клієнта. """  
 ..._    

 def _connect_async(self):  
_""" Налаштування клієнта та з'єднання асинхронно. """  
 ..._    

 def connect(self, timeout: int = 5):  
_""" Синхронне з'єднання з тайм-аутом. """  
 ..._    

 def disconnect(self):  
_""" Синхронне відключення від брокера. """  
 ..._    

 def __on_connect(self,  
 client: MQTTClient,  
 userdata: dict,  
 flags: int,  
 rc: int):  
_""" Зворотний виклик, який викликається, коли клієнт підключається.
""  
 ..._    

 def __on_message(self,  
 client: MQTTClient,  
 userdata: dict,  
 message: MQTTMessage):  
_""" Зворотний виклик, який викликається, коли отримано повідомлення. """  
 ..._    

**def is_connected(self) -> bool:**  
_""" Перевірити, чи підключено до брокера. """  
 ..._    

**def publish(self,  
 topic: str,  
 payload: str,  
 qos: int = 1,  
 retain: bool = False) -> MQTTMessageInfo:**    
_""" Синхронно публікувати повідомлення. """  
 ..._    

**def subscribe(self,  
 topics: _Topics,  
 qos: int = 1,  
 retained: bool = False,  
 msg_count: int = 1,  
 timeout: int = 5) -> _Messages:**    
 _..._    

**def request_response(self,  
 topic: str,  
 payload: str,  
 response_topics: _Topics,  
 qos: int = 1,  
 response_msg_count: int = 1,  
 response_timeout: int = 5,  
 retained=False) -> _Messages:**    
 _..._    


## зручний глобальний екземпляр, який можна  
## безпосередньо імпортувати та використовувати.
**mqtt_agent = MQTTAgent.get_instance()**

Повний код можна знайти у файлі mqtt_v2.pyfile in the code snippet.

>>> # простий демо  
>>> для i в діапазоні(5):  
... start=time.time(); device.rpc("RPC.Ping"); end=time.time()  
... print(f"[{i}] Час виконання: {end - start}")  
... **З'єднання прийнято.[0] клієнт mqtt підключений до mqtt-брокера:1883.**  
RPC: RPC.Ping відповіді: b'{"result": "pong"}'  
**[0] Час виконання: 1.0129196643829346**  
RPC: RPC.Ping відповіді: b'{"result": "pong"}'  
**[1] Час виконання: 0.10076355934143066**  
RPC: RPC.Ping відповіді: b'{"result": "pong"}'  
**[2] Час виконання: 0.10155487060546875**  
RPC: RPC.Ping відповіді: b'{"result": "pong"}'  
**[3] Час виконання: 0.10111713409423828**  
RPC: RPC.Ping відповіді: b'{"result": "pong"}'  
**[4] Час виконання: 0.1007685661315918**

Модуль MQTT тепер чистий, елегантний у використанні та ефективний.
Затримка покращилась в 10 разів (перевірте вихідний результат демо-коду вище). Користувачі помітили різницю та оцінили це. Перемога на день!

[v3] Забезпечення безпеки при роботі з потоками

Коли ми отримали вимогу зробити одночасні виклики команд до кількох пристроїв, у нас було два варіанти — виконувати їх в різних потоках або використовувати систему розподіленої черги завдань (ми вже мали налаштований Celery на нашому сервері).

Перший варіант здавався підходящим рішенням, але вимагав би зробити рішення v2 “безпечним для потоків” (thread-safe), оскільки mqtt_agent.subscribe (і, відповідно, mqtt_agent.request_response) не були спроектовані з урахуванням одночасного виконання. Замість того, щоб робити це, ми ліниво вибрали підхід “черги завдань”. Він би планував одночасні виклики команд та передавав їх розподіленим фоновим працівникам для виконання. Це виконувало свою задачу, але оскільки завдання були “в черзі” та “розподілено виконувались”, продуктивність була непостійною.
Це забирало більше часу, ніж очікувалося — коли черга велика або працівники зайняті. Це часто траплялося, коли кількість пристроїв на користувача зростала. Здавалося, що іншого варіанту, окрім як зробити MQTTAgent безпечним для потоків (thread-safe), в нас немає.

У v2, userdata (яке є атрибутом MQTTClient і передається в обробник on_message) зберігає “стан підписки”; ключ до того, щоб зробити MQTTAgent безпечним для потоків, полягає в тому, щоб зробити його безпечним для потоків. Ідея полягає в тому, щоб мати “сховище” всіх “станів підписки”, і коли повідомлення отримано, перевіряється це сховище. Якщо будь-яка “підписка” запитала отримання повідомлень з цієї теми, то ми додаємо це повідомлення до цього “стану підписки”. Також потрібно розумно обробляти підписки на теми — якщо два потоки підписуються на одну і ту ж тему, то внутрішньо повинна бути підписка лише один раз, а обидва потоки отримують одне і те ж повідомлення.

По-перше, ми створили клас _MQTTSubscription для зберігання “стану підписки” замість використання dict.
Також ми перемістили логіку додавання нових повідомлень всередину класу.

**class _MQTTSubscription:**  
**def __init__(self,  
 topics: _Topics,  
 msg_count: int = 1,  
 qos: int = 1,  
 retained: bool = False):**    
 self.thread_id: int = current_thread().ident  
**self.topics: List[str] = topics if isinstance(topics, list) else [topics]**  
**self.messages: _Messages = list() if msg_count > 1 else None**  
**self.pending_msg_count: int = msg_count**  
 self._msg_count: int = msg_count  
 self.qos: int = qos  
 self.retained: bool = retained  

**def add_message(self, message: MQTTMessage):  
_""" Обробити нове повідомлення для цієї підписки. """_**    
# ігнорувати - pending_msg_count <= 0 або повідомлення збережене, але запит не на збережене  
 ...  
 # зменшити pending_msg_count та додати до повідомлень  
 ...

Після цього ми додали атрибути “сховище підписок” та “лічильник посилань на теми” до MQTTAgent.
“Сховище підписок” (subscription store) — це набір всіх екземплярів підписок, які очікують на повідомлення. “Лічильник посилань на теми” (topic reference count) — це відображення (мапа), де ключем є тема, а значенням — кількість екземплярів підписок, підписаних на цю тему.

class MQTTAgent:  
 ...  
**_subscriptions: Set[_MQTTSubscription] = set()  
 _topics_count: Dict[str, int] = dict()**    

 ...  

 def _disconnect(self)  
 ...  
**# скинути _subscriptions, _topics_count  
 self._subscriptions = set()  
 self._topics_count = dict()**    
 ...

Коли запитується нова “підписка”, створюється екземпляр subscription і додається до набору _subscriptions, лічильник посилань на теми збільшується для цих тем, а _client підписується на теми, на які ще не підписанося (з лічильником посилань як 1). Зворотний процес відбувається, коли підписка має бути “закрита або скасована” — лічильник посилань зменшується, а _client відписується від тем, на які більше не є посилання (з лічильником посилань як 0).
У зворотньому виклику __on_message ми перевіряємо всі _subscriptions і викликаємо метод subscription.add_message для тих, де тема підписки та тема повідомлення співпадають.

class MQTTAgent:  
 ...  

 def __on_message(self, ...):  
_..._  
**# ітерація по підписках і перевірка на відповідність теми  
 for subscription in tuple(self._subscriptions):  
 ...  
 # перевірка чи є співпадіння теми для цієї підписки  
 ...  
 # додавання повідомлення до підписки  
 subscription.add_message(message=message)**    

 ...  
 ...**def _subscribe(self, subscription: _MQTTSubscription):  
_""" Підписка на основі підписки. """_  
# додавання до _subscriptions  
 ...  
 # збільшення лічильника тем — створення списку тем для підписки (ті, на які ще не підписанося)  
 ...  
 # підписка на ці теми  
 ...**    

**def _unsubscribe(self, subscription: _MQTTSubscription):  
_""" Відписка на основі підписки. """_  
# видалення підписки з _subscriptions  
 ...
## зменшення лічильника тем — створення списку тем для відписки (тих, які більше не потрібні)  
 ...  
 # відписка від цих тем  
 ...**    

 def subscribe(self, ...) -> _Messages:  
 ...  
# підключення до клієнта  
 ...  
 # перевірка msg_count та qos  
 ...
**# створення об'єкта підписки  
 subscription = _MQTTSubscription(  
 topics=topics,  
 msg_count=msg_count,  
 qos=qos,  
 retained=retained  
 )  
 # підписка  
 self._subscribe(subscription=subscription)**    
 # чекаємо, поки всі повідомлення будуть отримані або тайм-аут  
 start_time = time.time()  
 while True:  
 should_timeout = time.time() > start_time + timeout  
 if **subscription.pending_msg_count==0** or should_timeout:  
**# відписка  
 self._unsubscribe(subscription=subscription)**    
 break  
 # затримка  
 time.sleep(0.1)  
**return subscription.messages**  
...

Повний код можна знайти в файлі mqtt_v3.pyфайл у фрагменті коду.

>>> # простий демонстраційний приклад  
>>> print(f"Викликаємо RPC для {len(devices)} пристроїв одночасно.")  
**Викликаємо RPC для 5 пристроїв одночасно.**  
>>> **with ThreadPoolExecutor(len(devices)) as executor:**  
... start = time.time()  
... # переконатися, що mqtt_agent підключений  
... mqtt_agent.connect()   
...
tuple(executor.map(  
... lambda device: rpc(device, "RPC.Ping"),  
... devices  
... ))  
... end = time.time()  
... print(f"Загальний час: {end - start}")  
...   
Connection Accepted.[0] mqtt client connected to mqtt-broker:1883.  
RPC: RPC.Ping responses: b'{"result": "pong"}'  
[140217424803584] Time taken: 0.14078974723815918  
RPC: RPC.Ping responses: b'{"result": "pong"}'  
[140217441588992] Time taken: 0.1967012882232666  
RPC: RPC.Ping responses: b'{"result": "pong"}'  
[140217433196288] Time taken: 0.19601941108703613  
RPC: RPC.Ping responses: b'{"result": "pong"}'  
[140217416410880] Time taken: 0.19498801231384277  
RPC: RPC.Ping responses: b'{"result": "pong"}'  
[140217449981696] Time taken: 0.6972782611846924  
**Загальний час: 1.314225673675537**

Тепер було можливо викликати функціональність MQTTAgent з кількох потоків одночасно безпечно.
Ми почали викликати команди для кількох пристроїв одночасно за допомогою Python ThreadPoolExecutor (як показано в наведеному вище демонстраційному коді), замість того щоб розподіляти їх між фоновими працівниками. Це не тільки зробило процес швидшим та ефективнішим, але також дозволило масштабувати рішення для одночасних викликів до більшої кількості пристроїв (тестувалося з 100 пристроями одночасно).

Що далі?

У нас є кілька ідей, які можуть вивести інструмент на новий рівень. Деякі з них:

  • API типу “Iter/Stream” для отримання повідомлень по підписці безпосередньо, як тільки вони надходять. Наразі всі повідомлення “збираються” і повертаються пізніше.
  • API типу “Future/Promise” для надання користувачам вибору між асинхронними та синхронними варіантами. Наразі вся функціональність блокує потік.
  • Підтримка кількох брокерів.

Поточна реалізація цілком задовольняє наші потреби, тому ми, ймовірно, не будемо працювати над цим. Але якщо ми отримаємо хороші відгуки про цей пост, ми можемо подумати над доопрацюванням та випуском цього рішення як відкритий пакет для Python.
Хоча Eclipse Paho є стабільним і надійним, йому потрібен "шар елегантності" зверху, щоб працювати бездоганно, як це робить пакет requests для urllib3. Цей пакет може бути саме таким — MQTT для людей™!

Перекладено з: MQTT with Python Web Server like a Charm

Leave a Reply

Your email address will not be published. Required fields are marked *