Skip to content

Commit ed93f93

Browse files
committed
Add Dispatcher.manage to manage dispatchable actors
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 3dd9af2 commit ed93f93

File tree

3 files changed

+150
-5
lines changed

3 files changed

+150
-5
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,5 @@ This release introduces a more flexible and powerful mechanism for managing disp
1919
## New Features
2020

2121
* A new feature "merger strategy" (`MergeByType`, `MergeByTypeTarget`) has been added to the `Dispatcher.new_running_state_event_receiver` method. Using it, you can automatically merge & unify consecutive and overlapping dispatch start/stop events of the same type. E.g. dispatch `A` starting at 10:10 and ending at 10:30 and dispatch `B` starts at 10:30 until 11:00, with the feature enabled this would in total trigger one start event, one reconfigure event at 10:30 and one stop event at 11:00.
22-
2322
* The SDK dependency was widened to allow versions up to (excluding) v1.0.0-rc1600.
24-
23+
* A new function `Dispatcher.manage` was added to simplify dispatchable actor management initialization.

src/frequenz/dispatch/_dispatcher.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,20 @@
44
"""A highlevel interface for the dispatch API."""
55

66

7+
import logging
8+
from typing import Callable
9+
710
from frequenz.channels import Receiver
811
from frequenz.client.dispatch import Client
12+
from frequenz.sdk.actor import Actor
913

14+
from ._actor_dispatcher import ActorDispatcher, DispatchInfo
1015
from ._bg_service import DispatchScheduler, MergeStrategy
1116
from ._dispatch import Dispatch
1217
from ._event import DispatchEvent
18+
from ._merge_strategies import MergeByIdentity
19+
20+
_logger = logging.getLogger(__name__)
1321

1422

1523
class Dispatcher:
@@ -178,11 +186,53 @@ def __init__(
178186
microgrid_id,
179187
self._client,
180188
)
189+
self._actor_dispatchers: dict[str, ActorDispatcher] = {}
181190

182-
async def start(self) -> None:
191+
def start(self) -> None:
183192
"""Start the local dispatch service."""
184193
self._bg_service.start()
185194

195+
async def manage(
196+
self,
197+
dispatch_type: str,
198+
*,
199+
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
200+
merge_strategy: MergeByIdentity | None = None,
201+
) -> None:
202+
"""Manage actors for a given dispatch type.
203+
204+
Creates and manages an ActorDispatcher for the given type that will
205+
start, stop and reconfigure actors based on received dispatches.
206+
207+
Args:
208+
dispatch_type: The type of the dispatch to manage.
209+
actor_factory: The factory to create actors.
210+
merge_strategy: The strategy to merge running intervals.
211+
"""
212+
dispatcher = self._actor_dispatchers.get(dispatch_type)
213+
214+
if dispatcher is not None:
215+
_logger.debug(
216+
"Ignoring duplicate actor dispatcher request for %r", dispatch_type
217+
)
218+
return
219+
220+
def id_identity(dispatch: Dispatch) -> int:
221+
return dispatch.id
222+
223+
dispatcher = ActorDispatcher(
224+
actor_factory=actor_factory,
225+
running_status_receiver=await self.new_running_state_event_receiver(
226+
dispatch_type, merge_strategy=merge_strategy
227+
),
228+
map_dispatch=(
229+
id_identity if merge_strategy is None else merge_strategy.identity
230+
),
231+
)
232+
233+
self._actor_dispatchers[dispatch_type] = dispatcher
234+
dispatcher.start()
235+
186236
@property
187237
def client(self) -> Client:
188238
"""Return the client."""

tests/test_mananging_actor.py

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,39 @@
88
import logging
99
from dataclasses import dataclass, replace
1010
from datetime import datetime, timedelta, timezone
11-
from typing import AsyncIterator, Iterator, cast
11+
from typing import AsyncIterator, Callable, Iterator, cast
12+
from unittest.mock import patch
1213

1314
import async_solipsism
15+
import pytest
1416
import time_machine
1517
from frequenz.channels import Broadcast, Receiver, Sender
18+
from frequenz.client.dispatch import recurrence
1619
from frequenz.client.dispatch.recurrence import Frequency
20+
from frequenz.client.dispatch.test.client import FakeClient
1721
from frequenz.client.dispatch.test.generator import DispatchGenerator
1822
from frequenz.sdk.actor import Actor
1923
from pytest import fixture
2024

21-
from frequenz.dispatch import ActorDispatcher, Dispatch, DispatchInfo
25+
from frequenz.dispatch import (
26+
ActorDispatcher,
27+
Dispatch,
28+
Dispatcher,
29+
DispatchInfo,
30+
MergeByIdentity,
31+
MergeByType,
32+
MergeByTypeTarget,
33+
MergeStrategy,
34+
)
2235
from frequenz.dispatch._bg_service import DispatchScheduler
2336

2437

38+
@fixture
39+
def generator() -> DispatchGenerator:
40+
"""Return a dispatch generator."""
41+
return DispatchGenerator()
42+
43+
2544
@fixture
2645
def event_loop_policy() -> async_solipsism.EventLoopPolicy:
2746
"""Set the event loop policy to use async_solipsism."""
@@ -232,3 +251,80 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -
232251

233252
# Give await actor.stop a chance to run
234253
await asyncio.sleep(1)
254+
255+
256+
@pytest.mark.parametrize("strategy", [MergeByTypeTarget(), MergeByType(), None])
257+
async def test_manage_abstraction(
258+
fake_time: time_machine.Coordinates,
259+
generator: DispatchGenerator,
260+
strategy: MergeByIdentity | None,
261+
) -> None:
262+
"""Test Dispatcher.manage sets up correctly."""
263+
identity: Callable[[Dispatch], int] = (
264+
strategy.identity if strategy else lambda dispatch: dispatch.id
265+
)
266+
267+
class MyFakeClient(FakeClient):
268+
"""Fake client for testing."""
269+
270+
def __init__(self, *, server_url: str, key: str):
271+
assert server_url
272+
assert key
273+
super().__init__()
274+
275+
mid = 1
276+
277+
# Patch `Client` class in Dispatcher with MyFakeClient
278+
with patch("frequenz.dispatch._dispatcher.Client", MyFakeClient):
279+
dispatcher = Dispatcher(
280+
microgrid_id=mid, server_url="grpc://test-url", key="test-key"
281+
)
282+
dispatcher.start()
283+
284+
channel = Broadcast[Dispatch](name="dispatch ready test channel")
285+
sender = channel.new_sender()
286+
287+
async def new_mock_receiver(
288+
_: Dispatcher, dispatch_type: str, *, merge_strategy: MergeStrategy | None
289+
) -> Receiver[Dispatch]:
290+
assert dispatch_type == "MANAGE_TEST"
291+
assert merge_strategy is strategy
292+
return channel.new_receiver()
293+
294+
with patch(
295+
"frequenz.dispatch._dispatcher.Dispatcher.new_running_state_event_receiver",
296+
new_mock_receiver,
297+
):
298+
await dispatcher.manage(
299+
dispatch_type="MANAGE_TEST",
300+
actor_factory=MockActor,
301+
merge_strategy=strategy,
302+
)
303+
304+
# pylint: disable=protected-access
305+
assert "MANAGE_TEST" in dispatcher._actor_dispatchers
306+
actor_manager = dispatcher._actor_dispatchers["MANAGE_TEST"]
307+
assert actor_manager._actor_factory == MockActor
308+
309+
dispatch = Dispatch(
310+
replace(
311+
generator.generate_dispatch(),
312+
start_time=_now(),
313+
duration=timedelta(minutes=10),
314+
recurrence=recurrence.RecurrenceRule(),
315+
active=True,
316+
type="MANAGE_TEST",
317+
)
318+
)
319+
320+
fake_time.move_to(dispatch.start_time + timedelta(seconds=1))
321+
assert dispatch.started
322+
323+
# Send a dispatch to start an actor instance
324+
await sender.send(dispatch)
325+
326+
# Give the actor a chance to start
327+
await asyncio.sleep(1)
328+
329+
# Check if actor instance is created
330+
assert identity(dispatch) in actor_manager._actors

0 commit comments

Comments
 (0)