Skip to content

Commit 0060283

Browse files
committed
The ActorDispatcher now supports persistent instances of actors.
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent a247d26 commit 0060283

File tree

4 files changed

+33
-2
lines changed

4 files changed

+33
-2
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ This release introduces a more flexible and powerful mechanism for managing disp
1919

2020
## New Features
2121

22+
* The `ActorDispatcher` now supports persistent instances of actors. This allows you to keep the actor instance around after stopping the dispatch. The actor will be started again when an according dispatch is received.
2223
* A new parameter `initial_fetch_timeout` has been added to `Dispatcher.new_running_state_event_receiver` and related methods. It allows you to specify a timeout for the initial fetch of dispatches. After the timeout, startup continues as normal.
2324
* A new feature "merge strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
2425
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class DispatchInfo:
3333
"""Additional options."""
3434

3535

36+
# pylint: disable=too-many-instance-attributes
3637
class ActorDispatcher(BackgroundService):
3738
"""Helper class to manage actors based on dispatches.
3839
@@ -186,6 +187,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
186187
],
187188
running_status_receiver: Receiver[Dispatch],
188189
dispatch_identity: Callable[[Dispatch], int] | None = None,
190+
persist_instances: bool = True,
189191
retry_interval: timedelta | None = timedelta(seconds=60),
190192
) -> None:
191193
"""Initialize the dispatch handler.
@@ -196,16 +198,20 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
196198
running_status_receiver: The receiver for dispatch running status changes.
197199
dispatch_identity: A function to identify to which actor a dispatch refers.
198200
By default, it uses the dispatch ID.
201+
persist_instances: Whether to keep instances of actors that have
202+
stopped. Not recommended when you expect a varying number of instances.
199203
retry_interval: The interval between retries. If `None`, retries are disabled.
200204
"""
201205
super().__init__()
202206
self._dispatch_identity: Callable[[Dispatch], int] = (
203207
dispatch_identity if dispatch_identity else lambda d: d.id
204208
)
205209

210+
self._persist_instances = persist_instances
206211
self._dispatch_rx = running_status_receiver
207212
self._actor_factory = actor_factory
208213
self._actors: dict[int, Actor] = {}
214+
209215
self._updates_channel = Broadcast[DispatchInfo](
210216
name="dispatch_updates_channel", resend_latest=True
211217
)
@@ -277,10 +283,18 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
277283
actor: Actor | None = None
278284
identity = self._dispatch_identity(stopping_dispatch)
279285

280-
actor = self._actors.get(identity)
286+
if self._persist_instances:
287+
actor = self._actors.get(identity)
288+
else:
289+
# Remove reference to actor
290+
actor = self._actors.pop(identity, None)
281291

282292
if actor:
283293
await actor.stop(msg)
294+
295+
if not self._persist_instances:
296+
del actor
297+
284298
else:
285299
_logger.warning(
286300
"Actor for dispatch type %r is not running", stopping_dispatch.type

src/frequenz/dispatch/_dispatcher.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ def is_managed(self, dispatch_type: str) -> bool:
233233
"""
234234
return dispatch_type in self._actor_dispatchers
235235

236+
# pylint: disable=too-many-arguments, too-many-positional-arguments
236237
async def start_dispatching(
237238
self,
238239
dispatch_type: str,
@@ -241,6 +242,7 @@ async def start_dispatching(
241242
[DispatchInfo, Receiver[DispatchInfo]], Awaitable[Actor]
242243
],
243244
merge_strategy: MergeStrategy | None = None,
245+
persist_instances: bool = True,
244246
retry_interval: timedelta = timedelta(seconds=60),
245247
initial_fetch_timeout: timedelta = timedelta(seconds=10),
246248
) -> None:
@@ -257,6 +259,8 @@ async def start_dispatching(
257259
dispatch_type: The type of the dispatch to manage.
258260
actor_factory: The factory to create actors.
259261
merge_strategy: The strategy to merge running intervals.
262+
persist_instances: Whether to reuse actor instances. Not recommended
263+
when you expect a varying number of instances.
260264
retry_interval: Retry interval for when actor creation fails.
261265
initial_fetch_timeout: Timeout for the initial fetch of dispatches.
262266
After the timeout, the receiver will continue to start and
@@ -286,12 +290,15 @@ def id_identity(dispatch: Dispatch) -> int:
286290
dispatch_identity=(
287291
id_identity if merge_strategy is None else merge_strategy.identity
288292
),
293+
persist_instances=persist_instances,
289294
retry_interval=retry_interval,
290295
)
291296

292297
self._actor_dispatchers[dispatch_type] = dispatcher
293298
dispatcher.start()
294299

300+
# pylint: enable=too-many-arguments, too-many-positional-arguments
301+
295302
async def stop_dispatching(self, dispatch_type: str) -> None:
296303
"""Stop managing actors for a given dispatch type.
297304

tests/test_mananging_actor.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,11 +132,17 @@ async def test_env() -> AsyncIterator[TestEnv]:
132132
await actors_service.stop()
133133

134134

135+
@pytest.mark.parametrize("persist_instances", [True, False])
135136
async def test_simple_start_stop(
136137
test_env: TestEnv,
137138
fake_time: time_machine.Coordinates,
139+
persist_instances: bool,
138140
) -> None:
139141
"""Test behavior when receiving start/stop messages."""
142+
# pylint: disable=protected-access
143+
test_env.actors_service._persist_instances = persist_instances
144+
# pylint: enable=protected-access
145+
140146
now = _now()
141147
duration = timedelta(minutes=10)
142148
dispatch = test_env.generator.generate_dispatch()
@@ -178,7 +184,10 @@ async def test_simple_start_stop(
178184
await asyncio.sleep(1)
179185

180186
# pylint: disable=protected-access
181-
assert 1 not in test_env.actors_service._actors
187+
if persist_instances:
188+
assert 1 in test_env.actors_service._actors
189+
else:
190+
assert 1 not in test_env.actors_service._actors
182191
# pylint: enable=protected-access
183192

184193

0 commit comments

Comments
 (0)