Skip to content

Commit 76da703

Browse files
authored
Fixed that dispatches are never retried, but forever logged. (#162)
2 parents 7510149 + 4894cfc commit 76da703

File tree

2 files changed

+31
-66
lines changed

2 files changed

+31
-66
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@
1414

1515
## Bug Fixes
1616

17-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
17+
* Fixed that dispatches are never retried on failure, but instead an infinite loop of retry logs is triggered.

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 30 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from typing import Any, Awaitable, cast
1212

1313
from frequenz.channels import Broadcast, Receiver, Sender, select
14+
from frequenz.channels.timer import SkipMissedAndDrift, Timer
1415
from frequenz.client.common.microgrid.components import ComponentCategory
1516
from frequenz.client.microgrid import ComponentId
1617
from frequenz.sdk.actor import Actor, BackgroundService
@@ -142,59 +143,6 @@ async def main():
142143
```
143144
"""
144145

145-
class FailedDispatchesRetrier(BackgroundService):
146-
"""Manages the retring of failed dispatches."""
147-
148-
def __init__(self, retry_interval: timedelta) -> None:
149-
"""Initialize the retry manager.
150-
151-
Args:
152-
retry_interval: The interval between retries.
153-
"""
154-
super().__init__()
155-
self._retry_interval = retry_interval
156-
self._channel = Broadcast[Dispatch](name="retry_channel")
157-
self._sender = self._channel.new_sender()
158-
159-
def start(self) -> None:
160-
"""Start the background service.
161-
162-
This is a no-op.
163-
"""
164-
165-
def new_receiver(self) -> Receiver[Dispatch]:
166-
"""Create a new receiver for dispatches to retry.
167-
168-
Returns:
169-
The receiver.
170-
"""
171-
return self._channel.new_receiver()
172-
173-
def retry(self, dispatch: Dispatch) -> None:
174-
"""Retry a dispatch.
175-
176-
Args:
177-
dispatch: The dispatch information to retry.
178-
"""
179-
task = asyncio.create_task(self._retry_after_delay(dispatch))
180-
self._tasks.add(task)
181-
task.add_done_callback(self._tasks.remove)
182-
183-
async def _retry_after_delay(self, dispatch: Dispatch) -> None:
184-
"""Retry a dispatch after a delay.
185-
186-
Args:
187-
dispatch: The dispatch information to retry.
188-
"""
189-
_logger.info(
190-
"Will retry dispatch %s after %s",
191-
dispatch.id,
192-
self._retry_interval,
193-
)
194-
await asyncio.sleep(self._retry_interval.total_seconds())
195-
_logger.info("Retrying dispatch %s now", dispatch.id)
196-
await self._sender.send(dispatch)
197-
198146
@dataclass(frozen=True, kw_only=True)
199147
class ActorAndChannel:
200148
"""Actor and its sender."""
@@ -225,18 +173,19 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
225173
running_status_receiver: The receiver for dispatch running status changes.
226174
dispatch_identity: A function to identify to which actor a dispatch refers.
227175
By default, it uses the dispatch ID.
228-
retry_interval: The interval between retries.
176+
retry_interval: How long to wait until trying to start failed actors again.
229177
"""
230178
super().__init__()
231179
self._dispatch_identity: Callable[[Dispatch], int] = (
232180
dispatch_identity if dispatch_identity else lambda d: d.id
233181
)
234182

235183
self._dispatch_rx = running_status_receiver
184+
self._retry_timer_rx = Timer(retry_interval, SkipMissedAndDrift())
236185
self._actor_factory = actor_factory
237186
self._actors: dict[int, ActorDispatcher.ActorAndChannel] = {}
238-
239-
self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval)
187+
self._failed_dispatches: dict[int, Dispatch] = {}
188+
"""Failed dispatches that will be retried later."""
240189

241190
def start(self) -> None:
242191
"""Start the background service."""
@@ -292,7 +241,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
292241
dispatch.type,
293242
exc_info=e,
294243
)
295-
self._retrier.retry(dispatch)
244+
self._failed_dispatches[identity] = dispatch
296245
else:
297246
# No exception occurred, so we can add the actor to the list
298247
self._actors[identity] = ActorDispatcher.ActorAndChannel(
@@ -318,21 +267,37 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
318267

319268
async def _run(self) -> None:
320269
"""Run the background service."""
321-
async with self._retrier:
322-
retry_recv = self._retrier.new_receiver()
270+
async for selected in select(self._retry_timer_rx, self._dispatch_rx):
271+
if self._retry_timer_rx.triggered(selected):
272+
if not self._failed_dispatches:
273+
continue
274+
275+
_logger.info(
276+
"Retrying %d failed actor starts",
277+
len(self._failed_dispatches),
278+
)
279+
keys = list(self._failed_dispatches.keys())
280+
for identity in keys:
281+
dispatch = self._failed_dispatches[identity]
323282

324-
async for selected in select(retry_recv, self._dispatch_rx):
325-
if retry_recv.triggered(selected):
326-
self._retrier.retry(selected.message)
327-
elif self._dispatch_rx.triggered(selected):
328-
await self._handle_dispatch(selected.message)
283+
await self._handle_dispatch(dispatch)
284+
elif self._dispatch_rx.triggered(selected):
285+
await self._handle_dispatch(selected.message)
329286

330287
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
331-
"""Handle a dispatch.
288+
"""Process a dispatch to start, update, or stop an actor.
289+
290+
If a newer version of a previously failed dispatch is received, the
291+
pending retry for the older version is canceled to ensure only the
292+
latest dispatch is processed.
332293
333294
Args:
334295
dispatch: The dispatch to handle.
335296
"""
297+
identity = self._dispatch_identity(dispatch)
298+
if identity in self._failed_dispatches:
299+
self._failed_dispatches.pop(identity)
300+
336301
if dispatch.started:
337302
await self._start_actor(dispatch)
338303
else:

0 commit comments

Comments
 (0)