Skip to content

Commit ad32c96

Browse files
committed
Create data receiver queue worker
to do extracting in separate task
1 parent 22e6933 commit ad32c96

File tree

1 file changed

+144
-77
lines changed

1 file changed

+144
-77
lines changed

plugwise_usb/connection/receiver.py

Lines changed: 144 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919

2020
from asyncio import (
2121
Future,
22+
Lock,
2223
PriorityQueue,
2324
Protocol,
25+
Queue,
2426
Task,
25-
TimerHandle,
2627
gather,
2728
get_running_loop,
29+
sleep,
2830
)
2931
from collections.abc import Awaitable, Callable, Coroutine
3032
from dataclasses import dataclass
@@ -64,7 +66,7 @@ class StickResponseSubscription:
6466

6567
callback_fn: Callable[[StickResponse], Coroutine[Any, Any, None]]
6668
seq_id: bytes | None
67-
stick_response_type: StickResponseType | None
69+
stick_response_type: tuple[StickResponseType, ...] | None
6870

6971

7072
@dataclass
@@ -89,14 +91,23 @@ def __init__(
8991
self._loop = get_running_loop()
9092
self._connected_future = connected_future
9193
self._transport: SerialTransport | None = None
92-
self._buffer: bytes = bytes([])
9394
self._connection_state = False
94-
self._receive_queue: PriorityQueue[PlugwiseResponse] = PriorityQueue()
95+
96+
# Data processing
97+
self._buffer: bytes = bytes([])
98+
self._data_queue: Queue[bytes] = Queue()
99+
self._data_worker_task: Task[None] | None = None
100+
101+
# Message processing
102+
self._message_queue: PriorityQueue[PlugwiseResponse] = PriorityQueue()
95103
self._last_processed_messages: list[bytes] = []
96104
self._responses: dict[bytes, Callable[[PlugwiseResponse], None]] = {}
97-
self._receive_worker_task: Task[None] | None = None
98-
self._delayed_processing_tasks: dict[bytes, TimerHandle] = {}
105+
self._message_worker_task: Task[None] | None = None
106+
self._delayed_processing_tasks: dict[bytes, Task[None]] = {}
107+
99108
# Subscribers
109+
self._stick_subscription_lock = Lock()
110+
self._node_subscription_lock = Lock()
100111
self._stick_event_subscribers: dict[
101112
Callable[[], None], StickEventSubscription
102113
] = {}
@@ -110,7 +121,7 @@ def __init__(
110121

111122
def connection_lost(self, exc: Exception | None = None) -> None:
112123
"""Call when port was closed expectedly or unexpectedly."""
113-
_LOGGER.info("Connection lost")
124+
_LOGGER.warning("Connection lost")
114125
if exc is not None:
115126
_LOGGER.warning("Connection to Plugwise USB-stick lost %s", exc)
116127
self._loop.create_task(self.close())
@@ -150,14 +161,19 @@ async def _stop_running_tasks(self) -> None:
150161
for task in self._delayed_processing_tasks.values():
151162
task.cancel()
152163
if (
153-
self._receive_worker_task is not None
154-
and not self._receive_worker_task.done()
164+
self._message_worker_task is not None
165+
and not self._message_worker_task.done()
155166
):
156167
cancel_response = StickResponse()
157168
cancel_response.priority = Priority.CANCEL
158-
await self._receive_queue.put(cancel_response)
159-
await self._receive_worker_task
160-
self._receive_worker_task = None
169+
await self._message_queue.put(cancel_response)
170+
await self._message_worker_task
171+
self._message_worker_task = None
172+
if self._data_worker_task is not None and not self._data_worker_task.done():
173+
await self._data_queue.put(b"FFFFFFFF")
174+
await self._data_worker_task
175+
176+
# region Process incoming data
161177

162178
def data_received(self, data: bytes) -> None:
163179
"""Receive data from USB-Stick connection.
@@ -167,74 +183,103 @@ def data_received(self, data: bytes) -> None:
167183
_LOGGER.debug("Received data from USB-Stick: %s", data)
168184
self._buffer += data
169185
if MESSAGE_FOOTER in self._buffer:
170-
msgs = self._buffer.split(MESSAGE_FOOTER)
171-
for msg in msgs[:-1]:
172-
if (response := self.extract_message_from_line_buffer(msg)) is not None:
173-
self._put_message_in_receiver_queue(response)
174-
if len(msgs) > 4:
175-
_LOGGER.debug("Reading %d messages at once from USB-Stick", len(msgs))
176-
self._buffer = msgs[-1] # whatever was left over
177-
if self._buffer == b"\x83":
178-
self._buffer = b""
179-
180-
def _put_message_in_receiver_queue(self, response: PlugwiseResponse) -> None:
181-
"""Put message in queue."""
182-
_LOGGER.debug("Add response to queue: %s", response)
183-
self._receive_queue.put_nowait(response)
184-
if self._receive_worker_task is None or self._receive_worker_task.done():
185-
self._receive_worker_task = self._loop.create_task(
186-
self._receive_queue_worker(), name="Receive queue worker"
186+
data_of_messages = self._buffer.split(MESSAGE_FOOTER)
187+
for msg_data in data_of_messages[:-1]:
188+
# Ignore ASCII messages without a header and footer like:
189+
# # SENDING PING UNICAST: Macid: ????????????????
190+
# # HANDLE: 0x??
191+
# # APSRequestNodeInfo
192+
if (header_index := msg_data.find(MESSAGE_HEADER)) != -1:
193+
data = msg_data[header_index:]
194+
self._put_data_in_queue(data)
195+
if len(data_of_messages) > 4:
196+
_LOGGER.debug(
197+
"Reading %d messages at once from USB-Stick", len(data_of_messages)
198+
)
199+
self._buffer = data_of_messages[-1] # whatever was left over
200+
201+
def _put_data_in_queue(self, data: bytes) -> None:
202+
"""Put raw message data in queue to be converted to messages."""
203+
self._data_queue.put_nowait(data)
204+
if self._data_worker_task is None or self._data_worker_task.done():
205+
self._data_worker_task = self._loop.create_task(
206+
self._data_queue_worker(), name="Plugwise data receiver queue worker"
187207
)
188208

189-
def extract_message_from_line_buffer(self, msg: bytes) -> PlugwiseResponse | None:
209+
async def _data_queue_worker(self) -> None:
210+
"""Convert collected data into messages and place then im message queue."""
211+
_LOGGER.debug("Data queue worker started")
212+
while self.is_connected:
213+
if (data := await self._data_queue.get()) != b"FFFFFFFF":
214+
if (response := self.extract_message_from_data(data)) is not None:
215+
await self._put_message_in_queue(response)
216+
self._data_queue.task_done()
217+
else:
218+
self._data_queue.task_done()
219+
return
220+
await sleep(0)
221+
_LOGGER.debug("Data queue worker stopped")
222+
223+
def extract_message_from_data(self, msg_data: bytes) -> PlugwiseResponse | None:
190224
"""Extract message from buffer."""
191-
# Lookup header of message, there are stray \x83
192-
if (_header_index := msg.find(MESSAGE_HEADER)) == -1:
193-
return None
194-
_LOGGER.debug("Extract message from data: %s", msg)
195-
msg = msg[_header_index:]
196-
# Detect response message type
197-
identifier = msg[4:8]
198-
seq_id = msg[8:12]
199-
msg_length = len(msg)
200-
if (response := get_message_object(identifier, msg_length, seq_id)) is None:
201-
_raw_msg_data = msg[2:][: msg_length - 4]
202-
_LOGGER.warning("Drop unknown message type %s", str(_raw_msg_data))
225+
identifier = msg_data[4:8]
226+
seq_id = msg_data[8:12]
227+
msg_data_length = len(msg_data)
228+
if (
229+
response := get_message_object(identifier, msg_data_length, seq_id)
230+
) is None:
231+
_raw_msg_data_data = msg_data[2:][: msg_data_length - 4]
232+
_LOGGER.warning("Drop unknown message type %s", str(_raw_msg_data_data))
203233
return None
204234

205235
# Populate response message object with data
206236
try:
207-
response.deserialize(msg, has_footer=False)
237+
response.deserialize(msg_data, has_footer=False)
208238
except MessageError as err:
209239
_LOGGER.warning(err)
210240
return None
211241

212-
_LOGGER.debug("Data %s converted into %s", msg, response)
242+
_LOGGER.debug("Data %s converted into %s", msg_data, response)
213243
return response
214244

215-
async def _receive_queue_worker(self) -> None:
216-
"""Process queue items."""
217-
_LOGGER.debug("Receive_queue_worker started")
245+
# endregion
246+
247+
# region Process incoming messages
248+
249+
async def _put_message_in_queue(
250+
self, response: PlugwiseResponse, delay: float = 0.0
251+
) -> None:
252+
"""Put message in queue to be processed."""
253+
if delay > 0:
254+
await sleep(delay)
255+
_LOGGER.debug("Add response to queue: %s", response)
256+
await self._message_queue.put(response)
257+
if self._message_worker_task is None or self._message_worker_task.done():
258+
self._message_worker_task = self._loop.create_task(
259+
self._message_queue_worker(),
260+
name="Plugwise message receiver queue worker",
261+
)
262+
263+
async def _message_queue_worker(self) -> None:
264+
"""Process messages in receiver queue."""
265+
_LOGGER.debug("Message queue worker started")
218266
while self.is_connected:
219-
response: PlugwiseResponse = await self._receive_queue.get()
267+
response: PlugwiseResponse = await self._message_queue.get()
220268
if response.priority == Priority.CANCEL:
221-
self._receive_queue.task_done()
269+
self._message_queue.task_done()
222270
return
223-
_LOGGER.debug("Process from receive queue: %s", response)
271+
_LOGGER.debug("Message queue worker queue: %s", response)
224272
if isinstance(response, StickResponse):
225273
await self._notify_stick_response_subscribers(response)
226274
else:
227275
await self._notify_node_response_subscribers(response)
228-
self._receive_queue.task_done()
229-
_LOGGER.debug("Receive_queue_worker stopped")
276+
self._message_queue.task_done()
277+
await sleep(0.001)
278+
_LOGGER.debug("Message queue worker stopped")
230279

231-
def _reset_buffer(self, new_buffer: bytes) -> None:
232-
if new_buffer[:2] == MESSAGE_FOOTER:
233-
new_buffer = new_buffer[2:]
234-
if new_buffer == b"\x83":
235-
# Skip additional byte sometimes appended after footer
236-
new_buffer = bytes([])
237-
self._buffer = new_buffer
280+
# endregion
281+
282+
# region Stick
238283

239284
def subscribe_to_stick_events(
240285
self,
@@ -267,13 +312,14 @@ async def _notify_stick_event_subscribers(
267312
if len(callback_list) > 0:
268313
await gather(*callback_list)
269314

270-
def subscribe_to_stick_responses(
315+
async def subscribe_to_stick_responses(
271316
self,
272317
callback: Callable[[StickResponse], Coroutine[Any, Any, None]],
273318
seq_id: bytes | None = None,
274-
response_type: StickResponseType | None = None,
319+
response_type: tuple[StickResponseType, ...] | None = None,
275320
) -> Callable[[], None]:
276321
"""Subscribe to response messages from stick."""
322+
await self._stick_subscription_lock.acquire()
277323

278324
def remove_subscription() -> None:
279325
"""Remove update listener."""
@@ -282,6 +328,7 @@ def remove_subscription() -> None:
282328
self._stick_response_subscribers[remove_subscription] = (
283329
StickResponseSubscription(callback, seq_id, response_type)
284330
)
331+
self._stick_subscription_lock.release()
285332
return remove_subscription
286333

287334
async def _notify_stick_response_subscribers(
@@ -296,13 +343,16 @@ async def _notify_stick_response_subscribers(
296343
continue
297344
if (
298345
subscription.stick_response_type is not None
299-
and subscription.stick_response_type != stick_response.response_type
346+
and stick_response.response_type not in subscription.stick_response_type
300347
):
301348
continue
302349
_LOGGER.debug("Notify stick response subscriber for %s", stick_response)
303350
await subscription.callback_fn(stick_response)
304351

305-
def subscribe_to_node_responses(
352+
# endregion
353+
# region node
354+
355+
async def subscribe_to_node_responses(
306356
self,
307357
node_response_callback: Callable[[PlugwiseResponse], Coroutine[Any, Any, bool]],
308358
mac: bytes | None = None,
@@ -313,13 +363,27 @@ def subscribe_to_node_responses(
313363
314364
Returns function to unsubscribe.
315365
"""
366+
await self._node_subscription_lock.acquire()
316367

317368
def remove_listener() -> None:
318369
"""Remove update listener."""
370+
_LOGGER.debug(
371+
"Node response subscriber removed: mac=%s, msg_idS=%s, seq_id=%s",
372+
mac,
373+
message_ids,
374+
seq_id,
375+
)
319376
self._node_response_subscribers.pop(remove_listener)
320377

321378
self._node_response_subscribers[remove_listener] = NodeResponseSubscription(
322-
node_response_callback,
379+
callback_fn=node_response_callback,
380+
mac=mac,
381+
response_ids=message_ids,
382+
seq_id=seq_id,
383+
)
384+
self._node_subscription_lock.release()
385+
_LOGGER.debug(
386+
"Node response subscriber added: mac=%s, msg_idS=%s, seq_id=%s",
323387
mac,
324388
message_ids,
325389
seq_id,
@@ -338,6 +402,8 @@ async def _notify_node_response_subscribers(
338402
_LOGGER.debug("Drop previously processed duplicate %s", node_response)
339403
return
340404

405+
await self._node_subscription_lock.acquire()
406+
341407
notify_tasks: list[Coroutine[Any, Any, bool]] = []
342408
for node_subscription in self._node_response_subscribers.values():
343409
if (
@@ -357,17 +423,20 @@ async def _notify_node_response_subscribers(
357423
continue
358424
notify_tasks.append(node_subscription.callback_fn(node_response))
359425

426+
self._node_subscription_lock.release()
360427
if len(notify_tasks) > 0:
361-
_LOGGER.info("Received %s", node_response)
428+
_LOGGER.debug("Received %s", node_response)
362429
if node_response.seq_id not in BROADCAST_IDS:
363430
self._last_processed_messages.append(node_response.seq_id)
364-
if node_response.seq_id in self._delayed_processing_tasks:
365-
del self._delayed_processing_tasks[node_response.seq_id]
366431
# Limit tracking to only the last appended request (FIFO)
367432
self._last_processed_messages = self._last_processed_messages[
368433
-CACHED_REQUESTS:
369434
]
370435

436+
# Cleanup pending task
437+
if node_response.seq_id in self._delayed_processing_tasks:
438+
del self._delayed_processing_tasks[node_response.seq_id]
439+
371440
# execute callbacks
372441
_LOGGER.debug(
373442
"Notify node response subscribers (%s) about %s",
@@ -388,17 +457,15 @@ async def _notify_node_response_subscribers(
388457

389458
if node_response.retries > 10:
390459
_LOGGER.warning(
391-
"No subscriber to handle %s, seq_id=%s from %s after 10 retries",
392-
node_response.__class__.__name__,
393-
node_response.seq_id,
394-
node_response.mac_decoded,
460+
"No subscriber to handle %s after 10 retries",
461+
node_response,
395462
)
396463
return
397464
node_response.retries += 1
398-
if node_response.retries > 2:
399-
_LOGGER.info("No subscription for %s, retry later", node_response)
400-
self._delayed_processing_tasks[node_response.seq_id] = self._loop.call_later(
401-
0.1 * node_response.retries,
402-
self._put_message_in_receiver_queue,
403-
node_response,
465+
self._delayed_processing_tasks[node_response.seq_id] = self._loop.create_task(
466+
self._put_message_in_queue(node_response, 0.1 * node_response.retries),
467+
name=f"Postpone subscription task for {node_response.seq_id!r} retry {node_response.retries}",
404468
)
469+
470+
471+
# endregion

0 commit comments

Comments
 (0)