Skip to content

Commit 3142cf9

Browse files
committed
Add retry delay and auto restart to ActorService
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent ae1f459 commit 3142cf9

File tree

4 files changed

+83
-10
lines changed

4 files changed

+83
-10
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ This release introduces a more flexible and powerful mechanism for managing disp
2323
* A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. if dispatch `A` runs from 10:10 to 10:30 and dispatch `B` runs from 10:30 to 11:00, then with the feature enabled this would trigger, in total, one start event at 10:10, one reconfigure event at 10:30 and one stop event at 11:00.
2424
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.
2525
* Actor management with dispatches has been simplified:
26-
* `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy)` to manage your actor for the given type and merge strategy. All you need to provide is an actor factory.
26+
* `Dispatcher.start_dispatching(dispatch_type, actor_factory, merge_strategy, retry_interval)` to manage your actor for the given type and merge strategy. All you need to provide is an actor factory.
2727
* `Dispatcher.stop_dispatching(dispatch_type)` to stop dispatching for the given type.
2828
* `Dispatcher.is_dispatching(dispatch_type)` to check if dispatching is active for the given type.
29+
* Dispatches that failed to start will now be retried after a delay.

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
import logging
88
from collections.abc import Callable
99
from dataclasses import dataclass
10+
from datetime import timedelta
1011
from typing import Any, Awaitable
1112

12-
from frequenz.channels import Broadcast, Receiver
13+
from frequenz.channels import Broadcast, Receiver, select
14+
from frequenz.channels.timer import SkipMissedAndResync, Timer
1315
from frequenz.client.dispatch.types import TargetComponents
1416
from frequenz.sdk.actor import Actor, BackgroundService
1517

@@ -32,6 +34,7 @@ class DispatchInfo:
3234
"""Additional options."""
3335

3436

37+
# pylint: disable=too-many-instance-attributes
3538
class ActorDispatcher(BackgroundService):
3639
"""Helper class to manage actors based on dispatches.
3740
@@ -141,6 +144,7 @@ def __init__(
141144
],
142145
running_status_receiver: Receiver[Dispatch],
143146
dispatch_identity: Callable[[Dispatch], int] | None = None,
147+
retry_interval: timedelta = timedelta(seconds=60),
144148
) -> None:
145149
"""Initialize the dispatch handler.
146150
@@ -150,6 +154,7 @@ def __init__(
150154
running_status_receiver: The receiver for dispatch running status changes.
151155
dispatch_identity: A function to identify to which actor a dispatch refers.
152156
By default, it uses the dispatch ID.
157+
retry_interval: The interval between retries.
153158
"""
154159
super().__init__()
155160
self._dispatch_identity: Callable[[Dispatch], int] = (
@@ -159,6 +164,9 @@ def __init__(
159164
self._dispatch_rx = running_status_receiver
160165
self._actor_factory = actor_factory
161166
self._actors: dict[int, Actor] = {}
167+
self._start_failed_dispatches: dict[int, Dispatch] = {}
168+
self._retry_timer = Timer(retry_interval, SkipMissedAndResync())
169+
162170
self._updates_channel = Broadcast[DispatchInfo](
163171
name="dispatch_updates_channel", resend_latest=True
164172
)
@@ -176,7 +184,8 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
176184
options=dispatch.payload,
177185
)
178186

179-
actor: Actor | None = self._actors.get(self._dispatch_identity(dispatch))
187+
identity = self._dispatch_identity(dispatch)
188+
actor: Actor | None = self._actors.get(identity)
180189

181190
if actor:
182191
sent_str = ""
@@ -189,23 +198,28 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
189198
sent_str,
190199
)
191200
else:
201+
# Remove pending restarts for this identity
202+
self._start_failed_dispatches.pop(identity, None)
203+
192204
try:
193205
_logger.info("Starting actor for dispatch type %r", dispatch.type)
194206
actor = await self._actor_factory(
195207
dispatch_update,
196208
self._updates_channel.new_receiver(limit=1, warn_on_overflow=False),
197209
)
198-
self._actors[self._dispatch_identity(dispatch)] = actor
199210

200211
actor.start()
201212

202213
except Exception as e: # pylint: disable=broad-except
203214
_logger.error(
204-
"Failed to start actor for dispatch type %r: %s",
215+
"Failed to start actor for dispatch type %r",
205216
dispatch.type,
206-
e,
207-
exc_info=True,
217+
exc_info=e,
208218
)
219+
self._start_failed_dispatches[identity] = dispatch
220+
else:
221+
# No exception occurred, so we can add the actor to the list
222+
self._actors[identity] = actor
209223

210224
async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
211225
"""Stop all actors.
@@ -222,9 +236,16 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
222236
)
223237

224238
async def _run(self) -> None:
225-
"""Wait for dispatches and handle them."""
226-
async for dispatch in self._dispatch_rx:
227-
await self._handle_dispatch(dispatch=dispatch)
239+
"""Run the background service."""
240+
async for selected in select(self._retry_timer, self._dispatch_rx):
241+
if self._retry_timer.triggered(selected) and self._start_failed_dispatches:
242+
_logger.info("Retrying failed dispatches")
243+
failed_dispatches = self._start_failed_dispatches.values()
244+
self._start_failed_dispatches = {}
245+
for dispatch in failed_dispatches:
246+
await self._start_actor(dispatch)
247+
elif self._dispatch_rx.triggered(selected):
248+
await self._handle_dispatch(selected.message)
228249

229250
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
230251
"""Handle a dispatch.

src/frequenz/dispatch/_dispatcher.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import asyncio
99
import logging
1010
from asyncio import Event
11+
from datetime import timedelta
1112
from typing import Awaitable, Callable
1213

1314
from frequenz.channels import Receiver
@@ -245,6 +246,7 @@ async def start_dispatching(
245246
[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
246247
],
247248
merge_strategy: MergeStrategy | None = None,
249+
retry_interval: timedelta = timedelta(seconds=60),
248250
) -> None:
249251
"""Manage actors for a given dispatch type.
250252
@@ -259,6 +261,7 @@ async def start_dispatching(
259261
dispatch_type: The type of the dispatch to manage.
260262
actor_factory: The factory to create actors.
261263
merge_strategy: The strategy to merge running intervals.
264+
retry_interval: Retry interval for when actor creation fails.
262265
"""
263266
dispatcher = self._actor_dispatchers.get(dispatch_type)
264267

@@ -281,6 +284,7 @@ def id_identity(dispatch: Dispatch) -> int:
281284
dispatch_identity=(
282285
id_identity if merge_strategy is None else merge_strategy.identity
283286
),
287+
retry_interval=retry_interval,
284288
)
285289

286290
self._actor_dispatchers[dispatch_type] = dispatcher

tests/test_mananging_actor.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,13 @@ async def create(
8686
actor = cls(initial_dispatch, receiver)
8787
return actor
8888

89+
@classmethod
90+
async def create_fail(
91+
cls, __: DispatchInfo, _: Receiver[DispatchInfo]
92+
) -> "MockActor":
93+
"""Create a new actor."""
94+
raise ValueError("Failed to create actor")
95+
8996

9097
@dataclass
9198
class TestEnv:
@@ -175,6 +182,45 @@ async def test_simple_start_stop(
175182
# pylint: enable=protected-access
176183

177184

185+
async def test_start_failed(
186+
test_env: TestEnv, fake_time: time_machine.Coordinates
187+
) -> None:
188+
"""Test auto-retry after 60 seconds."""
189+
# pylint: disable=protected-access
190+
test_env.actors_service._actor_factory = MockActor.create_fail
191+
192+
now = _now()
193+
duration = timedelta(minutes=10)
194+
dispatch = test_env.generator.generate_dispatch()
195+
dispatch = replace(
196+
dispatch,
197+
id=1,
198+
active=True,
199+
dry_run=False,
200+
duration=duration,
201+
start_time=now,
202+
payload={"test": True},
203+
type="UNIT_TEST",
204+
recurrence=replace(
205+
dispatch.recurrence,
206+
frequency=Frequency.UNSPECIFIED,
207+
),
208+
)
209+
210+
# Send status update to start actor, expect no DispatchInfo for the start
211+
await test_env.running_status_sender.send(Dispatch(dispatch))
212+
fake_time.shift(timedelta(seconds=1))
213+
await asyncio.sleep(1)
214+
215+
assert test_env.actors_service._start_failed_dispatches[1].id == dispatch.id
216+
test_env.actors_service._actor_factory = MockActor.create
217+
218+
fake_time.shift(timedelta(seconds=65))
219+
await asyncio.sleep(65)
220+
221+
assert test_env.actor(1).is_running is True
222+
223+
178224
def test_heapq_dispatch_compare(test_env: TestEnv) -> None:
179225
"""Test that the heapq compare function works."""
180226
dispatch1 = test_env.generator.generate_dispatch()
@@ -311,6 +357,7 @@ async def new_mock_receiver(
311357
# pylint: disable=protected-access
312358
assert "MANAGE_TEST" in dispatcher._actor_dispatchers
313359
actor_manager = dispatcher._actor_dispatchers["MANAGE_TEST"]
360+
# pylint: disable=comparison-with-callable
314361
assert actor_manager._actor_factory == MockActor.create
315362

316363
dispatch = Dispatch(

0 commit comments

Comments
 (0)