Skip to content

Commit 31805be

Browse files
committed
Fixed failed dispatches never being retried while retry messages were logged forever.
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 7510149 commit 31805be

File tree

2 files changed

+27
-65
lines changed

2 files changed

+27
-65
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@
1414

1515
## Bug Fixes
1616

17-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
17+
* Fixed a bug where failed dispatches were never retried and instead triggered an infinite loop of retry log messages.

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 26 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from typing import Any, Awaitable, cast
1212

1313
from frequenz.channels import Broadcast, Receiver, Sender, select
14+
from frequenz.channels.timer import SkipMissedAndDrift, Timer
1415
from frequenz.client.common.microgrid.components import ComponentCategory
1516
from frequenz.client.microgrid import ComponentId
1617
from frequenz.sdk.actor import Actor, BackgroundService
@@ -142,59 +143,6 @@ async def main():
142143
```
143144
"""
144145

145-
class FailedDispatchesRetrier(BackgroundService):
146-
"""Manages the retring of failed dispatches."""
147-
148-
def __init__(self, retry_interval: timedelta) -> None:
149-
"""Initialize the retry manager.
150-
151-
Args:
152-
retry_interval: The interval between retries.
153-
"""
154-
super().__init__()
155-
self._retry_interval = retry_interval
156-
self._channel = Broadcast[Dispatch](name="retry_channel")
157-
self._sender = self._channel.new_sender()
158-
159-
def start(self) -> None:
160-
"""Start the background service.
161-
162-
This is a no-op.
163-
"""
164-
165-
def new_receiver(self) -> Receiver[Dispatch]:
166-
"""Create a new receiver for dispatches to retry.
167-
168-
Returns:
169-
The receiver.
170-
"""
171-
return self._channel.new_receiver()
172-
173-
def retry(self, dispatch: Dispatch) -> None:
174-
"""Retry a dispatch.
175-
176-
Args:
177-
dispatch: The dispatch information to retry.
178-
"""
179-
task = asyncio.create_task(self._retry_after_delay(dispatch))
180-
self._tasks.add(task)
181-
task.add_done_callback(self._tasks.remove)
182-
183-
async def _retry_after_delay(self, dispatch: Dispatch) -> None:
184-
"""Retry a dispatch after a delay.
185-
186-
Args:
187-
dispatch: The dispatch information to retry.
188-
"""
189-
_logger.info(
190-
"Will retry dispatch %s after %s",
191-
dispatch.id,
192-
self._retry_interval,
193-
)
194-
await asyncio.sleep(self._retry_interval.total_seconds())
195-
_logger.info("Retrying dispatch %s now", dispatch.id)
196-
await self._sender.send(dispatch)
197-
198146
@dataclass(frozen=True, kw_only=True)
199147
class ActorAndChannel:
200148
"""Actor and its sender."""
@@ -225,18 +173,18 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
225173
running_status_receiver: The receiver for dispatch running status changes.
226174
dispatch_identity: A function to identify to which actor a dispatch refers.
227175
By default, it uses the dispatch ID.
228-
retry_interval: The interval between retries.
176+
retry_interval: How long to wait until trying to start failed actors again.
229177
"""
230178
super().__init__()
231179
self._dispatch_identity: Callable[[Dispatch], int] = (
232180
dispatch_identity if dispatch_identity else lambda d: d.id
233181
)
234182

235183
self._dispatch_rx = running_status_receiver
184+
self._retry_timer_rx = Timer(retry_interval, SkipMissedAndDrift())
236185
self._actor_factory = actor_factory
237186
self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {}
238-
239-
self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval)
187+
self._failed_dispatches: dict[int, Dispatch] = {}
240188

241189
def start(self) -> None:
242190
"""Start the background service."""
@@ -292,7 +240,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
292240
dispatch.type,
293241
exc_info=e,
294242
)
295-
self._retrier.retry(dispatch)
243+
self._failed_dispatches[identity] = dispatch
296244
else:
297245
# No exception occurred, so we can add the actor to the list
298246
self._actors[identity] = ActorDispatcher.ActorAndChannel(
@@ -318,21 +266,35 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
318266

319267
async def _run(self) -> None:
320268
"""Run the background service."""
321-
async with self._retrier:
322-
retry_recv = self._retrier.new_receiver()
269+
async for selected in select(self._retry_timer_rx, self._dispatch_rx):
270+
if self._retry_timer_rx.triggered(selected):
271+
if not self._failed_dispatches:
272+
continue
273+
274+
_logger.info(
275+
"Retrying %d failed actor starts",
276+
len(self._failed_dispatches),
277+
)
278+
keys = list(self._failed_dispatches.keys())
279+
for identity in keys:
280+
dispatch = self._failed_dispatches[identity]
323281

324-
async for selected in select(retry_recv, self._dispatch_rx):
325-
if retry_recv.triggered(selected):
326-
self._retrier.retry(selected.message)
327-
elif self._dispatch_rx.triggered(selected):
328-
await self._handle_dispatch(selected.message)
282+
await self._handle_dispatch(dispatch)
283+
elif self._dispatch_rx.triggered(selected):
284+
await self._handle_dispatch(selected.message)
329285

330286
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
331287
"""Handle a dispatch.
332288
289+
Removes the dispatch from the failed dispatches cache, if present, before handling it.
290+
333291
Args:
334292
dispatch: The dispatch to handle.
335293
"""
294+
identity = self._dispatch_identity(dispatch)
295+
if identity in self._failed_dispatches:
296+
self._failed_dispatches.pop(identity)
297+
336298
if dispatch.started:
337299
await self._start_actor(dispatch)
338300
else:

0 commit comments

Comments
 (0)