Open
Description
Our code subscribes to topics and publishes concurrently from two different asyncio tasks. In rare cases, when the subscription response is delayed and a context switch to the publishing task happens, we get a KeyError. However, in the _on_subscribe callback the exception is only logged, not raised, so the subscribe call waits forever.
We are using aiomqtt 2.0.1, and I am not sure why a TimeoutError was not raised after the default 10-second timeout. According to the MQTT broker logs, the connection was dropped because no keep-alive message arrived, but the aiomqtt client stayed stuck until we restarted the whole application.
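To show the hang in isolation, here is a minimal sketch that does not use aiomqtt at all. `PendingCalls`, `register`, and `on_ack` are hypothetical names made up for this illustration; the only assumption carried over from the real client is that in-flight requests are tracked as futures keyed by message ID. When an acknowledgement arrives for an ID that was never registered, the error is logged and the future that is actually being awaited stays pending.

```python
import asyncio
import logging

logger = logging.getLogger("pending_demo")


class PendingCalls:
    """Hypothetical stand-in for a client that tracks requests by message ID."""

    def __init__(self):
        self.pending = {}  # message ID -> asyncio.Future

    def register(self, mid):
        # Create the future a caller would await until the ack arrives.
        fut = asyncio.get_running_loop().create_future()
        self.pending[mid] = fut
        return fut

    def on_ack(self, mid):
        # Mirrors the log-only handling: an unknown ID is reported, not raised.
        try:
            fut = self.pending.pop(mid)
        except KeyError:
            logger.error('Unexpected message ID "%d" in ack callback', mid)
            return
        if not fut.done():
            fut.set_result("granted")


async def demo():
    calls = PendingCalls()
    fut = calls.register(4)   # the caller waits on ID 4
    calls.on_ack(5)           # but the ack carries ID 5
    await asyncio.sleep(0)    # give the loop a chance to run callbacks
    # The future for ID 4 is untouched, so anything awaiting it keeps waiting.
    return fut.done(), sorted(calls.pending)
```

Running `asyncio.run(demo())` in this sketch leaves the future for ID 4 unresolved, which is the state I believe our subscribe call ended up in.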
The callback in aiomqtt/client.py looks like this:

def _on_subscribe(  # noqa: PLR0913
    self,
    client: mqtt.Client,
    userdata: Any,
    mid: int,
    reason_codes: list[ReasonCode],
    properties: Properties | None = None,
) -> None:
    """Called when we receive a SUBACK message from the broker."""
    try:
        fut = self._pending_subscribes.pop(mid)
        if not fut.done():
            fut.set_result(reason_codes)
    except KeyError:
        self._logger.exception(
            'Unexpected message ID "%d" in on_subscribe callback', mid
        )

Here is the message from our client log:
2025-03-06 05:38:24.418 - client - ERROR - Unexpected message ID "5" in on_subscribe callback
Traceback (most recent call last):
  File "/home/compulab/.venvs/lib/python3.11/site-packages/aiomqtt/client.py", line 576, in _on_subscribe
    fut = self._pending_subscribes.pop(mid)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: 5
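As a possible stopgap on the application side, the sketch below serializes subscribe and publish calls behind one asyncio.Lock and wraps each call in its own asyncio.wait_for. `SerializedClient` is a hypothetical wrapper, not part of aiomqtt; it only assumes the wrapped object exposes awaitable `subscribe(topic, qos=...)` and `publish(topic, payload, qos=...)` methods. Whether serializing the calls actually avoids the KeyError depends on the root cause, which is not established here, but the outer timeout should at least keep a lost acknowledgement from blocking the task forever.

```python
import asyncio
from typing import Any


class SerializedClient:
    """Hypothetical wrapper: one MQTT request at a time, each with a hard timeout."""

    def __init__(self, client: Any, timeout: float = 10.0) -> None:
        self._client = client      # any object with async subscribe()/publish()
        self._timeout = timeout    # seconds to wait before giving up on a call
        self._lock = asyncio.Lock()

    async def subscribe(self, topic: str, qos: int = 0) -> Any:
        # Hold the lock so no publish can interleave with this subscribe.
        async with self._lock:
            return await asyncio.wait_for(
                self._client.subscribe(topic, qos=qos), self._timeout
            )

    async def publish(self, topic: str, payload: Any = None, qos: int = 0) -> None:
        # Same lock, so publishes wait for an in-flight subscribe to finish.
        async with self._lock:
            await asyncio.wait_for(
                self._client.publish(topic, payload, qos=qos), self._timeout
            )
```

The trade-off is throughput: every call waits for the previous one to finish, so this only makes sense if publish volume is modest. On timeout the lock is released and the caller can decide whether to reconnect.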