Skip to content

Commit c7f515b

Browse files
committed
retain birth/keepalive message
1 parent 63cc251 commit c7f515b

File tree

3 files changed

+148
-29
lines changed

3 files changed

+148
-29
lines changed

src/publisher/core.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,28 +55,61 @@ def is_connected(self) -> bool:
5555

5656
@abstractmethod
5757
def publish_json(
58-
self, key: str, data: dict[str, Any], no_prefix: bool = False
58+
self,
59+
key: str,
60+
data: dict[str, Any],
61+
no_prefix: bool = False,
62+
retain: bool = False,
63+
qos: int = 0,
5964
) -> None:
6065
raise NotImplementedError
6166

6267
@abstractmethod
63-
def publish_str(self, key: str, value: str, no_prefix: bool = False) -> None:
68+
def publish_str(
69+
self,
70+
key: str,
71+
value: str,
72+
no_prefix: bool = False,
73+
retain: bool = False,
74+
qos: int = 0,
75+
) -> None:
6476
raise NotImplementedError
6577

6678
@abstractmethod
67-
def publish_int(self, key: str, value: int, no_prefix: bool = False) -> None:
79+
def publish_int(
80+
self,
81+
key: str,
82+
value: int,
83+
no_prefix: bool = False,
84+
retain: bool = False,
85+
qos: int = 0,
86+
) -> None:
6887
raise NotImplementedError
6988

7089
@abstractmethod
71-
def publish_bool(self, key: str, value: bool, no_prefix: bool = False) -> None:
90+
def publish_bool(
91+
self,
92+
key: str,
93+
value: bool,
94+
no_prefix: bool = False,
95+
retain: bool = False,
96+
qos: int = 0,
97+
) -> None:
7298
raise NotImplementedError
7399

74100
@abstractmethod
75-
def publish_float(self, key: str, value: float, no_prefix: bool = False) -> None:
101+
def publish_float(
102+
self,
103+
key: str,
104+
value: float,
105+
no_prefix: bool = False,
106+
retain: bool = False,
107+
qos: int = 0,
108+
) -> None:
76109
raise NotImplementedError
77110

78111
@abstractmethod
79-
def clear_topic(self, key: str, no_prefix: bool = False) -> None:
112+
def clear_topic(self, key: str, no_prefix: bool = False, qos: int = 0) -> None:
80113
raise NotImplementedError
81114

82115
def get_mqtt_account_prefix(self) -> str:
@@ -154,7 +187,9 @@ def __anonymize(self, data: T) -> T:
154187
return data
155188

156189
def keepalive(self) -> None:
157-
self.publish_str(mqtt_topics.INTERNAL_LWT, "online", False)
190+
self.publish_str(
191+
mqtt_topics.INTERNAL_LWT, "online", no_prefix=False, retain=True, qos=1
192+
)
158193

159194
@staticmethod
160195
def anonymize_str(value: str) -> str:

src/publisher/log_publisher.py

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,62 @@ def enable_commands(self) -> None:
2424

2525
@override
2626
def publish_json(
27-
self, key: str, data: dict[str, Any], no_prefix: bool = False
27+
self,
28+
key: str,
29+
data: dict[str, Any],
30+
no_prefix: bool = False,
31+
retain: bool = False,
32+
qos: int = 0,
2833
) -> None:
2934
anonymized_json = self.dict_to_anonymized_json(data)
3035
self.internal_publish(key, anonymized_json)
3136

3237
@override
33-
def publish_str(self, key: str, value: str, no_prefix: bool = False) -> None:
38+
def publish_str(
39+
self,
40+
key: str,
41+
value: str,
42+
no_prefix: bool = False,
43+
retain: bool = False,
44+
qos: int = 0,
45+
) -> None:
3446
self.internal_publish(key, value)
3547

3648
@override
37-
def publish_int(self, key: str, value: int, no_prefix: bool = False) -> None:
49+
def publish_int(
50+
self,
51+
key: str,
52+
value: int,
53+
no_prefix: bool = False,
54+
retain: bool = False,
55+
qos: int = 0,
56+
) -> None:
3857
self.internal_publish(key, value)
3958

4059
@override
41-
def publish_bool(self, key: str, value: bool, no_prefix: bool = False) -> None:
60+
def publish_bool(
61+
self,
62+
key: str,
63+
value: bool,
64+
no_prefix: bool = False,
65+
retain: bool = False,
66+
qos: int = 0,
67+
) -> None:
4268
self.internal_publish(key, value)
4369

4470
@override
45-
def publish_float(self, key: str, value: float, no_prefix: bool = False) -> None:
71+
def publish_float(
72+
self,
73+
key: str,
74+
value: float,
75+
no_prefix: bool = False,
76+
retain: bool = False,
77+
qos: int = 0,
78+
) -> None:
4679
self.internal_publish(key, value)
4780

4881
@override
49-
def clear_topic(self, key: str, no_prefix: bool = False) -> None:
82+
def clear_topic(self, key: str, no_prefix: bool = False, qos: int = 0) -> None:
5083
self.internal_publish(key, None)
5184

5285
def internal_publish(self, key: str, value: Any) -> None:

src/publisher/mqtt_publisher.py

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ async def __run_loop(self) -> None:
7070
topic=self.get_topic(mqtt_topics.INTERNAL_LWT, False),
7171
payload="offline",
7272
retain=True,
73+
qos=1,
7374
),
7475
)
7576
client.pending_calls_threshold = 150
@@ -220,19 +221,23 @@ async def __on_message_real(self, *, topic: str, payload: str) -> None:
220221
vin=vin, topic=topic, payload=payload
221222
)
222223

223-
def __publish(self, topic: str, payload: Any) -> None:
224+
def __publish(
225+
self, topic: str, payload: Any, retain: bool = False, qos: int = 0
226+
) -> None:
224227
LOG.debug("Publishing to MQTT topic %s with payload %s", topic, payload)
225228
loop = asyncio.get_running_loop()
226229
asyncio.run_coroutine_threadsafe(
227-
self.__async_publish(topic, payload, retain=True), loop
230+
self.__async_publish(topic, payload, retain=retain, qos=qos), loop
228231
)
229232

230-
async def __async_publish(self, topic: str, payload: Any, retain: bool) -> None:
233+
async def __async_publish(
234+
self, topic: str, payload: Any, retain: bool, qos: int
235+
) -> None:
231236
if not (self.client and self.is_connected()):
232237
LOG.error("Failed to publish: MQTT client is not connected")
233238
return
234239
try:
235-
await self.client.publish(topic, payload, retain)
240+
await self.client.publish(topic, payload, retain=retain, qos=qos)
236241
except aiomqtt.MqttError as e:
237242
LOG.error(
238243
f"Failed to publish to MQTT topic {topic} with payload {payload}: {e}"
@@ -244,30 +249,76 @@ def is_connected(self) -> bool:
244249

245250
@override
246251
def publish_json(
247-
self, key: str, data: dict[str, Any], no_prefix: bool = False
252+
self,
253+
key: str,
254+
data: dict[str, Any],
255+
no_prefix: bool = False,
256+
retain: bool = False,
257+
qos: int = 0,
248258
) -> None:
249259
payload = self.dict_to_anonymized_json(data)
250-
self.__publish(topic=self.get_topic(key, no_prefix), payload=payload)
260+
self.__publish(
261+
topic=self.get_topic(key, no_prefix),
262+
payload=payload,
263+
retain=retain,
264+
qos=qos,
265+
)
251266

252267
@override
253-
def publish_str(self, key: str, value: str, no_prefix: bool = False) -> None:
254-
self.__publish(topic=self.get_topic(key, no_prefix), payload=value)
268+
def publish_str(
269+
self,
270+
key: str,
271+
value: str,
272+
no_prefix: bool = False,
273+
retain: bool = False,
274+
qos: int = 0,
275+
) -> None:
276+
self.__publish(
277+
topic=self.get_topic(key, no_prefix), payload=value, retain=retain, qos=qos
278+
)
255279

256280
@override
257-
def publish_int(self, key: str, value: int, no_prefix: bool = False) -> None:
258-
self.__publish(topic=self.get_topic(key, no_prefix), payload=value)
281+
def publish_int(
282+
self,
283+
key: str,
284+
value: int,
285+
no_prefix: bool = False,
286+
retain: bool = False,
287+
qos: int = 0,
288+
) -> None:
289+
self.__publish(
290+
topic=self.get_topic(key, no_prefix), payload=value, retain=retain, qos=qos
291+
)
259292

260293
@override
261-
def publish_bool(self, key: str, value: bool, no_prefix: bool = False) -> None:
262-
self.__publish(topic=self.get_topic(key, no_prefix), payload=value)
294+
def publish_bool(
295+
self,
296+
key: str,
297+
value: bool,
298+
no_prefix: bool = False,
299+
retain: bool = False,
300+
qos: int = 0,
301+
) -> None:
302+
self.__publish(
303+
topic=self.get_topic(key, no_prefix), payload=value, retain=retain, qos=qos
304+
)
263305

264306
@override
265-
def publish_float(self, key: str, value: float, no_prefix: bool = False) -> None:
266-
self.__publish(topic=self.get_topic(key, no_prefix), payload=value)
307+
def publish_float(
308+
self,
309+
key: str,
310+
value: float,
311+
no_prefix: bool = False,
312+
retain: bool = False,
313+
qos: int = 0,
314+
) -> None:
315+
self.__publish(
316+
topic=self.get_topic(key, no_prefix), payload=value, retain=retain, qos=qos
317+
)
267318

268319
@override
269-
def clear_topic(self, key: str, no_prefix: bool = False) -> None:
270-
self.__publish(topic=self.get_topic(key, no_prefix), payload=None)
320+
def clear_topic(self, key: str, no_prefix: bool = False, qos: int = 0) -> None:
321+
self.__publish(topic=self.get_topic(key, no_prefix), payload=None, qos=qos)
271322

272323
def get_vin_from_topic(self, topic: str) -> str:
273324
global_topic_removed = topic[len(self.configuration.mqtt_topic) + 1 :]

0 commit comments

Comments
 (0)