Skip to content

Commit 92a8862

Browse files
committed
Fix subscribe error and improve MQTT 5 session persistence
1 parent c7e0cf5 commit 92a8862

File tree

2 files changed

+38
-11
lines changed

2 files changed

+38
-11
lines changed

inelsmqtt/__init__.py

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from typing import Any, Callable, Optional
1111

1212
import paho.mqtt.client as mqtt
13+
from paho.mqtt.packettypes import PacketTypes
14+
from paho.mqtt.properties import Properties
1315

1416
from inelsmqtt.const import GATEWAY
1517
from inelsmqtt.utils.core import ProtocolHandlerMapper
@@ -59,7 +61,7 @@ def __init__(
5961
transport (str): transportation protocol. Can be used tcp or websockets, defaltut tcp
6062
debug (bool): flag for debuging mqtt comunication. Default False
6163
"""
62-
proto = config.get(MQTT_PROTOCOL) if config.get(MQTT_PROTOCOL) else mqtt.MQTTv311
64+
self.__proto = config.get(MQTT_PROTOCOL) if config.get(MQTT_PROTOCOL) else mqtt.MQTTv311
6365

6466
_t: str = (config.get(MQTT_TRANSPORT) if config.get(MQTT_TRANSPORT) else "tcp").lower()
6567

@@ -69,7 +71,7 @@ def __init__(
6971
if (client_id := config.get(MQTT_CLIENT_ID)) is None:
7072
client_id = mqtt.base62(uuid.uuid4().int, padding=22)
7173

72-
self.__client = mqtt.Client(client_id, protocol=proto, transport=_t)
74+
self.__client = mqtt.Client(client_id, protocol=self.__proto, transport=_t)
7375

7476
self.__client.on_connect = self.__on_connect
7577
self.__client.on_subscribe = self.__on_subscribe
@@ -210,7 +212,18 @@ def __connect(self) -> None:
210212
"""
211213
if not self.__client.is_connected():
212214
try:
213-
self.__client.connect(self.__host, self.__port)
215+
if self.__proto == 5:
216+
properties = Properties(PacketTypes.CONNECT)
217+
properties.SessionExpiryInterval = 3600 # in seconds
218+
self.__client.connect(
219+
self.__host,
220+
self.__port,
221+
clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
222+
properties=properties,
223+
keepalive=60,
224+
)
225+
else:
226+
self.__client.connect(self.__host, self.__port, keepalive=60)
214227
self.__client.loop_start()
215228
except Exception as e:
216229
_LOGGER.error("Failed to connect to MQTT broker: %s", e)
@@ -219,9 +232,9 @@ def __connect(self) -> None:
219232
start_time = datetime.now()
220233

221234
while self.__try_connect is False:
222-
# there should be timeout to discover all topics
223235
time_delta = datetime.now() - start_time
224236
if time_delta.total_seconds() > self.__timeout:
237+
_LOGGER.error("Connection attempt timed out")
225238
self.__try_connect = self.__is_available = False
226239
break
227240

@@ -241,12 +254,18 @@ def __on_disconnect(
241254
userdata (Any): users data
242255
reason_code (number): reason code
243256
"""
244-
_LOGGER.info("%s - disconnecting reason [%s]", self.__host, reason_code)
257+
_LOGGER.warning("%s - disconnecting reason [%s]", self.__host, reason_code)
258+
259+
self.__is_available = False
245260

246261
for item in self.__is_subscribed_list.keys():
247262
self.__is_subscribed_list[item] = False
248263
_LOGGER.info("Disconnected %s", item)
249264

265+
# Notify any condition variables waiting on __expected_mid
266+
with self.__subscription_condition:
267+
self.__subscription_condition.notify_all()
268+
250269
def __on_connect(
251270
self,
252271
client: mqtt.Client, # pylint: disable=unused-argument
@@ -346,10 +365,10 @@ def subscribe(self, topics, qos=0, options=None, properties=None) -> dict[str, s
346365
for topic, _ in filtered_topics:
347366
self.__expected_mid[topic] = mid
348367

349-
with self.__subscription_condition:
350-
self.__subscription_condition.wait_for(
351-
lambda: not self.__expected_mid.get(topic), timeout=self.__timeout
352-
)
368+
with self.__subscription_condition:
369+
self.__subscription_condition.wait_for(
370+
lambda: not self.__expected_mid.get(topic), timeout=self.__timeout
371+
)
353372

354373
for topic, _ in filtered_topics:
355374
if not self.__is_subscribed_list[topic]:
@@ -557,11 +576,19 @@ def __notify_listeners(self, stripped_topic: str, is_connected_message: bool) ->
557576

558577
def __disconnect(self) -> None:
559578
"""Disconnecting from broker and stopping broker's loop"""
560-
self.close()
561579
self.client.disconnect()
580+
self.close()
562581

563582
def close(self) -> None:
564583
"""Close loop."""
584+
_LOGGER.warning("Close called from HA")
585+
586+
self.__is_available = False
587+
588+
# Notify any condition variables waiting on __expected_mid
589+
with self.__subscription_condition:
590+
self.__subscription_condition.notify_all()
591+
565592
self.client.loop_stop()
566593

567594
def disconnect(self) -> None:

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
setup(
66
name="elkoep-mqtt",
7-
version="0.2.33.beta.5",
7+
version="0.2.33.beta.6",
88
url="https://github.com/epdevlab/elkoep-mqtt",
99
license="MIT",
1010
author="Elko EP s.r.o.",

0 commit comments

Comments
 (0)