Skip to content

Commit a247d26

Browse files
committed
Dispatches that failed to start will now be retried after a delay.
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent e3fc477 commit a247d26

File tree

4 files changed

+138
-12
lines changed

4 files changed

+138
-12
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ This release introduces a more flexible and powerful mechanism for managing disp
2323
* A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
2424
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.
2525
* Actor management with dispatches has been simplified:
26-
* `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy)` to manage your actor for the given type and merge strategy. All you need provide is an actor factory.
26+
* `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy, retry_interval)` to manage your actor for the given type and merge strategy. All you need provide is an actor factory.
2727
* `Dispatcher.stop_dispatching(dispatch_type)` to stop dispatching for the given type.
2828
* `Dispatcher.is_dispatching(dispatch_type)` to check if dispatching is active for the given type.
29+
* Dispatches that failed to start will now be retried after a delay.

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 85 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
import logging
88
from collections.abc import Callable
99
from dataclasses import dataclass
10+
from datetime import timedelta
1011
from typing import Any, Awaitable
1112

12-
from frequenz.channels import Broadcast, Receiver
13+
from frequenz.channels import Broadcast, Receiver, select
1314
from frequenz.client.dispatch.types import TargetComponents
1415
from frequenz.sdk.actor import Actor, BackgroundService
1516

@@ -134,13 +135,58 @@ async def main():
134135
```
135136
"""
136137

137-
def __init__(
138+
class RetryFailedDispatches:
139+
"""Manages the retry of failed dispatches."""
140+
141+
def __init__(self, retry_interval: timedelta) -> None:
142+
"""Initialize the retry manager.
143+
144+
Args:
145+
retry_interval: The interval between retries.
146+
"""
147+
self._retry_interval = retry_interval
148+
self._channel = Broadcast[Dispatch](name="retry_channel")
149+
self._sender = self._channel.new_sender()
150+
151+
def new_receiver(self) -> Receiver[Dispatch]:
152+
"""Create a new receiver for dispatches to retry.
153+
154+
Returns:
155+
The receiver.
156+
"""
157+
return self._channel.new_receiver()
158+
159+
def retry(self, dispatch: Dispatch) -> None:
160+
"""Retry a dispatch.
161+
162+
Args:
163+
dispatch: The dispatch information to retry.
164+
"""
165+
asyncio.create_task(self._retry_after_delay(dispatch))
166+
167+
async def _retry_after_delay(self, dispatch: Dispatch) -> None:
168+
"""Retry a dispatch after a delay.
169+
170+
Args:
171+
dispatch: The dispatch information to retry.
172+
"""
173+
_logger.info(
174+
"Will retry dispatch %s after %s",
175+
dispatch.id,
176+
self._retry_interval,
177+
)
178+
await asyncio.sleep(self._retry_interval.total_seconds())
179+
_logger.info("Retrying dispatch %s now", dispatch.id)
180+
await self._sender.send(dispatch)
181+
182+
def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments
138183
self,
139184
actor_factory: Callable[
140185
[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
141186
],
142187
running_status_receiver: Receiver[Dispatch],
143188
dispatch_identity: Callable[[Dispatch], int] | None = None,
189+
retry_interval: timedelta | None = timedelta(seconds=60),
144190
) -> None:
145191
"""Initialize the dispatch handler.
146192
@@ -150,6 +196,7 @@ def __init__(
150196
running_status_receiver: The receiver for dispatch running status changes.
151197
dispatch_identity: A function to identify to which actor a dispatch refers.
152198
By default, it uses the dispatch ID.
199+
retry_interval: The interval between retries. If `None`, retries are disabled.
153200
"""
154201
super().__init__()
155202
self._dispatch_identity: Callable[[Dispatch], int] = (
@@ -163,6 +210,11 @@ def __init__(
163210
name="dispatch_updates_channel", resend_latest=True
164211
)
165212
self._updates_sender = self._updates_channel.new_sender()
213+
self._retrier = (
214+
ActorDispatcher.RetryFailedDispatches(retry_interval)
215+
if retry_interval
216+
else None
217+
)
166218

167219
def start(self) -> None:
168220
"""Start the background service."""
@@ -176,7 +228,8 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
176228
options=dispatch.payload,
177229
)
178230

179-
actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch))
231+
identity = self._dispatch_identity(dispatch)
232+
actor: Actor | None = self._actors.get(identity)
180233

181234
if actor:
182235
sent_str = ""
@@ -195,17 +248,24 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
195248
dispatch_update,
196249
self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
197250
)
198-
self._actors[self._dispatch_identity(dispatch)] = actor
199251

200252
actor.start()
201253

202254
except Exception as e: # pylint: disable=broad-except
203255
_logger.error(
204-
"Failed to start actor for dispatch type %r: %s",
256+
"Failed to start actor for dispatch type %r",
205257
dispatch.type,
206-
e,
207-
exc_info=True,
258+
exc_info=e,
208259
)
260+
if self._retrier:
261+
self._retrier.retry(dispatch)
262+
else:
263+
_logger.error(
264+
"No retry mechanism enabled, dispatch %r failed", dispatch
265+
)
266+
else:
267+
# No exception occurred, so we can add the actor to the list
268+
self._actors[identity] = actor
209269

210270
async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
211271
"""Stop all actors.
@@ -214,17 +274,31 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
214274
stopping_dispatch: The dispatch that is stopping the actor.
215275
msg: The message to be passed to the actors being stopped.
216276
"""
217-
if actor := self._actors.pop(self._dispatch_identity(stopping_dispatch), None):
277+
actor: Actor | None = None
278+
identity = self._dispatch_identity(stopping_dispatch)
279+
280+
actor = self._actors.get(identity)
281+
282+
if actor:
218283
await actor.stop(msg)
219284
else:
220285
_logger.warning(
221286
"Actor for dispatch type %r is not running", stopping_dispatch.type
222287
)
223288

224289
async def _run(self) -> None:
225-
"""Wait for dispatches and handle them."""
226-
async for dispatch in self._dispatch_rx:
227-
await self._handle_dispatch(dispatch=dispatch)
290+
"""Run the background service."""
291+
if not self._retrier:
292+
async for dispatch in self._dispatch_rx:
293+
await self._handle_dispatch(dispatch)
294+
else:
295+
retry_recv = self._retrier.new_receiver()
296+
297+
async for selected in select(retry_recv, self._dispatch_rx):
298+
if retry_recv.triggered(selected):
299+
self._retrier.retry(selected.message)
300+
elif self._dispatch_rx.triggered(selected):
301+
await self._handle_dispatch(selected.message)
228302

229303
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
230304
"""Handle a dispatch.

src/frequenz/dispatch/_dispatcher.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ async def start_dispatching(
241241
[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
242242
],
243243
merge_strategy: MergeStrategy | None = None,
244+
retry_interval: timedelta = timedelta(seconds=60),
244245
initial_fetch_timeout: timedelta = timedelta(seconds=10),
245246
) -> None:
246247
"""Manage actors for a given dispatch type.
@@ -256,6 +257,7 @@ async def start_dispatching(
256257
dispatch_type: The type of the dispatch to manage.
257258
actor_factory: The factory to create actors.
258259
merge_strategy: The strategy to merge running intervals.
260+
retry_interval: Retry interval for when actor creation fails.
259261
initial_fetch_timeout: Timeout for the initial fetch of dispatches.
260262
After the timeout, the receiver will continue to start and
261263
listen for new dispatches, while the initial fetch is still
@@ -284,6 +286,7 @@ def id_identity(dispatch: Dispatch) -> int:
284286
dispatch_identity=(
285287
id_identity if merge_strategy is None else merge_strategy.identity
286288
),
289+
retry_interval=retry_interval,
287290
)
288291

289292
self._actor_dispatchers[dispatch_type] = dispatcher

tests/test_mananging_actor.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,13 @@ async def create(
8686
actor = cls(initial_dispatch, receiver)
8787
return actor
8888

89+
@classmethod
90+
async def create_fail(
91+
cls, __: DispatchInfo, _: Receiver[DispatchInfo]
92+
) -> "MockActor":
93+
"""Create a new actor."""
94+
raise ValueError("Failed to create actor")
95+
8996

9097
@dataclass
9198
class TestEnv:
@@ -175,6 +182,47 @@ async def test_simple_start_stop(
175182
# pylint: enable=protected-access
176183

177184

185+
async def test_start_failed(
186+
test_env: TestEnv, fake_time: time_machine.Coordinates
187+
) -> None:
188+
"""Test auto-retry after 60 seconds."""
189+
# pylint: disable=protected-access
190+
test_env.actors_service._actor_factory = MockActor.create_fail
191+
192+
now = _now()
193+
duration = timedelta(minutes=10)
194+
dispatch = test_env.generator.generate_dispatch()
195+
dispatch = replace(
196+
dispatch,
197+
id=1,
198+
active=True,
199+
dry_run=False,
200+
duration=duration,
201+
start_time=now,
202+
payload={"test": True},
203+
type="UNIT_TEST",
204+
recurrence=replace(
205+
dispatch.recurrence,
206+
frequency=Frequency.UNSPECIFIED,
207+
),
208+
)
209+
210+
# Send status update to start actor, expect no DispatchInfo for the start
211+
await test_env.running_status_sender.send(Dispatch(dispatch))
212+
fake_time.shift(timedelta(seconds=1))
213+
214+
# Replace failing mock actor factory with a working one
215+
test_env.actors_service._actor_factory = MockActor.create
216+
217+
# Give retry task time to start
218+
await asyncio.sleep(1)
219+
220+
fake_time.shift(timedelta(seconds=65))
221+
await asyncio.sleep(65)
222+
223+
assert test_env.actor(1).is_running is True
224+
225+
178226
def test_heapq_dispatch_compare(test_env: TestEnv) -> None:
179227
"""Test that the heapq compare function works."""
180228
dispatch1 = test_env.generator.generate_dispatch()

0 commit comments

Comments
 (0)