|
19 | 19 | from asyncio import ( |
20 | 20 | Future, |
21 | 21 | Protocol, |
22 | | - Queue, |
| 22 | + PriorityQueue, |
23 | 23 | Task, |
24 | 24 | TimerHandle, |
25 | 25 | gather, |
|
36 | 36 | from ..api import StickEvent |
37 | 37 | from ..constants import MESSAGE_FOOTER, MESSAGE_HEADER |
38 | 38 | from ..exceptions import MessageError |
| 39 | +from ..messages import Priority |
39 | 40 | from ..messages.responses import ( |
40 | 41 | BROADCAST_IDS, |
41 | 42 | PlugwiseResponse, |
@@ -73,7 +74,7 @@ def __init__( |
73 | 74 | self._buffer: bytes = bytes([]) |
74 | 75 | self._connection_state = False |
75 | 76 | self._reduce_logging = True |
76 | | - self._receive_queue: Queue[PlugwiseResponse | None] = Queue() |
| 77 | + self._receive_queue: PriorityQueue[PlugwiseResponse] = PriorityQueue() |
77 | 78 | self._last_processed_messages: list[bytes] = [] |
78 | 79 | self._stick_future: futures.Future | None = None |
79 | 80 | self._responses: dict[bytes, Callable[[PlugwiseResponse], None]] = {} |
@@ -158,10 +159,10 @@ async def _stop_running_tasks(self) -> None: |
158 | 159 | """Cancel and stop any running task.""" |
159 | 160 | for task in self._delayed_processing_tasks.values(): |
160 | 161 | task.cancel() |
161 | | - await self._receive_queue.put(None) |
162 | | - if self._msg_processing_task is not None and not self._msg_processing_task.done(): |
163 | | - self._receive_queue.put_nowait(None) |
164 | | - await self._msg_processing_task |
| 162 | + cancel_response = StickResponse() |
| 163 | + cancel_response.priority = Priority.CANCEL |
| 164 | + await self._receive_queue.put(cancel_response) |
| 165 | + await self._receive_worker_task |
165 | 166 |
|
166 | 167 | def data_received(self, data: bytes) -> None: |
167 | 168 | """Receive data from USB-Stick connection. |
@@ -221,14 +222,14 @@ async def _receive_queue_worker(self): |
221 | 222 | """Process queue items.""" |
222 | 223 | _LOGGER.debug("Receive_queue_worker started") |
223 | 224 | while self.is_connected: |
224 | | - response: PlugwiseResponse | None = await self._receive_queue.get() |
| 225 | + response: PlugwiseResponse = await self._receive_queue.get() |
| 226 | + if response.priority == Priority.CANCEL: |
| 227 | + self._receive_queue.task_done() |
| 228 | + return |
225 | 229 | _LOGGER.debug("Process from receive queue: %s", response) |
226 | 230 | if isinstance(response, StickResponse): |
227 | 231 | _LOGGER.debug("Received %s", response) |
228 | 232 | await self._notify_stick_response_subscribers(response) |
229 | | - elif response is None: |
230 | | - self._receive_queue.task_done() |
231 | | - return |
232 | 233 | else: |
233 | 234 | await self._notify_node_response_subscribers(response) |
234 | 235 | self._receive_queue.task_done() |
|
0 commit comments