Skip to content

Commit dca6c78

Browse files
committed
WIP: Support dipatches refering to different actors
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 0759a59 commit dca6c78

File tree

2 files changed

+46
-24
lines changed

2 files changed

+46
-24
lines changed

src/frequenz/dispatch/_actors_service.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
import logging
88
from collections.abc import Callable
99
from dataclasses import dataclass
10+
from re import L
1011
from typing import Any
1112

1213
from frequenz.channels import Broadcast, Receiver
1314
from frequenz.client.dispatch.types import TargetComponents
1415
from frequenz.sdk.actor import Actor, BackgroundService
16+
from grpc.aio import Call
1517

1618
from ._dispatch import Dispatch
1719

@@ -41,7 +43,7 @@ class DispatchActorsService(BackgroundService):
4143
import os
4244
import asyncio
4345
from typing import override
44-
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate
46+
from frequenz.dispatch import Dispatcher, DispatchActorsService, DispatchUpdate
4547
from frequenz.client.dispatch.types import TargetComponents
4648
from frequenz.client.common.microgrid.components import ComponentCategory
4749
from frequenz.channels import Receiver, Broadcast, select, selected_from
@@ -128,7 +130,7 @@ async def main():
128130
def actor_factory(initial_dispatch, dispatch_receiver):
129131
return MyActor.new_with_dispatch(initial_dispatch, dispatch_receiver)
130132
131-
managing_actor = DispatchManagingActor(
133+
managing_actor = DispatchActorsService(
132134
actor_factory=actor_factory,
133135
running_status_receiver=status_receiver,
134136
)
@@ -141,18 +143,22 @@ def __init__(
141143
self,
142144
actor_factory: Callable[[DispatchUpdate, Receiver[DispatchUpdate]], Actor],
143145
running_status_receiver: Receiver[Dispatch],
146+
map_dispatch: Callable[[Dispatch], int] = lambda dispatch: hash(dispatch.type),
144147
) -> None:
145148
"""Initialize the dispatch handler.
146149
147150
Args:
148151
actor_factory: A callable that creates an actor with some initial dispatch
149152
information.
150153
running_status_receiver: The receiver for dispatch running status changes.
154+
map_dispatch: A function to identify to which actor a dispatch
155+
refers. By default, it uses the hash of the dispatch type.
151156
"""
152157
super().__init__()
158+
self._map_dispatch = map_dispatch
153159
self._dispatch_rx = running_status_receiver
154160
self._actor_factory = actor_factory
155-
self._actor: Actor | None = None
161+
self._actors: dict[int, Actor] = {}
156162
self._updates_channel = Broadcast[DispatchUpdate](
157163
name="dispatch_updates_channel", resend_latest=True
158164
)
@@ -178,7 +184,9 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
178184
options=dispatch.payload,
179185
)
180186

181-
if self._actor:
187+
actor: Actor | None = self._actors.get(self._map_dispatch(dispatch))
188+
189+
if actor:
182190
sent_str = ""
183191
if self._updates_sender is not None:
184192
sent_str = ", sent a dispatch update instead of creating a new actor"
@@ -190,8 +198,10 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
190198
)
191199
else:
192200
_logger.info("Starting actor for dispatch type %r", dispatch.type)
193-
self._actor = self._actor_factory(dispatch_update, self.new_receiver())
194-
self._actor.start()
201+
actor = self._actor_factory(dispatch_update, self.new_receiver())
202+
self._actors[self._map_dispatch(dispatch)] = actor
203+
204+
actor.start()
195205

196206
async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
197207
"""Stop all actors.
@@ -200,13 +210,12 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
200210
stopping_dispatch: The dispatch that is stopping the actor.
201211
msg: The message to be passed to the actors being stopped.
202212
"""
203-
if self._actor is None:
213+
if actor := self._actors.pop(self._map_dispatch(stopping_dispatch), None):
214+
await actor.stop(msg)
215+
else:
204216
_logger.warning(
205217
"Actor for dispatch type %r is not running", stopping_dispatch.type
206218
)
207-
else:
208-
await self._actor.stop(msg)
209-
self._actor = None
210219

211220
async def _run(self) -> None:
212221
"""Wait for dispatches and handle them."""

src/frequenz/dispatch/_bg_service.py

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,18 @@ def filter(self) -> Callable[[Dispatch], bool]:
5353
"""
5454

5555

56-
class MergeByType(MergeStrategy):
57-
"""Merge running intervals based on the dispatch type."""
56+
class MergeByX(MergeStrategy):
57+
"""Merge running intervals based on a dispatch configuration."""
5858

5959
@property
6060
@override
6161
def filter(self) -> Callable[[Dispatch], bool]:
6262
"""Get a filter function for dispatches."""
6363
return self._filter_func
6464

65-
def criteria(self, previous_dispatch: Dispatch, new_dispatch: Dispatch) -> bool:
66-
"""Define the criteria for checking other running dispatches."""
67-
return previous_dispatch.type == new_dispatch.type
65+
@abstractmethod
66+
def identity(self, dispatch: Dispatch) -> int:
67+
"""Identity function for the merge criteria."""
6868

6969
def _filter_func(self, new_dispatch: Dispatch) -> bool:
7070
"""Filter dispatches based on the merge strategy.
@@ -76,27 +76,40 @@ def _filter_func(self, new_dispatch: Dispatch) -> bool:
7676
if new_dispatch.started:
7777
return True
7878

79-
# pylint: disable=protected-access
8079
other_dispatches_running = any(
8180
dispatch.started
8281
for dispatch in self._scheduler.dispatches
83-
if self.criteria(dispatch, new_dispatch)
82+
if self.identity(dispatch) == self.identity(new_dispatch)
8483
)
85-
# pylint: enable=protected-access
8684

8785
return not other_dispatches_running
8886

8987

88+
class MergeByType(MergeByX):
89+
"""Merge running intervals based on the dispatch type."""
90+
91+
@override
92+
def identity(self, dispatch: Dispatch) -> int:
93+
"""Identity function for the merge criteria."""
94+
return hash(dispatch.type)
95+
96+
9097
class MergeByTypeTarget(MergeByType):
9198
"""Merge running intervals based on the dispatch type and target."""
9299

93100
@override
94-
def criteria(self, previous_dispatch: Dispatch, new_dispatch: Dispatch) -> bool:
95-
"""Define the criteria for checking other running dispatches."""
96-
return (
97-
previous_dispatch.type == new_dispatch.type
98-
and previous_dispatch.target == new_dispatch.target
99-
)
101+
def identity(self, dispatch: Dispatch) -> int:
102+
"""Identity function for the merge criteria."""
103+
return hash((dispatch.type, dispatch.target))
104+
105+
106+
class MergeByTypeID(MergeByX):
107+
"""Merge running intervals based on the dispatch type and ID."""
108+
109+
@override
110+
def identity(self, dispatch: Dispatch) -> int:
111+
"""Identity function for the merge criteria."""
112+
return hash((dispatch.type, dispatch.id))
100113

101114

102115
# pylint: disable=too-many-instance-attributes

0 commit comments

Comments
 (0)