Skip to content

Commit af9c1b8

Browse files
committed
Added simple deactivation/reactivation mechanism
Code restructuring
1 parent 34330d9 commit af9c1b8

File tree

2 files changed

+116
-103
lines changed

2 files changed

+116
-103
lines changed

modbus2mqtt_2/main.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,28 +40,29 @@ def __init__(self, diag_rate:float, mqtt_client:MqttClient, mb_master:ModbusMast
4040
self.runtask = None
4141

4242
def run_workloop(self, task_group):
43-
if self.diag_rate > 0:
44-
self.runtask = task_group.create_task(self._workloop())
45-
46-
async def _workloop(self):
47-
try:
48-
while True:
43+
#...........................................................................................
44+
async def workloop():
45+
try:
46+
while True:
4947

50-
try:
51-
await self.publish_modbus_diag(self.mb_master)
52-
except Exception as e:
53-
logger.error(f'Publishing modbus diagnostics for {self.mb_master}: {e}')
54-
55-
for dev in Device.all_devices.values():
5648
try:
57-
await self.publish_device_diag( dev)
49+
await self.publish_modbus_diag(self.mb_master)
5850
except Exception as e:
59-
logger.error(f'Publishing device diagnostics for {dev}: {e}')
51+
logger.error(f'Publishing modbus diagnostics for {self.mb_master}: {e}')
52+
53+
for dev in Device.all_devices.values():
54+
try:
55+
await self.publish_device_diag( dev)
56+
except Exception as e:
57+
logger.error(f'Publishing device diagnostics for {dev}: {e}')
6058

61-
await asyncio.sleep(self.diag_rate)
59+
await asyncio.sleep(self.diag_rate)
6260

63-
except asyncio.exceptions.CancelledError as e:
64-
logger.debug(f'Diagnostics task stopped ({self}).')
61+
except asyncio.exceptions.CancelledError as e:
62+
logger.debug(f'Diagnostics task stopped ({self}).')
63+
#...........................................................................................
64+
if self.diag_rate > 0:
65+
self.runtask = task_group.create_task(workloop())
6566

6667
async def publish_modbus_diag(self, mb_master:ModbusMaster) -> None :
6768
(stats, stats_old) = mb_master.get_statistics()

modbus2mqtt_2/modbus_objects.py

Lines changed: 98 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -132,26 +132,30 @@ async def read_from_slave(self, function_code:int, start_reg:int, len_regs:int,
132132

133133

134134
def run_workloop(self, task_group=None):
135-
self.runtask = task_group.create_task(self._workloop())
136-
137-
async def _workloop(self) -> None:
138-
try:
139-
while True:
140-
if self.master.connected:
141-
await asyncio.sleep(2)
142-
continue
143-
logger.info(f'Connecting to Modbus')
144-
await self.master.connect()
145-
if self.master.connected:
146-
for dev in self.devices:
147-
dev.enable()
148-
logger.info(f'Modbus connected successfully')
149-
else:
150-
for dev in self.devices:
151-
dev.disable()
152-
logger.info(f'Modbus NOT connected')
153-
except asyncio.exceptions.CancelledError as e:
154-
logger.debug(f'Modbus master task stopped ({self}).')
135+
#...........................................................................................
136+
async def workloop() -> None:
137+
try:
138+
while True:
139+
if self.master.connected:
140+
#logger.info(f'Modbus STILL connected')
141+
await asyncio.sleep(2)
142+
continue
143+
logger.info(f'Connecting to Modbus')
144+
await self.master.connect()
145+
if self.master.connected:
146+
for dev in self.devices:
147+
dev.enable()
148+
logger.info(f'Modbus connected successfully')
149+
else:
150+
for dev in self.devices:
151+
dev.disable()
152+
self.stats.reads_error += 1
153+
self.stats.reads_total += 1
154+
logger.info(f'Modbus NOT connected')
155+
except asyncio.exceptions.CancelledError as e:
156+
logger.debug(f'Modbus master task stopped ({self}).')
157+
#...........................................................................................
158+
self.runtask = task_group.create_task(workloop())
155159

156160
def register_device(self, device:'Device') -> None:
157161
self.devices.append(device)
@@ -179,33 +183,34 @@ def add_set_request(self,req_userdata, req_msg):
179183
self.set_request_queue.put_nowait((req_userdata, req_msg))
180184

181185
def run_workloop(self, task_group):
182-
self.runtask = task_group.create_task(self._workloop())
183-
184-
async def _workloop(self):
185-
try:
186-
while True:
187-
(req_userdata, req_msg) = await self.set_request_queue.get()
188-
try:
189-
short_topic = str(req_msg.topic).removeprefix(self.mqtt_client.get_topic_base()+'/')
190-
topic_parts = short_topic.split('/')
191-
device_name = topic_parts[0]
192-
value_topic = topic_parts[-1]
193-
if device_name == self.mqtt_client.clientid:
194-
# Here go any device level subscriptions
195-
pass
196-
else:
197-
the_dev:Device = Device.all_devices[device_name]
198-
if the_dev is None:
199-
logger.warning( f'Tried writing to unknown device {device_name} by MQTT topic {req_msg.topic}.')
186+
#...........................................................................................
187+
async def workloop():
188+
try:
189+
while True:
190+
(req_userdata, req_msg) = await self.set_request_queue.get()
191+
try:
192+
short_topic = str(req_msg.topic).removeprefix(self.mqtt_client.get_topic_base()+'/')
193+
topic_parts = short_topic.split('/')
194+
device_name = topic_parts[0]
195+
value_topic = topic_parts[-1]
196+
if device_name == self.mqtt_client.clientid:
197+
# Here go any device level subscriptions
198+
pass
200199
else:
201-
payload = str(req_msg.payload.decode("utf-8"))
202-
await the_dev.write_to_device( payload, req_msg.topic, device_name, value_topic)
203-
except Exception as e:
204-
logger.error(f'Error handling MQTT set request: {e}')
205-
206-
self.set_request_queue.task_done()
207-
except asyncio.exceptions.CancelledError as e:
208-
logger.debug(f'Modbus writer task stopped ({self}).')
200+
the_dev:Device = Device.all_devices[device_name]
201+
if the_dev is None:
202+
logger.warning( f'Tried writing to unknown device {device_name} by MQTT topic {req_msg.topic}.')
203+
else:
204+
payload = str(req_msg.payload.decode("utf-8"))
205+
await the_dev.write_to_device( payload, req_msg.topic, device_name, value_topic)
206+
except Exception as e:
207+
logger.error(f'Error handling MQTT set request: {e}')
208+
209+
self.set_request_queue.task_done()
210+
except asyncio.exceptions.CancelledError as e:
211+
logger.debug(f'Modbus writer task stopped ({self}).')
212+
#...........................................................................................
213+
self.runtask = task_group.create_task(workloop())
209214

210215

211216
class Device:
@@ -262,18 +267,32 @@ def __init__(self, config_source, mqttc:MqttClient, modbus_master:ModbusMaster,
262267
#
263268

264269
def disable(self) -> None :
265-
if self.enabled: # do not use the method is_enabled() here. Just look at our own status!
270+
if self.enabled: # do not use the method is_ready_to_comm() here. Just look at our own status!
266271
self.mqttc.publish_device_availability(self.name, False)
267272
self.enabled = False
268273

269274
def enable(self) -> None :
270-
if not self.enabled: # do not use the method is_enabled() here. Just look at our own status!
275+
if not self.enabled: # do not use the method is_ready_to_comm() here. Just look at our own status!
271276
self.mqttc.publish_device_availability(self.name, True)
272277
self.enabled = True
273278

274-
def is_enabled(self) -> bool:
279+
def is_ready_to_comm(self) -> bool:
275280
return (self.modbus_master.is_connected() and self.enabled)
281+
276282

283+
def schedule_reenable(self, task_group):
284+
#...........................................................................................
285+
async def workloop(sleeptime) -> None :
286+
try:
287+
logger.info(f'Scheduled reenabling {self} in {sleeptime}s')
288+
await asyncio.sleep(sleeptime)
289+
self.enable()
290+
except asyncio.exceptions.CancelledError as e:
291+
logger.debug(f'Reenabler task stopped ({self}).')
292+
#...........................................................................................
293+
sleeptime = 120 # XXX Make this configurable
294+
self.reenable_task = task_group.create_task(workloop(sleeptime))
295+
277296

278297
#------------------------------------------------------------------------------------------------------------------
279298
# Registering
@@ -300,7 +319,7 @@ def get_statistics(self):
300319
return (stats, stats_last)
301320

302321

303-
def count_new_poll( self, was_successfull:bool):
322+
def count_new_poll( self, was_successfull:bool, task_group):
304323
self.stats.reads_total += 1
305324
if was_successfull:
306325
self.consec_fail_cnt = 0
@@ -310,19 +329,12 @@ def count_new_poll( self, was_successfull:bool):
310329
self.stats.reads_error +=1
311330
self.consec_fail_cnt += 1
312331
if self.consec_fail_cnt == 3:
313-
self.mqttc.publish_device_availability(self.name, False) # replace with self.disable() once re-enable logic is done
314-
#if globs.args.autoremove:
315-
# globs.logger.info("Poller "+self.topic+" with Slave-ID "+str(self.slaveid)+" disabled (functioncode: "+str(
316-
# self.functioncode)+", start reference: "+str(self.reference)+", size: "+str(self.size)+").")
317-
# for p in pollers: # also fail all pollers with the same slave id
318-
# if p.slaveid == self.slaveid:
319-
# p.failcounter = 3
320-
# p.disabled = True
321-
# globs.logger.info("Poller "+p.topic+" with Slave-ID "+str(p.slaveid)+" disabled (functioncode: "+str(
322-
# p.functioncode)+", start reference: "+str(p.reference)+", size: "+str(p.size)+").")
323-
#self.disable()
332+
self.disable()
333+
self.schedule_reenable(task_group)
334+
logger.info(f'Disable device {self} after {self.consec_fail_cnt} consequtive failures.')
324335
self.last_poll_success = was_successfull
325336

337+
326338
async def write_to_device(self, payload_str:str, full_topic:str, dev_topic, val_topic) -> None:
327339
the_ref:Reference = self.references[val_topic]
328340
if the_ref is None :
@@ -425,15 +437,15 @@ def __init__(self, config_source, device:Device, start_reg:int, len_regs:int, re
425437
logger.debug(f'Added poller {self}')
426438

427439

428-
def is_enabled(self) -> bool:
429-
return self.device.is_enabled()
440+
def is_ready_to_comm(self) -> bool:
441+
return self.device.is_ready_to_comm()
430442

431443

432-
async def poll(self) -> None :
444+
async def poll(self, task_group) -> None :
433445
try:
434446
data = await self.device.modbus_master.read_from_slave(self.function_code, self.start_reg, self.len_regs, self.device.slaveid)
435447
except Exception as e:
436-
self.device.count_new_poll( False)
448+
self.device.count_new_poll( False, task_group)
437449
raise Exception( f'Error reading from Modbus ({self}): {e}')
438450

439451
try:
@@ -442,33 +454,33 @@ async def poll(self) -> None :
442454
raw_val = data[ref.start_reg_relative : (ref.data_converter.reg_cnt+ref.start_reg_relative)]
443455
ref.publish_value(raw_val)
444456
except Exception as e:
445-
self.device.count_new_poll( False)
457+
self.device.count_new_poll( False, task_group)
446458
raise Exception( f'Error publishing value from Modbus ({self}): {e}')
447459

448-
self.device.count_new_poll( True)
460+
self.device.count_new_poll( True, task_group)
449461

450462

451463
def run_workloop(self, task_group):
452-
self.runtask = task_group.create_task(self._workloop())
453-
454-
455-
async def _workloop(self) -> None :
456-
while not self.is_enabled(): # Wait with our initial delay to be enabled
457-
await asyncio.sleep(0.5)
458-
await asyncio.sleep(self.poll_rate*random.uniform(0, 1)) # Delay start for a random time to distribute bus usage a bit
459-
try:
460-
while True:
461-
if not self.is_enabled(): # If we're disabled, just wait a bit and give it another try
464+
#...........................................................................................
465+
async def workloop() -> None :
466+
try:
467+
while not self.is_ready_to_comm(): # Wait with our initial delay to be ready for communication
462468
await asyncio.sleep(0.5)
463-
continue
464-
logger.debug(f'Polling... ({self}).')
465-
try:
466-
await self.poll()
467-
except Exception as e:
468-
logger.error(f'Error polling ({self}): {e}')
469-
await asyncio.sleep(self.poll_rate)
470-
except asyncio.exceptions.CancelledError as e:
471-
logger.debug(f'Poller task stopped ({self}).')
469+
await asyncio.sleep(self.poll_rate*random.uniform(0, 1)) # Delay start for a random time to distribute bus usage a bit
470+
while True:
471+
if not self.is_ready_to_comm(): # If we're unable to communicate, just wait a bit and give it another try
472+
await asyncio.sleep(0.5)
473+
continue
474+
logger.debug(f'Polling... ({self}).')
475+
try:
476+
await self.poll(task_group)
477+
except Exception as e:
478+
logger.error(f'Error polling ({self}): {e}')
479+
await asyncio.sleep(self.poll_rate)
480+
except asyncio.exceptions.CancelledError as e:
481+
logger.debug(f'Poller task stopped ({self}).')
482+
#...........................................................................................
483+
self.runtask = task_group.create_task(workloop())
472484

473485

474486
def register_reference(self, new_ref:'Reference') -> None :

0 commit comments

Comments
 (0)