MQTT з Python на веб-сервері — як по маслу

pic

DALL-E: Пітон змія з конвертом поштового повідомлення в ньому

Попередження: Стаття була написана в жовтні 2021 року, але опублікована лише в грудні 2024 року. Тому деяка інформація може бути застарілою.

MQTT — популярний протокол, особливо для IoT-систем. Він ідеально підходить для підключення обмежених вбудованих пристроїв до хмари завдяки своїй легкості, моделі публікації-підписки, двосторонньому з'єднанню та підтримці безпечного з'єднання через TLS.

Загалом, вбудований пристрій підключається до MQTT-брокера для публікації даних — в основному показників датчиків — та підпису на тему для прослуховування команд з хмари. Клієнт веб-сайту або додатка користувача може надсилати ці команди безпосередньо до брокера, або сервер (від імені користувача) може їх надіслати.
Цей пост детально розглядає останній метод; він є важливою частиною системи, особливо коли потрібно, щоб клієнти, які не підтримують MQTT (наприклад, додаток OAuth), могли отримати доступ до пристрою, оснащеного IoT.

┌──────────────────────────────┐  
 │ ХМАРА │  
 ┌──────┐ │ ┌──────┐ ┌────────────┐ │  
 │ПРИСТРІЙ│◄──┼─►│ │ │ │ │  
 │ A │ │ │ │ │ │ │  
 └──────┘ │ │ │ │ │ │  
 ┌──────┐ │ │ │ │ │ │  
 │ПРИСТРІЙ│◄──┼─►│ MQTT |◄───►| ВЕБ-СЕРВЕР │ │  
 │ B │ │ │БРОКЕР│ │ │ │  
 └──────┘ │ │ │ │ │ │  
 ┌──────┐ │ │ │ │ │ │  
 │ПРИСТРІЙ│◄──┼─►│ │ │ │ │  
 │ C │ │ │ │ │ │ │  
 └──────┘ │ └───▲──┘ └──────▲─────┘ │  
 └──────┼───────────────┼───────┘  
 │ │  
 ┌───▼───────────────▼───┐  
 │ КОРИСТУВАЧ │  
 ├──────┬──────┬─────────┤  
 │МОБІЛЬНИЙ│ ВЕБ │3RD PARTY│  
 │КЛІЄНТ│КЛІЄНТ│OAUTH APP│  
 └──────┴──────┴─────────┘

Оскільки наша хмарна програма написана на Python, всі згадані тут деталі реалізації стосуються саме її, але ті ж самі підходи можна застосувати і в інших мовах/стеках.
Також цей пост обмежений версіями MQTT v3.1 та v3.1.1._

Трохи контексту

Як згадувалося раніше, у нашому IoT-проекті була подібна конфігурація — пристрої підключалися до брокера MQTT, а користувачі (через мобільні додатки) підключалися до нашого веб-сервера. Ми хотіли надсилати “команди” на пристрої з веб-сервера. Команди могли бути або типу "fire-and-forget", або типу запит-відповідь, виконуватись в процесі веб-запиту або фоновими працівниками, надсилатись на один або кілька пристроїв одночасно.

Беручи до уваги ці сценарії використання, ми змогли записати основні утиліти, необхідні для цього:

  • проста блокуюча функція для публікації будь-якого повідомлення
  • проста блокуюча функція для підписки на теми, очікування n кількості повідомлень або до тайм-ауту, повертаючи отримані повідомлення
  • проста блокуюча функція для досягнення функціональності типу запит-відповідь — може бути обгорткою для двох попередніх

Після цього ми почали шукати MQTT-клієнта для підключення веб-сервера до брокера.
Ми вирішили вибрати paho-mqtt — Python-модуль, який є частиною проєкту Eclipse Paho, що також підтримує MQTT-бібліотеки для інших популярних мов. Цей модуль стабільний, багатий на функції та добре спроектований. Проте нам знадобилося деякий час та кілька ітерацій, щоб правильно з ним працювати.

Ця стаття розповідає про наш шлях до того, як MQTT почав працювати з нашим веб-сервером як годинник — вона представляє різні ітерації, уроки та корисні поради, які ми отримали протягом року роботи над цим проєктом.

Я пишу цей пост у вигляді серії дій, подій і рішень, щоб дати повну картину.
Якщо ви хочете попрацювати з кодом, ось посилання на фрагменти коду нижче.

[

Допоміжні функції та класи для MQTT на Python [Snippets]

GitLab.com

gitlab.com

](https://gitlab.com/-/snippets/2183967?source=post_page-----9425dec116ef--------------------------------)

[v1] Прості функції-обгортки

Ми думали, що це буде легко — адже paho в основному виконував всю важку роботу і мав необхідні блокуючі API (ну, майже, ви побачите!).

Функція публікації була простою. Це була «обгортка» навколо функції paho.mqtt.publish.single.

def publish_to_mqtt(  
 topic: str, payload: str, qos: int = 0, retain: bool = False  
) -> MQTTMessageInfo:  
 return publish.single(  
 ...  
 )

Аналогічно, ми знайшли paho.mqtt.subscribe.simple, але цього було недостатньо. Вона не мала жодного «тайм-ауту», що означало, що вона буде блокуватися нескінченно, якщо не буде отримано повідомлення на цій темі. Ми вирішили скопіювати код цієї функції та додати перевірку на тайм-аут (жирним шрифтом нижче).
Це вирішило проблему.

def subscribe_to_mqtt(  
 topics: Union[str, List[str]],  
 qos: int = 0,  
 msg_count: int = 1,  
 retained: bool = False,  
 timeout: int = 5,  
 userdata: dict = None  
) -> Union[MQTTMessage, List[MQTTMessage]]:  
_"""  
 змінена методика paho.mqtt.subscribe.simple() для підтримки тайм-ауту  
 """_ # callback для on_connect  
 def _on_connect(..., userdata: dict, ...):  
 # викликати виняток, якщо не вдалося підключитись  
 ...  
 # підписатись на теми з userdata  
 ...  


 # callback для on_message  
 def _on_message_simple(..., userdata: dict, message: MQTTMessage):  
 # ігнорувати, якщо msg_count вже 0  
 ...  
 # ігнорувати, якщо повідомлення «залишено» і користувач цього не хоче  
 ...  
 # зменшити msg_count і додати/додати його до повідомлень у userdata  
 ...  


 # перевірити msg_count і qos  
 ...  
 # Налаштуємо себе для повернення одного повідомлення, якщо msg_count == 1, або списку, якщо > 1  
 ...  
 # створити userdata  
 ...
# створення екземпляра клієнта, налаштування callback і користувацьких даних  
 ...  
 # підключення до брокера, визначення start_time і запуск "мережевого циклу в окремому потоці" з клієнтом  
 ...  
**# чекати, поки не отримано всі повідомлення або досягнуто тайм-ауту  
 while True:  
 should_timeout = time.time() > start_time + timeout  
 if userdata["msg_count"] == 0 or should_timeout:  
 client.loop_stop()  
 client.disconnect()  
 break  
 time.sleep(0.2)**    
**# викликати помилку, якщо не знайдено або знайдено менше повідомлень  
 messages = userdata["messages"]  
 if not messages or len(messages) < msg_count:  
 raise TimeoutError(...)** return messages

Ми наївно думали, що поєднання цих двох функцій вирішить задачу для запиту-відповіді.
Але це не спрацювало так добре, оскільки екземпляри клієнтів створювались і підключались двічі, що робило все дуже повільним і практично марним — адже більшу частину часу наші пристрої надсилали відповіді ще до того, як сервер підписувався.

Тому замість використання функції publish_to_mqtt, ми змінили вище описану функцію subscribe_to_mqtt так, щоб “аргументи публікації” передавались через userdata. Коли клієнт підключається, він публікує ці дані після підписки. Ця оптимізація гарантує, що публікація відбудеться в тому ж з'єднанні, що і підписка, а також, що підписка виконується перед публікацією, щоб відповідь не була втрачена.

def subscribe_to_mqtt(...):  
 ...  
# callback для on_connect  
 def _on_connect(client: MQTTClient, userdata: dict, flags: int, rc: int):  
 ...  
 # підписка на теми в userdata  
 ...
**# публікація даних на основі pub_on_connect в userdata  
 якщо "pub_on_connect" є в userdata:  
 ...** def request_response_to_mqtt(  
 topic: str,  
 payload: str,  
 response_topic: str,  
 qos: int = 0,  
 response_msg_count: int = 1,  
 response_timeout: int = 5,  
 retained: bool = False  
) -> Union[MQTTMessage, List[MQTTMessage]]:  
**# створити userdata з `pub_on_connect`  
 userdata = {  
 "pub_on_connect": {  
 "topic": topic, "payload": payload, "qos": qos  
 }  
 }**    
 # підписатися з створеним userdata і  
 # чекати на повідомлення на response_topic  
 return subscribe_to_mqtt(  
**userdata=userdata,**  
 ...  
 )

Повний код для цього можна знайти в файлі mqtt_v1.py в кодовому фрагменті.

>>> # простий демо приклад  
>>> for i in range(5):  
... start=time.time(); rpc(device, "RPC.Ping"); end=time.time()  
... print(f"[{i}] Час виконання: {end - start}")  
...
**RPC: RPC.Ping відповіді: b'{"result": "pong"}'**  
[0] Час виконання: 1.220142126083374  
**RPC: RPC.Ping відповіді: b'{"result": "pong"}'**  
[1] Час виконання: 0.8168301582336426  
**RPC: RPC.Ping відповіді: b'{"result": "pong"}'**  
[2] Час виконання: 1.0094208717346191  
**RPC: RPC.Ping відповіді: b'{"result": "pong"}'**  
[3] Час виконання: 1.0069520473480225  
**RPC: RPC.Ping відповіді: b'{"result": "pong"}'**  
[4] Час виконання: 1.003695011138916

І ось і все. Система працювала і проходила базові тести, цього було достатньо, поки наші користувачі не почали скаржитися на затримки — більше про це в наступному розділі.

[v2] Сінглтон, що повторно використовує з'єднання

Після того, як ми запустили систему в бета-версії з близько 20–25 пристроями та їхніми користувачами, ми отримали відгуки від них.
Вони зазнавали помітної затримки (більше 3 секунд), коли виконували команди для керування своїми пристроями.

Після проведення базового профілювання стало зрозуміло, що основна причина затримки — це процес TLS-рукопожаття та встановлення з'єднання з брокером. Кожного разу, коли сервер взаємодіє з брокером, йому потрібно встановлювати нове з'єднання TCP+TLS. Ми вирішили оптимізувати це так само, як це зазвичай роблять з’єднання з базами даних у веб-серверах — зберігаючи з'єднання і повторно його використовуючи. Оскільки більшість вебсерверів (принаймні WSGI-сервери) повторно використовують процеси, створені для обробки веб-запитів, є можливість мати MQTT-з'єднання, яке може пережити цикл HTTP-запит-відповідь.

Щоб це реалізувати, ми вирішили використати клас Singleton, який зберігатиме клієнта та його з'єднання. Цей клас потім надасть відповідні утиліти і за лаштунками використовуватиме збережене з'єднання. Більшість роботи полягала в реорганізації наявного коду з попередньої версії для підтримки цього шаблону.
Клас синглтона (ми назвали його MQTTAgent) виглядав приблизно так:

class MQTTAgent:  
 _instance: "MQTTAgent" = None  
**_client: Optional[MQTTClient] = None**  

 @staticmethod  
 def get_instance() -> "MQTTAgent":  
_""" Статичний метод доступу. """  
 ..._ def __init__(self):  
_""" Віртуально приватний конструктор. """  
 ..._    

 def clean_up(self):  
_""" Очищення мережевого з'єднання та клієнта. """  
 ..._    

 def _connect_async(self):  
_""" Налаштування клієнта та з'єднання асинхронно. """  
 ..._    

 def connect(self, timeout: int = 5):  
_""" Підключення синхронно з тайм-аутом. """  
 ..._    

 def disconnect(self):  
_""" Відключення від брокера синхронно. """  
 ..._    

 def __on_connect(self,  
 client: MQTTClient,  
 userdata: dict,  
 flags: int,  
 rc: int):  
_""" Колбек, який викликається, коли клієнт підключається.
""  
 ..._    

 def __on_message(self,  
 client: MQTTClient,  
 userdata: dict,  
 message: MQTTMessage):  
_""" Колбек, який викликається, коли отримано повідомлення. """  
 ..._    

**def is_connected(self) -> bool:**  
_""" Перевірка, чи підключено до брокера. """  
 ..._    

**def publish(self,  
 topic: str,  
 payload: str,  
 qos: int = 1,  
 retain: bool = False) -> MQTTMessageInfo:**    
_""" Публікація повідомлення синхронно. """  
 ..._    

**def subscribe(self,  
 topics: _Topics,  
 qos: int = 1,  
 retained: bool = False,  
 msg_count: int = 1,  
 timeout: int = 5) -> _Messages:**    
 _..._    

**def request_response(self,  
 topic: str,  
 payload: str,  
 response_topics: _Topics,  
 qos: int = 1,  
 response_msg_count: int = 1,  
 response_timeout: int = 5,  
 retained=False) -> _Messages:**    
 _..._    


## зручний глобальний екземпляр, який можна  
## безпосередньо імпортувати та використовувати.
**mqtt_agent = MQTTAgent.get_instance()**

Повний код цього можна знайти у файлі mqtt_v2.pyфайл у кодовому фрагменті.

>>> # простий демонстраційний приклад  
>>> for i in range(5):  
... start=time.time(); device.rpc("RPC.Ping"); end=time.time()  
... print(f"[{i}] Час виконання: {end - start}")  
... **Підключення прийнято.[0] mqtt клієнт підключено до mqtt-брокера:1883.**  
RPC: RPC.Ping відповіді: b'{"result": "pong"}'  
**[0] Час виконання: 1.0129196643829346**  
RPC: RPC.Ping відповіді: b'{"result": "pong"}'  
**[1] Час виконання: 0.10076355934143066**  
RPC: RPC.Ping відповіді: b'{"result": "pong"}'  
**[2] Час виконання: 0.10155487060546875**  
RPC: RPC.Ping відповіді: b'{"result": "pong"}'  
**[3] Час виконання: 0.10111713409423828**  
RPC: RPC.Ping відповіді: b'{"result": "pong"}'  
**[4] Час виконання: 0.1007685661315918**

Модуль MQTT тепер чистий, елегантний у використанні та ефективний.
Затримка зменшилась в 10 разів (перевірте результат виведення демонстраційного коду вище). Користувачі помітили різницю та оцінили це. Перемога на сьогодні!

[v3] Забезпечення безпеки в багатопоточному середовищі

Коли ми отримали вимогу забезпечити одночасне виконання команд до кількох пристроїв, у нас було два варіанти — виконати їх у різних потоках або використати систему розподіленої черги завдань (ми вже мали налаштований Celery на сервері).

Перший варіант здавався найкращим рішенням, але вимагав зробити рішення v2 "безпечним для потоків", оскільки mqtt_agent.subscribe (а отже, і mqtt_agent.request_response) не були спроектовані з урахуванням конкуренції. Замість того, щоб робити це, ми обрали підхід з використанням "черги завдань". Вона б планувала одночасні виклики команд і передавала їх на виконання розподіленим фоновим працівникам. Це виконало свою задачу, але через те, що завдання "ставили в чергу" і "розподілено виконувались", продуктивність була нестабільною.
Це займало більше часу, ніж очікувалося — коли черга довга або працівники зайняті. Таке часто траплялося, коли кількість пристроїв на одного користувача збільшувалась. Здавалося, що в нас немає іншого вибору, окрім як зробити MQTTAgent безпечним для потоків.

У v2, userdata (яке встановлюється як атрибут MQTTClient і передається в обробник on_message) зберігає "стан підписки"; ключ до того, щоб зробити MQTTAgent безпечним для потоків, — це зробити його безпечним для потоків. Ідея полягає в тому, щоб мати "сховище" всіх "станів підписок", і коли повідомлення отримано, перевіряти це сховище. Якщо будь-яка "підписка" запитала отримання повідомлень з цієї теми, ми додаємо це повідомлення до відповідного "стану підписки". Нам також потрібно розумно обробляти підписки на теми — якщо два потоки підписуються на одну і ту саму тему, то на внутрішньому рівні підписка повинна бути лише одна, а обидва потоки отримуватимуть те саме повідомлення.

По-перше, ми створили клас _MQTTSubscription для зберігання "стану підписки" замість використання dict.
Також ми перемістили логіку додавання нових повідомлень всередину класу.

**class _MQTTSubscription:**  
**def __init__(self,  
 topics: _Topics,  
 msg_count: int = 1,  
 qos: int = 1,  
 retained: bool = False):**    
 self.thread_id: int = current_thread().ident  
**self.topics: List[str] = topics if isinstance(topics, list) else [topics]**  
**self.messages: _Messages = list() if msg_count > 1 else None**  
**self.pending_msg_count: int = msg_count**  
 self._msg_count: int = msg_count  
 self.qos: int = qos  
 self.retained: bool = retained  

**def add_message(self, message: MQTTMessage):  
_""" Обробка нового повідомлення для цієї підписки. """_**    
# ігноруємо - pending_msg_count <= 0 або повідомлення збережене, але запит не для збережених  
 ...  
 # зменшуємо pending_msg_count і додаємо в messages  
 ...

Після цього ми додали атрибути "сховища підписок" та "лічильника тем" до MQTTAgent.
"Сховище підписок" — це набір всіх екземплярів підписок, які чекають на повідомлення. "Лічильник посилань на тему" — це відображення теми та кількості екземплярів підписок, які підписані на неї.

class MQTTAgent:  
 ...  
**_subscriptions: Set[_MQTTSubscription] = set()  
 _topics_count: Dict[str, int] = dict()**    

 ...  

 def _disconnect(self)  
 ...  
**# скидаємо _subscriptions, _topic_count  
 self._subscriptions = set()  
 self._topics_count = dict()**    
 ...

Коли запитується нова "підписка", створюється екземпляр subscription і додається в набір _subscriptions, а "лічильник посилань на тему" збільшується для цих тем. _client підписується на теми, на які ще не підписано (з лічильником посилань рівним 1). Коли підписка повинна бути "закрита або скасована", лічильник посилань зменшується, і _client скасовує підписку на теми, на які більше не існує посилань (з лічильником посилань рівним 0).
У зворотньому виклику __on_message, ми перевіряємо всі _subscriptions і викликаємо subscription.add_message для тих, у яких тема підписки та тема повідомлення співпадають.

class MQTTAgent:  
 ...  

 def __on_message(self, ...):  
_..._  
**# ітеруємо по підписках і шукаємо ті, що мають цю тему  
 for subscription in tuple(self._subscriptions):  
 ...  
 # перевіряємо, чи співпадає хоча б одна тема з цією підпискою  
 ...  
 # додаємо повідомлення до підписки  
 subscription.add_message(message=message)**    

 ...  
 ...**def _subscribe(self, subscription: _MQTTSubscription):  
_""" Підписатися на основі підписки. """_  
# додаємо до _subscriptions  
 ...  
 # збільшуємо лічильник тем - створюємо список тем для підписки (ті, що ще не підписані)  
 ...  
 # підписуємось на ці теми  
 ...**    

**def _unsubscribe(self, subscription: _MQTTSubscription):  
_""" Відписатися на основі підписки. """_  
# видаляємо підписку з _subscriptions  
 ...
# зменшуємо лічильник тем - створюємо список тем для відписки (ті, що більше не потрібні)  
 ...  
 # відписуємось від цих тем  
 ...**    

 def subscribe(self, ...) -> _Messages:  
 ...  
# підключення до клієнта  
 ...  
 # перевірка msg_count і qos  
 ...
**# створюємо об'єкт підписки  
 subscription = _MQTTSubscription(  
 topics=topics,  
 msg_count=msg_count,  
 qos=qos,  
 retained=retained  
 )  
 # підписуємось  
 self._subscribe(subscription=subscription)**    
 # чекаємо, поки всі повідомлення не будуть отримані або не спливе час очікування  
 start_time = time.time()  
 while True:  
 should_timeout = time.time() > start_time + timeout  
 if **subscription.pending_msg_count==0** or should_timeout:  
**# відписуємось  
 self._unsubscribe(subscription=subscription)**    
 break  
 # затримка  
 time.sleep(0.1)  
**return subscription.messages**  
...

Повний код можна знайти в файлі mqtt_v3.pyфайл в кодовому фрагменті.

>>> # простий демонстраційний код  
>>> print(f"Викликаємо RPC для {len(devices)} пристроїв одночасно.")  
**Викликаємо RPC для 5 пристроїв одночасно.**  
>>> **with ThreadPoolExecutor(len(devices)) as executor:**  
... start = time.time()  
... # переконуємось, що mqtt_agent підключений  
... mqtt_agent.connect()   
...
tuple(executor.map(  
... lambda device: rpc(device, "RPC.Ping"),  
... devices  
... ))  
... end = time.time()  
... print(f"Загальний час: {end - start}")  
...   
Connection Accepted.[0] mqtt client connected to mqtt-broker:1883.  
RPC: RPC.Ping responses: b'{"result": "pong"}'  
[140217424803584] Час виконання: 0.14078974723815918  
RPC: RPC.Ping responses: b'{"result": "pong"}'  
[140217441588992] Час виконання: 0.1967012882232666  
RPC: RPC.Ping responses: b'{"result": "pong"}'  
[140217433196288] Час виконання: 0.19601941108703613  
RPC: RPC.Ping responses: b'{"result": "pong"}'  
[140217416410880] Час виконання: 0.19498801231384277  
RPC: RPC.Ping responses: b'{"result": "pong"}'  
[140217449981696] Час виконання: 0.6972782611846924  
**Загальний час: 1.314225673675537**

Тепер було можливо безпечно викликати функціональність MQTTAgent з кількох потоків.
Ми почали викликати команди до кількох пристроїв одночасно за допомогою Python ThreadPoolExecutor (як показано в прикладі вище) замість розподілу завдань між фоновими працівниками. Це не лише зробило систему швидшою та ефективнішою, але й дозволило масштабувати її для одночасних викликів на більшу кількість пристроїв (тестувалось з 100 пристроями одночасно).

Що далі?

У нас є кілька ідей, які можуть підняти цей інструмент на новий рівень. Деякі з них такі:

  • API типу "Iter/Stream" для отримання повідомлень по підписці в міру їх надходження. Наразі всі повідомлення "збираються", а потім повертаються.
  • API типу "Future/Promise" для надання як асинхронних, так і синхронних варіантів на рівні користувача. Наразі вся функціональність є блокуючою.
  • Підтримка кількох брокерів.

Поточна реалізація достатня для наших випадків використання, тому, ймовірно, ми не будемо працювати над цим далі. Але якщо ми отримаємо хороші відгуки на цей пост, ми можемо подумати про розробку та випуск цього як відкритого Python-пакету.
Хоча Eclipse Paho є надійним та стабільним, йому потрібен "шар елегантності" поверх нього, щоб він працював як годинник, подібно до того, як пакет requests працює з urllib3. Цей пакет може бути саме тим — MQTT для людей™!

Перекладено з: MQTT with Python Web Server like a Charm

Leave a Reply

Your email address will not be published. Required fields are marked *