Skip to content

Commit 27252a8

Browse files
committed
ActorDispatcher: Support dispatches referring to different actors
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 8f91403 commit 27252a8

File tree

2 files changed

+30
-32
lines changed

2 files changed

+30
-32
lines changed

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class ActorDispatcher(BackgroundService):
4141
import os
4242
import asyncio
4343
from typing import override
44-
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchInfo
44+
from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo
4545
from frequenz.client.dispatch.types import TargetComponents
4646
from frequenz.client.common.microgrid.components import ComponentCategory
4747
from frequenz.channels import Receiver, Broadcast, select, selected_from
@@ -125,9 +125,10 @@ async def main():
125125
126126
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
127127
128-
managing_actor = DispatchManagingActor(
128+
managing_actor = ActorDispatcher(
129129
actor_factory=MyActor.new_with_dispatch,
130130
running_status_receiver=status_receiver,
131+
map_dispatch=lambda dispatch: dispatch.id,
131132
)
132133
133134
await run(managing_actor)
@@ -138,18 +139,21 @@ def __init__(
138139
self,
139140
actor_factory: Callable[[DispatchInfo, Receiver[DispatchInfo]], Actor],
140141
running_status_receiver: Receiver[Dispatch],
142+
map_dispatch: Callable[[Dispatch], int],
141143
) -> None:
142144
"""Initialize the dispatch handler.
143145
144146
Args:
145147
actor_factory: A callable that creates an actor with some initial dispatch
146148
information.
147149
running_status_receiver: The receiver for dispatch running status changes.
150+
map_dispatch: A function to identify to which actor a dispatch refers.
148151
"""
149152
super().__init__()
153+
self._map_dispatch = map_dispatch
150154
self._dispatch_rx = running_status_receiver
151155
self._actor_factory = actor_factory
152-
self._actor: Actor | None = None
156+
self._actors: dict[int, Actor] = {}
153157
self._updates_channel = Broadcast[DispatchInfo](
154158
name="dispatch_updates_channel", resend_latest=True
155159
)
@@ -167,7 +171,9 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
167171
options=dispatch.payload,
168172
)
169173

170-
if self._actor:
174+
actor: Actor | None = self._actors.get(self._map_dispatch(dispatch))
175+
176+
if actor:
171177
sent_str = ""
172178
if self._updates_sender is not None:
173179
sent_str = ", sent a dispatch update instead of creating a new actor"
@@ -179,10 +185,12 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
179185
)
180186
else:
181187
_logger.info("Starting actor for dispatch type %r", dispatch.type)
182-
self._actor = self._actor_factory(
188+
actor = self._actor_factory(
183189
dispatch_update, self._updates_channel.new_receiver()
184190
)
185-
self._actor.start()
191+
self._actors[self._map_dispatch(dispatch)] = actor
192+
193+
actor.start()
186194

187195
async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
188196
"""Stop all actors.
@@ -191,13 +199,12 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
191199
stopping_dispatch: The dispatch that is stopping the actor.
192200
msg: The message to be passed to the actors being stopped.
193201
"""
194-
if self._actor is None:
202+
if actor := self._actors.pop(self._map_dispatch(stopping_dispatch), None):
203+
await actor.stop(msg)
204+
else:
195205
_logger.warning(
196206
"Actor for dispatch type %r is not running", stopping_dispatch.type
197207
)
198-
else:
199-
await self._actor.stop(msg)
200-
self._actor = None
201208

202209
async def _run(self) -> None:
203210
"""Wait for dispatches and handle them."""

tests/test_mananging_actor.py

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -69,21 +69,13 @@ class TestEnv:
6969
running_status_sender: Sender[Dispatch]
7070
generator: DispatchGenerator = DispatchGenerator()
7171

72-
@property
73-
def actor(self) -> MockActor | None:
72+
def actor(self, identity: int) -> MockActor:
7473
"""Return the actor."""
7574
# pylint: disable=protected-access
76-
if self.actors_service._actor is None:
77-
return None
78-
return cast(MockActor, self.actors_service._actor)
75+
assert identity in self.actors_service._actors
76+
return cast(MockActor, self.actors_service._actors[identity])
7977
# pylint: enable=protected-access
8078

81-
@property
82-
def updates_receiver(self) -> Receiver[DispatchInfo]:
83-
"""Return the updates receiver."""
84-
assert self.actor is not None
85-
return self.actor.receiver
86-
8779

8880
@fixture
8981
async def test_env() -> AsyncIterator[TestEnv]:
@@ -93,6 +85,7 @@ async def test_env() -> AsyncIterator[TestEnv]:
9385
actors_service = ActorDispatcher(
9486
actor_factory=MockActor,
9587
running_status_receiver=channel.new_receiver(),
88+
map_dispatch=lambda dispatch: dispatch.id,
9689
)
9790

9891
actors_service.start()
@@ -116,6 +109,7 @@ async def test_simple_start_stop(
116109
dispatch = test_env.generator.generate_dispatch()
117110
dispatch = replace(
118111
dispatch,
112+
id=1,
119113
active=True,
120114
dry_run=False,
121115
duration=duration,
@@ -135,24 +129,24 @@ async def test_simple_start_stop(
135129
await asyncio.sleep(1)
136130
logging.info("Sent dispatch")
137131

138-
assert test_env.actor is not None
139-
event = test_env.actor.initial_dispatch
132+
event = test_env.actor(1).initial_dispatch
140133
assert event.options == {"test": True}
141134
assert event.components == dispatch.target
142135
assert event.dry_run is False
143136

144137
logging.info("Received dispatch")
145138

146-
assert test_env.actor is not None
147-
assert test_env.actor.is_running is True
139+
assert test_env.actor(1).is_running is True
148140

149141
fake_time.shift(duration)
150142
await test_env.running_status_sender.send(Dispatch(dispatch))
151143

152144
# Give await actor.stop a chance to run
153145
await asyncio.sleep(1)
154146

155-
assert test_env.actor is None
147+
# pylint: disable=protected-access
148+
assert 1 not in test_env.actors_service._actors
149+
# pylint: enable=protected-access
156150

157151

158152
def test_heapq_dispatch_compare(test_env: TestEnv) -> None:
@@ -209,6 +203,7 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -
209203
dispatch = test_env.generator.generate_dispatch()
210204
dispatch = replace(
211205
dispatch,
206+
id=1,
212207
dry_run=True,
213208
active=True,
214209
start_time=_now(),
@@ -224,20 +219,16 @@ async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -
224219
fake_time.shift(timedelta(seconds=1))
225220
await asyncio.sleep(1)
226221

227-
assert test_env.actor is not None
228-
event = test_env.actor.initial_dispatch
222+
event = test_env.actor(1).initial_dispatch
229223

230224
assert event.dry_run is dispatch.dry_run
231225
assert event.components == dispatch.target
232226
assert event.options == dispatch.payload
233-
assert test_env.actor is not None
234-
assert test_env.actor.is_running is True
227+
assert test_env.actor(1).is_running is True
235228

236229
assert dispatch.duration is not None
237230
fake_time.shift(dispatch.duration)
238231
await test_env.running_status_sender.send(Dispatch(dispatch))
239232

240233
# Give await actor.stop a chance to run
241234
await asyncio.sleep(1)
242-
243-
assert test_env.actor is None

0 commit comments

Comments
 (0)