@@ -49,6 +49,7 @@ def __init__(self, port: str, message_processor, node_state):
4949 self .connection = None
5050 self .discovery_finished = False
5151 self .expected_responses = {}
52+ self .lock_expected_responses = threading .Lock ()
5253 self .init_callback = None
5354 self .last_seq_id = None
5455 self .message_processor = message_processor
@@ -151,66 +152,67 @@ def send(
151152 def resend (self , seq_id ):
152153 """Resend message."""
153154 _mac = "<unknown>"
154- if not self .expected_responses .get (seq_id ):
155- _LOGGER .warning (
156- "Cannot resend unknown request %s" ,
157- str (seq_id ),
158- )
159- else :
160- if self .expected_responses [seq_id ][0 ].mac :
161- _mac = self .expected_responses [seq_id ][0 ].mac .decode (UTF8_DECODE )
162- _request = self .expected_responses [seq_id ][0 ].__class__ .__name__
155+ with self .lock_expected_responses :
156+ if not self .expected_responses .get (seq_id ):
157+ _LOGGER .warning (
158+ "Cannot resend unknown request %s" ,
159+ str (seq_id ),
160+ )
161+ else :
162+ if self .expected_responses [seq_id ][0 ].mac :
163+ _mac = self .expected_responses [seq_id ][0 ].mac .decode (UTF8_DECODE )
164+ _request = self .expected_responses [seq_id ][0 ].__class__ .__name__
163165
164- if self .expected_responses [seq_id ][2 ] == - 1 :
165- _LOGGER .debug ("Drop single %s to %s " , _request , _mac )
166- elif self .expected_responses [seq_id ][2 ] <= MESSAGE_RETRY :
167- if (
168- isinstance (self .expected_responses [seq_id ][0 ], NodeInfoRequest )
169- and not self .discovery_finished
170- ):
171- # Time out for node which is not discovered yet
172- # to speedup the initial discover phase skip retries and mark node as not discovered.
173- _LOGGER .debug (
174- "Skip retry %s to %s to speedup discover process" ,
175- _request ,
176- _mac ,
177- )
178- if self .expected_responses [seq_id ][1 ]:
179- self .expected_responses [seq_id ][1 ]()
166+ if self .expected_responses [seq_id ][2 ] == - 1 :
167+ _LOGGER .debug ("Drop single %s to %s " , _request , _mac )
168+ elif self .expected_responses [seq_id ][2 ] <= MESSAGE_RETRY :
169+ if (
170+ isinstance (self .expected_responses [seq_id ][0 ], NodeInfoRequest )
171+ and not self .discovery_finished
172+ ):
173+ # Time out for node which is not discovered yet
174+ # to speedup the initial discover phase skip retries and mark node as not discovered.
175+ _LOGGER .debug (
176+ "Skip retry %s to %s to speedup discover process" ,
177+ _request ,
178+ _mac ,
179+ )
180+ if self .expected_responses [seq_id ][1 ]:
181+ self .expected_responses [seq_id ][1 ]()
182+ else :
183+ _LOGGER .info (
184+ "Resend %s for %s, retry %s of %s" ,
185+ _request ,
186+ _mac ,
187+ str (self .expected_responses [seq_id ][2 ] + 1 ),
188+ str (MESSAGE_RETRY + 1 ),
189+ )
190+ self .send (
191+ self .expected_responses [seq_id ][0 ],
192+ self .expected_responses [seq_id ][1 ],
193+ self .expected_responses [seq_id ][2 ] + 1 ,
194+ )
180195 else :
181- _LOGGER .info (
182- "Resend %s for %s, retry %s of %s" ,
196+ _LOGGER .warning (
197+ "Drop %s to %s because max retries %s reached " ,
183198 _request ,
184199 _mac ,
185- str (self .expected_responses [seq_id ][2 ] + 1 ),
186200 str (MESSAGE_RETRY + 1 ),
187201 )
188- self .send (
189- self .expected_responses [seq_id ][0 ],
190- self .expected_responses [seq_id ][1 ],
191- self .expected_responses [seq_id ][2 ] + 1 ,
192- )
193- else :
194- _LOGGER .warning (
195- "Drop %s to %s because max retries %s reached" ,
196- _request ,
197- _mac ,
198- str (MESSAGE_RETRY + 1 ),
199- )
200- # Report node as unavailable for missing NodePingRequest
201- if isinstance (self .expected_responses [seq_id ][0 ], NodePingRequest ):
202- self .node_state (_mac , False )
203- else :
204- _LOGGER .debug (
205- "Do a single ping request to %s to validate if node is reachable" ,
206- _mac ,
207- )
208- self .send (
209- NodePingRequest (self .expected_responses [seq_id ][0 ].mac ),
210- None ,
211- MESSAGE_RETRY + 1 ,
212- )
213- del self .expected_responses [seq_id ]
202+ # Report node as unavailable for missing NodePingRequest
203+ if isinstance (self .expected_responses [seq_id ][0 ], NodePingRequest ):
204+ self .node_state (_mac , False )
205+ else :
206+ _LOGGER .debug (
207+ "Do a single ping request to %s to validate if node is reachable" ,
208+ _mac ,
209+ )
210+ self .send (
211+ NodePingRequest (self .expected_responses [seq_id ][0 ].mac ),
212+ None ,
213+ MESSAGE_RETRY + 1 ,
214+ )
215+ del self .expected_responses [seq_id ]
214216
215217 def _send_message_loop (self ):
216218 """Daemon to send messages waiting in queue."""
@@ -225,28 +227,26 @@ def _send_message_loop(self):
225227 # Calc next seq_id based last received ack message
226228 # if previous seq_id is unknown use fake b"0000"
227229 seq_id = inc_seq_id (self .last_seq_id )
228- self .expected_responses [seq_id ] = request_set
229- mac = "None"
230- if hasattr (self .expected_responses [seq_id ][0 ], "mac" ):
231- mac = self .expected_responses [seq_id ][0 ].mac
232- if self .expected_responses [seq_id ][2 ] == 0 :
233- _LOGGER .info (
234- "Send %s to %s using seq_id %s" ,
235- self .expected_responses [seq_id ][0 ].__class__ .__name__ ,
236- mac ,
237- str (seq_id ),
238- )
239- else :
240- _LOGGER .info (
241- "Resend %s to %s using seq_id %s, retry %s" ,
242- self .expected_responses [seq_id ][0 ].__class__ .__name__ ,
243- mac ,
244- str (seq_id ),
245- str (self .expected_responses [seq_id ][2 ]),
246- )
247- self .expected_responses [seq_id ][3 ] = datetime .now ()
248- # Send request
249- self .connection .send (self .expected_responses [seq_id ][0 ])
230+ with self .lock_expected_responses :
231+ self .expected_responses [seq_id ] = request_set
232+ if self .expected_responses [seq_id ][2 ] == 0 :
233+ _LOGGER .info (
234+ "Send %s to %s using seq_id %s" ,
235+ self .expected_responses [seq_id ][0 ].__class__ .__name__ ,
236+ self .expected_responses [seq_id ][0 ].mac ,
237+ str (seq_id ),
238+ )
239+ else :
240+ _LOGGER .info (
241+ "Resend %s to %s using seq_id %s, retry %s" ,
242+ self .expected_responses [seq_id ][0 ].__class__ .__name__ ,
243+ self .expected_responses [seq_id ][0 ].mac ,
244+ str (seq_id ),
245+ str (self .expected_responses [seq_id ][2 ]),
246+ )
247+ self .expected_responses [seq_id ][3 ] = datetime .now ()
248+ # Send request
249+ self .connection .send (self .expected_responses [seq_id ][0 ])
250250 time .sleep (SLEEP_TIME )
251251 timeout_counter = 0
252252 # Wait max 1 second for acknowledge response from USB-stick
@@ -289,63 +289,71 @@ def message_handler(self, message):
289289 )
290290
291291 def _post_message_action (self , seq_id , ack_response = None , request = "unknown" ):
292- """Execute action if request has been successful.."""
293- if seq_id in self .expected_responses :
294- if ack_response in (* REQUEST_SUCCESS , None ):
295- if self .expected_responses [seq_id ][1 ]:
296- _LOGGER .debug (
297- "Execute action %s of request with seq_id %s" ,
298- self .expected_responses [seq_id ][1 ].__name__ ,
299- str (seq_id ),
300- )
301- try :
302- self .expected_responses [seq_id ][1 ]()
303- # TODO: narrow exception
304- except Exception as err : # pylint: disable=broad-except
305- _LOGGER .error (
306- "Execution of %s for request with seq_id %s failed: %s" ,
292+ """Execute action if request has been successful."""
293+ resend_request = False
294+ with self .lock_expected_responses :
295+ if seq_id in self .expected_responses :
296+ if ack_response in (* REQUEST_SUCCESS , None ):
297+ if self .expected_responses [seq_id ][1 ]:
298+ _LOGGER .debug (
299+ "Execute action %s of request with seq_id %s" ,
307300 self .expected_responses [seq_id ][1 ].__name__ ,
308301 str (seq_id ),
309- err ,
310302 )
311- del self .expected_responses [seq_id ]
312- elif ack_response in REQUEST_FAILED :
313- self .resend (seq_id )
314- else :
315- if not self .last_seq_id :
316- if b"0000" in self .expected_responses :
317- self .expected_responses [seq_id ] = self .expected_responses [b"0000" ]
318- del self .expected_responses [b"0000" ]
319- self .last_seq_id = seq_id
303+ try :
304+ self .expected_responses [seq_id ][1 ]()
305+ # TODO: narrow exception
306+ except Exception as err : # pylint: disable=broad-except
307+ _LOGGER .error (
308+ "Execution of %s for request with seq_id %s failed: %s" ,
309+ self .expected_responses [seq_id ][1 ].__name__ ,
310+ str (seq_id ),
311+ err ,
312+ )
313+ del self .expected_responses [seq_id ]
314+ elif ack_response in REQUEST_FAILED :
315+ resend_request = True
320316 else :
321- _LOGGER .info (
322- "Drop unexpected %s%s using seq_id %s" ,
323- STATUS_RESPONSES .get (ack_response , "" ) + " " ,
324- request ,
325- str (seq_id ),
326- )
317+ if not self .last_seq_id :
318+ if b"0000" in self .expected_responses :
319+ self .expected_responses [seq_id ] = self .expected_responses [
320+ b"0000"
321+ ]
322+ del self .expected_responses [b"0000" ]
323+ self .last_seq_id = seq_id
324+ else :
325+ _LOGGER .info (
326+ "Drop unexpected %s%s using seq_id %s" ,
327+ STATUS_RESPONSES .get (ack_response , "" ) + " " ,
328+ request ,
329+ str (seq_id ),
330+ )
331+ if resend_request :
332+ self .resend (seq_id )
327333
328334 def _receive_timeout_loop (self ):
329335 """Daemon to time out open requests without any (n)ack response message."""
330336 while self ._receive_timeout_thread_state :
331- for seq_id in list (self .expected_responses .keys ()):
332- if self .expected_responses [seq_id ][3 ] is not None :
333- if self .expected_responses [seq_id ][3 ] < (
334- datetime .now () - timedelta (seconds = MESSAGE_TIME_OUT )
335- ):
336- _mac = "<unknown>"
337- if self .expected_responses [seq_id ][0 ].mac :
338- _mac = self .expected_responses [seq_id ][0 ].mac .decode (
339- UTF8_DECODE
337+ resend_list = []
338+ with self .lock_expected_responses :
339+ for seq_id in list (self .expected_responses .keys ()):
340+ if self .expected_responses [seq_id ][3 ] is not None :
341+ if self .expected_responses [seq_id ][3 ] < (
342+ datetime .now () - timedelta (seconds = MESSAGE_TIME_OUT )
343+ ):
344+ _mac = "<unknown>"
345+ if self .expected_responses [seq_id ][0 ].mac :
346+ _mac = self .expected_responses [seq_id ][0 ].mac
347+ _LOGGER .info (
348+ "No response within %s seconds timeout for %s to %s with sequence ID %s" ,
349+ str (MESSAGE_TIME_OUT ),
350+ self .expected_responses [seq_id ][0 ].__class__ .__name__ ,
351+ _mac ,
352+ str (seq_id ),
340353 )
341- _LOGGER .info (
342- "No response within %s seconds timeout for %s to %s with sequence ID %s" ,
343- str (MESSAGE_TIME_OUT ),
344- self .expected_responses [seq_id ][0 ].__class__ .__name__ ,
345- _mac ,
346- str (seq_id ),
347- )
348- self .resend (seq_id )
354+ resend_list .append (seq_id )
355+ for seq_id in resend_list :
356+ self .resend (seq_id )
349357 receive_timeout_checker = 0
350358 while (
351359 receive_timeout_checker < MESSAGE_TIME_OUT
@@ -366,25 +374,26 @@ def _log_status_message(self, message, status=None):
366374 str (message .seq_id ),
367375 )
368376 else :
369- if self .expected_responses .get (message .seq_id ):
370- _LOGGER .warning (
371- "Received unmanaged (%s) %s in response to %s with seq_id %s" ,
372- str (status ),
373- message .__class__ .__name__ ,
374- str (
375- self .expected_responses [message .seq_id ][
376- 1
377- ].__class__ .__name__
378- ),
379- str (message .seq_id ),
380- )
381- else :
382- _LOGGER .warning (
383- "Received unmanaged (%s) %s for unknown request with seq_id %s" ,
384- str (status ),
385- message .__class__ .__name__ ,
386- str (message .seq_id ),
387- )
377+ with self .lock_expected_responses :
378+ if self .expected_responses .get (message .seq_id ):
379+ _LOGGER .warning (
380+ "Received unmanaged (%s) %s in response to %s with seq_id %s" ,
381+ str (status ),
382+ message .__class__ .__name__ ,
383+ str (
384+ self .expected_responses [message .seq_id ][
385+ 1
386+ ].__class__ .__name__
387+ ),
388+ str (message .seq_id ),
389+ )
390+ else :
391+ _LOGGER .warning (
392+ "Received unmanaged (%s) %s for unknown request with seq_id %s" ,
393+ str (status ),
394+ message .__class__ .__name__ ,
395+ str (message .seq_id ),
396+ )
388397 else :
389398 _LOGGER .info (
390399 "Received %s from %s with sequence id %s" ,
0 commit comments