Skip to content

Commit 0711098

Browse files
committed
Update stick subscription
1 parent 04bfdba commit 0711098

File tree

1 file changed

+47
-13
lines changed

1 file changed

+47
-13
lines changed

plugwise_usb/connection/receiver.py

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from __future__ import annotations
1919

2020
from asyncio import (
21+
ensure_future,
2122
Future,
2223
Lock,
2324
PriorityQueue,
@@ -101,17 +102,22 @@ def __init__(
101102
# Message processing
102103
self._message_queue: PriorityQueue[PlugwiseResponse] = PriorityQueue()
103104
self._last_processed_messages: list[bytes] = []
105+
self._current_seq_id: bytes | None = None
104106
self._responses: dict[bytes, Callable[[PlugwiseResponse], None]] = {}
105107
self._message_worker_task: Task[None] | None = None
106108
self._delayed_processing_tasks: dict[bytes, Task[None]] = {}
107109

108110
# Subscribers
109111
self._stick_subscription_lock = Lock()
110112
self._node_subscription_lock = Lock()
113+
111114
self._stick_event_subscribers: dict[
112115
Callable[[], None], StickEventSubscription
113116
] = {}
114-
self._stick_response_subscribers: dict[
117+
self._stick_subscribers_for_requests: dict[
118+
Callable[[], None], StickResponseSubscription
119+
] = {}
120+
self._stick_subscribers_for_responses: dict[
115121
Callable[[], None], StickResponseSubscription
116122
] = {}
117123

@@ -250,7 +256,7 @@ async def _put_message_in_queue(
250256
self, response: PlugwiseResponse, delay: float = 0.0
251257
) -> None:
252258
"""Put message in queue to be processed."""
253-
if delay > 0:
259+
if delay > 0.0:
254260
await sleep(delay)
255261
_LOGGER.debug("Add response to queue: %s", response)
256262
await self._message_queue.put(response)
@@ -270,11 +276,11 @@ async def _message_queue_worker(self) -> None:
270276
return
271277
_LOGGER.debug("Message queue worker queue: %s", response)
272278
if isinstance(response, StickResponse):
273-
await self._notify_stick_response_subscribers(response)
279+
await self._notify_stick_subscribers(response)
274280
else:
275281
await self._notify_node_response_subscribers(response)
276282
self._message_queue.task_done()
277-
await sleep(0.001)
283+
await sleep(0)
278284
_LOGGER.debug("Message queue worker stopped")
279285

280286
# endregion
@@ -319,23 +325,49 @@ async def subscribe_to_stick_responses(
319325
response_type: tuple[StickResponseType, ...] | None = None,
320326
) -> Callable[[], None]:
321327
"""Subscribe to response messages from stick."""
322-
await self._stick_subscription_lock.acquire()
323328

324-
def remove_subscription() -> None:
329+
def remove_subscription_for_requests() -> None:
330+
"""Remove update listener."""
331+
self._stick_subscribers_for_requests.pop(remove_subscription_for_requests)
332+
333+
def remove_subscription_for_responses() -> None:
325334
"""Remove update listener."""
326-
self._stick_response_subscribers.pop(remove_subscription)
335+
self._stick_subscribers_for_responses.pop(remove_subscription_for_responses)
327336

328-
self._stick_response_subscribers[remove_subscription] = (
337+
if seq_id is None:
338+
await self._stick_subscription_lock.acquire()
339+
self._stick_subscribers_for_requests[remove_subscription_for_requests] = (
340+
StickResponseSubscription(callback, seq_id, response_type)
341+
)
342+
self._stick_subscription_lock.release()
343+
return remove_subscription_for_requests
344+
345+
self._stick_subscribers_for_responses[remove_subscription_for_responses] = (
329346
StickResponseSubscription(callback, seq_id, response_type)
330347
)
331-
self._stick_subscription_lock.release()
332-
return remove_subscription
348+
return remove_subscription_for_responses
333349

334-
async def _notify_stick_response_subscribers(
350+
async def _notify_stick_subscribers(
335351
self, stick_response: StickResponse
336352
) -> None:
337353
"""Call callback for all stick response message subscribers."""
338-
for subscription in list(self._stick_response_subscribers.values()):
354+
await self._stick_subscription_lock.acquire()
355+
for subscription in self._stick_subscribers_for_requests.values():
356+
if (
357+
subscription.seq_id is not None
358+
and subscription.seq_id != stick_response.seq_id
359+
):
360+
continue
361+
if (
362+
subscription.stick_response_type is not None
363+
and stick_response.response_type not in subscription.stick_response_type
364+
):
365+
continue
366+
_LOGGER.debug("Notify stick request subscriber for %s", stick_response)
367+
await subscription.callback_fn(stick_response)
368+
self._stick_subscription_lock.release()
369+
370+
for subscription in list(self._stick_subscribers_for_responses.values()):
339371
if (
340372
subscription.seq_id is not None
341373
and subscription.seq_id != stick_response.seq_id
@@ -348,6 +380,9 @@ async def _notify_stick_response_subscribers(
348380
continue
349381
_LOGGER.debug("Notify stick response subscriber for %s", stick_response)
350382
await subscription.callback_fn(stick_response)
383+
_LOGGER.debug("Finished Notify stick response subscriber for %s", stick_response)
384+
385+
351386

352387
# endregion
353388
# region node
@@ -467,5 +502,4 @@ async def _notify_node_response_subscribers(
467502
name=f"Postpone subscription task for {node_response.seq_id!r} retry {node_response.retries}",
468503
)
469504

470-
471505
# endregion

0 commit comments

Comments
 (0)