Skip to content

Commit 0053fc1

Browse files
committed
Update queue.py
1 parent 8d52da0 commit 0053fc1

File tree

1 file changed

+25
-32
lines changed

1 file changed

+25
-32
lines changed

plugwise_usb/connection/queue.py

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Manage the communication sessions towards the USB-Stick."""
2+
23
from __future__ import annotations
34

45
from asyncio import PriorityQueue, Task, get_running_loop, sleep
@@ -7,7 +8,7 @@
78
import logging
89

910
from ..api import StickEvent
10-
from ..exceptions import NodeTimeout, StickError, StickTimeout
11+
from ..exceptions import MessageError, NodeTimeout, StickError, StickTimeout
1112
from ..messages import Priority
1213
from ..messages.requests import NodePingRequest, PlugwiseCancelRequest, PlugwiseRequest
1314
from ..messages.responses import PlugwiseResponse
@@ -41,21 +42,15 @@ def is_running(self) -> bool:
4142
"""Return the state of the queue."""
4243
return self._running
4344

44-
def start(
45-
self,
46-
stick_connection_manager: StickConnectionManager
47-
) -> None:
45+
def start(self, stick_connection_manager: StickConnectionManager) -> None:
4846
"""Start sending request from queue."""
4947
if self._running:
5048
raise StickError("Cannot start queue manager, already running")
5149
self._stick = stick_connection_manager
5250
if self._stick.is_connected:
5351
self._running = True
54-
self._unsubscribe_connection_events = (
55-
self._stick.subscribe_to_stick_events(
56-
self._handle_stick_event,
57-
(StickEvent.CONNECTED, StickEvent.DISCONNECTED)
58-
)
52+
self._unsubscribe_connection_events = self._stick.subscribe_to_stick_events(
53+
self._handle_stick_event, (StickEvent.CONNECTED, StickEvent.DISCONNECTED)
5954
)
6055

6156
async def _handle_stick_event(self, event: StickEvent) -> None:
@@ -79,56 +74,54 @@ async def stop(self) -> None:
7974
self._stick = None
8075
_LOGGER.debug("queue stopped")
8176

82-
async def submit(
83-
self, request: PlugwiseRequest
84-
) -> PlugwiseResponse:
77+
async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse:
8578
"""Add request to queue and return the response of node. Raises an error when something fails."""
86-
_LOGGER.debug("Submit %s", request)
87-
while request.resend:
79+
if request.waiting_for_response:
80+
raise MessageError(
81+
f"Cannot send message {request} which is currently waiting for response."
82+
)
83+
84+
while request.resend and not request.waiting_for_response:
85+
_LOGGER.warning("submit | start (%s) %s", request.retries_left, request)
8886
if not self._running or self._stick is None:
8987
raise StickError(
90-
f"Cannot send message {request.__class__.__name__} for" +
91-
f"{request.mac_decoded} because queue manager is stopped"
88+
f"Cannot send message {request.__class__.__name__} for"
89+
+ f"{request.mac_decoded} because queue manager is stopped"
9290
)
9391
await self._add_request_to_queue(request)
9492
try:
9593
response: PlugwiseResponse = await request.response_future()
9694
except (NodeTimeout, StickTimeout) as e:
9795
if isinstance(request, NodePingRequest):
9896
# For ping requests it is expected to receive timeouts, so lower log level
99-
_LOGGER.debug("%s, cancel because timeout is expected for NodePingRequests", e)
97+
_LOGGER.debug(
98+
"%s, cancel because timeout is expected for NodePingRequests", e
99+
)
100100
elif request.resend:
101-
_LOGGER.info("%s, retrying", e)
101+
_LOGGER.debug("%s, retrying", e)
102102
else:
103103
_LOGGER.warning("%s, cancel request", e) # type: ignore[unreachable]
104104
except StickError as exception:
105105
_LOGGER.error(exception)
106106
raise StickError(
107-
f"No response received for {request.__class__.__name__} " +
108-
f"to {request.mac_decoded}"
107+
f"No response received for {request.__class__.__name__} "
108+
+ f"to {request.mac_decoded}"
109109
) from exception
110110
except BaseException as exception:
111111
raise StickError(
112-
f"No response received for {request.__class__.__name__} " +
113-
f"to {request.mac_decoded}"
112+
f"No response received for {request.__class__.__name__} "
113+
+ f"to {request.mac_decoded}"
114114
) from exception
115-
else:
116-
return response
117115

118-
raise StickError(
119-
f"Failed to send {request.__class__.__name__} " +
120-
f"to node {request.mac_decoded}, maximum number " +
121-
f"of retries ({request.max_retries}) has been reached"
122-
)
116+
return response
123117

124118
async def _add_request_to_queue(self, request: PlugwiseRequest) -> None:
125119
"""Add request to send queue."""
126120
_LOGGER.debug("Add request to queue: %s", request)
127121
await self._submit_queue.put(request)
128122
if self._submit_worker_task is None or self._submit_worker_task.done():
129123
self._submit_worker_task = self._loop.create_task(
130-
self._send_queue_worker(),
131-
name="Send queue worker"
124+
self._send_queue_worker(), name="Send queue worker"
132125
)
133126

134127
async def _send_queue_worker(self) -> None:

0 commit comments

Comments
 (0)