Skip to content

Commit 42e2f1e

Browse files
committed
fix client not reconnecting
1 parent 5db90ab commit 42e2f1e

File tree

1 file changed

+19
-13
lines changed

1 file changed

+19
-13
lines changed

src/publisher/mqtt_publisher.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,32 +73,41 @@ async def __run_loop(self) -> None:
7373
),
7474
)
7575
client.pending_calls_threshold = 150
76-
self.client = client
7776
reconnect_interval = 5
7877
while True:
7978
try:
80-
async with client:
79+
LOG.debug(
80+
"Connecting to %s:%s as %s",
81+
self.host,
82+
self.port,
83+
self.publisher_id,
84+
)
85+
async with client as client_context:
86+
self.client = client_context
8187
self.__connected.set()
8288
await self.__on_connect()
83-
async for message in client.messages:
89+
async for message in client_context.messages:
8490
await self._on_message(
85-
client,
91+
client_context,
8692
str(message.topic),
8793
message.payload,
8894
message.qos,
8995
message.properties,
9096
)
91-
except aiomqtt.MqttError as e:
97+
except aiomqtt.MqttError:
9298
LOG.error(
93-
f"Connection to MQTT broker lost; Reconnecting in {reconnect_interval} seconds ...: {e}"
99+
"Connection to %s:%s lost; Reconnecting in %d seconds ...",
100+
self.host,
101+
self.port,
102+
reconnect_interval,
103+
exc_info=True,
94104
)
95105
await asyncio.sleep(reconnect_interval)
96106
except asyncio.exceptions.CancelledError:
97107
LOG.debug("MQTT publisher loop cancelled")
98108
raise
99109
finally:
100110
self.__connected.clear()
101-
self.client = None
102111
LOG.info("MQTT client disconnected")
103112

104113
@override
@@ -123,7 +132,7 @@ def enable_commands(self) -> None:
123132

124133
async def __enable_commands(self) -> None:
125134
if not self.__connected.is_set() or not self.client:
126-
LOG.error("MQTT client is not connected")
135+
LOG.error("Failed to enable commands: MQTT client is not connected")
127136
return
128137
try:
129138
LOG.info("Subscribing to MQTT command topics")
@@ -213,18 +222,15 @@ async def __on_message_real(self, *, topic: str, payload: str) -> None:
213222
)
214223

215224
def __publish(self, topic: str, payload: Any) -> None:
216-
if not (self.client and self.is_connected()):
217-
LOG.error("MQTT client is not connected")
218-
return
225+
LOG.debug("Publishing to MQTT topic %s with payload %s", topic, payload)
219226
loop = asyncio.get_running_loop()
220227
asyncio.run_coroutine_threadsafe(
221228
self.__async_publish(topic, payload, retain=True), loop
222229
)
223-
LOG.debug(f"Publishing to MQTT topic {topic} with payload {payload}")
224230

225231
async def __async_publish(self, topic: str, payload: Any, retain: bool) -> None:
226232
if not (self.client and self.is_connected()):
227-
LOG.error("MQTT client is not connected")
233+
LOG.error("Failed to publish: MQTT client is not connected")
228234
return
229235
try:
230236
await self.client.publish(topic, payload, retain)

0 commit comments

Comments
 (0)