Skip to content

Connection drop not caught #381

@bj00rn

Description

@bj00rn

Thanks for your effort on this project!

Seems that connection drop is not caught under some circumstances, and reconnection is not being triggered.

Im working on a pr to add aimqtt to a project to replace gmqtt with aiomqtt. It seems that connection drop is not detected sometimes. I've checked the broker and see that LWT has been sent but no MqttError was raised.

Interestingly restarting the broker while the client is in this state will correctly trigger an MqttError and trigger reconnection.

Any help would be greatly appreciated!

I read messages in a loop, and send messages in separate tasks.

       while True:
            try:
                LOG.debug(
                    "Connecting to %s:%s as %s",
                    self.host,
                    self.port,
                    self.publisher_id,
                )
                async with client as client_context:
                    self.client = client_context
                    self.__connected.set()
                    await self.__on_connect()
                    async for message in client_context.messages:
                        await self._on_message(
                            client_context,
                            str(message.topic),
                            message.payload,
                            message.qos,
                            message.properties,
                        )
            except aiomqtt.MqttError:
                LOG.warning(
                    "Connection to %s:%s lost; Reconnecting in %d seconds ...",
                    self.host,
                    self.port,
                    reconnect_interval,
                )
                await asyncio.sleep(reconnect_interval)
            except asyncio.exceptions.CancelledError:
                LOG.debug("MQTT publisher loop cancelled")
                raise
            finally:
                self.__connected.clear()
                LOG.info("MQTT client disconnected")
def __publish(self, topic: str, payload: Any) -> None:
        LOG.debug("Publishing to MQTT topic %s with payload %s", topic, payload)
        loop = asyncio.get_running_loop()
        asyncio.run_coroutine_threadsafe(
            self.__async_publish(topic, payload, retain=True), loop
        )

    async def __async_publish(self, topic: str, payload: Any, retain: bool) -> None:
        if not (self.client and self.is_connected()):
            LOG.error("Failed to publish: MQTT client is not connected")
            return
        try:
            await self.client.publish(topic, payload, retain)
        except aiomqtt.MqttError as e:
            LOG.error(
                f"Failed to publish to MQTT topic {topic} with payload {payload}: {e}"
            )

https://github.com/bj00rn/saic-python-mqtt-gateway/blob/63cc251bd662dc5bb6190a26a0a72f56195acb73/src/publisher/mqtt_publisher.py#L57-L110

  • The version of aiomqtt 2.4.0
  • A minimal self-contained code snippet that reproduces the bug (you can use test.mosquitto.org for a publicly available MQTT broker)
  • Possibly related issues

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions