Skip to content

Commit d450f03

Browse files
committed
Correct message overload on intitial circle log fetch.
Limit outstanding messages to 4
1 parent 0179fad commit d450f03

File tree

5 files changed

+44
-15
lines changed

5 files changed

+44
-15
lines changed

plugwise_usb/connection/manager.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ def __init__(self) -> None:
3636
] = {}
3737
self._unsubscribe_stick_events: Callable[[], None] | None = None
3838

39+
@property
40+
def queue_depth(self) -> int:
41+
return self._sender.processed_messages - self._receiver.processed_messages
42+
43+
def correct_received_messages(self, correction: int) -> None:
44+
self._receiver.correct_processed_messages(correction)
45+
3946
@property
4047
def serial_path(self) -> str:
4148
"""Return current port."""

plugwise_usb/connection/queue.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse:
8282
)
8383

8484
while request.resend and not request.waiting_for_response:
85-
_LOGGER.warning("submit | start (%s) %s", request.retries_left, request)
85+
_LOGGER.debug("submit | start (%s) %s", request.retries_left, request)
8686
if not self._running or self._stick is None:
8787
raise StickError(
8888
f"Cannot send message {request.__class__.__name__} for"
@@ -91,6 +91,7 @@ async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse:
9191
await self._add_request_to_queue(request)
9292
try:
9393
response: PlugwiseResponse = await request.response_future()
94+
return response
9495
except (NodeTimeout, StickTimeout) as e:
9596
if isinstance(request, NodePingRequest):
9697
# For ping requests it is expected to receive timeouts, so lower log level
@@ -103,17 +104,19 @@ async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse:
103104
_LOGGER.warning("%s, cancel request", e) # type: ignore[unreachable]
104105
except StickError as exception:
105106
_LOGGER.error(exception)
107+
self._stick.correct_received_messages(1)
106108
raise StickError(
107109
f"No response received for {request.__class__.__name__} "
108110
+ f"to {request.mac_decoded}"
109111
) from exception
110112
except BaseException as exception:
113+
self._stick.correct_received_messages(1)
111114
raise StickError(
112115
f"No response received for {request.__class__.__name__} "
113116
+ f"to {request.mac_decoded}"
114117
) from exception
115118

116-
return response
119+
return None
117120

118121
async def _add_request_to_queue(self, request: PlugwiseRequest) -> None:
119122
"""Add request to send queue."""
@@ -133,8 +136,13 @@ async def _send_queue_worker(self) -> None:
133136
if request.priority == Priority.CANCEL:
134137
self._submit_queue.task_done()
135138
return
139+
140+
while self._stick.queue_depth > 3:
141+
_LOGGER.info("Awaiting plugwise responses %d", self._stick.queue_depth)
142+
await sleep(0.125)
143+
136144
await self._stick.write_to_stick(request)
137145
self._submit_queue.task_done()
138-
await sleep(0.001)
146+
139147
_LOGGER.debug("Sent from queue %s", request)
140148
_LOGGER.debug("Send_queue_worker stopped")

plugwise_usb/connection/receiver.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ def __init__(
9999
self._data_worker_task: Task[None] | None = None
100100

101101
# Message processing
102+
self._processed_msgs = 0
102103
self._message_queue: PriorityQueue[PlugwiseResponse] = PriorityQueue()
103104
self._last_processed_messages: list[bytes] = []
104105
self._current_seq_id: bytes | None = None
@@ -137,11 +138,20 @@ def connection_lost(self, exc: Exception | None = None) -> None:
137138
self._transport = None
138139
self._connection_state = False
139140

141+
@property
142+
def processed_messages(self) -> int:
143+
"""Return the number of processed messages."""
144+
return self._processed_msgs
145+
140146
@property
141147
def is_connected(self) -> bool:
142148
"""Return current connection state of the USB-Stick."""
143149
return self._connection_state
144150

151+
def correct_processed_messages(self, correction: int) -> None:
152+
"""Return the number of processed messages."""
153+
self._processed_msgs += correction
154+
145155
def connection_made(self, transport: SerialTransport) -> None:
146156
"""Call when the serial connection to USB-Stick is established."""
147157
_LOGGER.info("Connection made")
@@ -278,6 +288,7 @@ async def _message_queue_worker(self) -> None:
278288
await self._notify_stick_subscribers(response)
279289
else:
280290
await self._notify_node_response_subscribers(response)
291+
self._processed_msgs += 1
281292
self._message_queue.task_done()
282293
await sleep(0)
283294
_LOGGER.debug("Message queue worker stopped")
@@ -457,7 +468,7 @@ async def _notify_node_response_subscribers(
457468

458469
self._node_subscription_lock.release()
459470
if len(notify_tasks) > 0:
460-
_LOGGER.debug("Received %s", node_response)
471+
_LOGGER.debug("Received %s %s", node_response, node_response.seq_id)
461472
if node_response.seq_id not in BROADCAST_IDS:
462473
self._last_processed_messages.append(node_response.seq_id)
463474
# Limit tracking to only the last appended request (FIFO)

plugwise_usb/connection/sender.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,17 @@ def __init__(self, stick_receiver: StickReceiver, transport: Transport) -> None:
3838
self._loop = get_running_loop()
3939
self._receiver = stick_receiver
4040
self._transport = transport
41+
self._processed_msgs = 0
4142
self._stick_response: Future[StickResponse] | None = None
4243
self._stick_lock = Lock()
4344
self._current_request: None | PlugwiseRequest = None
4445
self._unsubscribe_stick_response: Callable[[], None] | None = None
4546

47+
@property
48+
def processed_messages(self) -> int:
49+
"""Return the number of processed messages."""
50+
return self._processed_msgs
51+
4652
async def start(self) -> None:
4753
"""Start the sender."""
4854
# Subscribe to ACCEPT stick responses, which contain the seq_id we need.
@@ -133,6 +139,7 @@ async def write_request_to_port(self, request: PlugwiseRequest) -> None:
133139
finally:
134140
self._stick_response.cancel()
135141
self._stick_lock.release()
142+
self._processed_msgs += 1
136143

137144
async def _process_stick_response(self, response: StickResponse) -> None:
138145
"""Process stick response."""

plugwise_usb/nodes/circle.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from __future__ import annotations
44

5-
from asyncio import Task, create_task, gather
5+
from asyncio import Task, create_task
66
from collections.abc import Awaitable, Callable
77
from dataclasses import replace
88
from datetime import UTC, datetime
@@ -453,11 +453,8 @@ async def get_missing_energy_logs(self) -> None:
453453
log_address, _ = calc_log_address(log_address, 1, -4)
454454
total_addresses -= 1
455455

456-
if not all(await gather(*log_update_tasks)):
457-
_LOGGER.info(
458-
"Failed to request one or more update energy log for %s",
459-
self._mac_in_str,
460-
)
456+
for task in log_update_tasks:
457+
await task
461458

462459
if self._cache_enabled:
463460
await self._energy_log_records_save_to_cache()
@@ -475,9 +472,8 @@ async def get_missing_energy_logs(self) -> None:
475472
)
476473

477474
missing_addresses = sorted(missing_addresses, reverse=True)
478-
await gather(
479-
*[self.energy_log_update(address) for address in missing_addresses]
480-
)
475+
for address in missing_addresses:
476+
await self.energy_log_update(address)
481477

482478
if self._cache_enabled:
483479
await self._energy_log_records_save_to_cache()
@@ -528,7 +524,7 @@ async def _energy_log_records_load_from_cache(self) -> bool:
528524
"""Load energy_log_record from cache."""
529525
cache_data = self._get_cache(CACHE_ENERGY_COLLECTION)
530526
if (cache_data := self._get_cache(CACHE_ENERGY_COLLECTION)) is None:
531-
_LOGGER.debug(
527+
_LOGGER.warning(
532528
"Failed to restore energy log records from cache for node %s", self.name
533529
)
534530
return False
@@ -811,7 +807,7 @@ async def _load_from_cache(self) -> bool:
811807
return False
812808
# Energy collection
813809
if await self._energy_log_records_load_from_cache():
814-
_LOGGER.debug(
810+
_LOGGER.warning(
815811
"Node %s failed to load energy_log_records from cache",
816812
self._mac_in_str,
817813
)

0 commit comments

Comments
 (0)