33
44"""Helper class to manage actors based on dispatches."""
55
6+ import asyncio
67import logging
8+ from collections .abc import Callable
79from dataclasses import dataclass
8- from typing import Any , Set
10+ from typing import Any
911
10- from frequenz .channels import Receiver , Sender
12+ from frequenz .channels import Broadcast , Receiver
1113from frequenz .client .dispatch .types import TargetComponents
12- from frequenz .sdk .actor import Actor
14+ from frequenz .sdk .actor import Actor , BackgroundService
1315
1416from ._dispatch import Dispatch
1517
@@ -30,38 +32,71 @@ class DispatchUpdate:
3032 """Additional options."""
3133
3234
33- class DispatchManagingActor (Actor ):
35+ class DispatchManagingActor (BackgroundService ):
3436 """Helper class to manage actors based on dispatches.
3537
3638 Example usage:
3739
3840 ```python
3941 import os
4042 import asyncio
43+ from typing import override
4144 from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate
4245 from frequenz.client.dispatch.types import TargetComponents
4346 from frequenz.client.common.microgrid.components import ComponentCategory
44-
45- from frequenz.channels import Receiver, Broadcast
47+ from frequenz.channels import Receiver, Broadcast, select, selected_from
48+ from frequenz.sdk.actor import Actor, run
4649
4750 class MyActor(Actor):
48- def __init__(self, updates_channel: Receiver[DispatchUpdate]):
49- super().__init__()
50- self._updates_channel = updates_channel
51- self._dry_run: bool
52- self._options : dict[str, Any]
53-
51+ def __init__(
52+ self,
53+ *,
54+ name: str | None = None,
55+ ) -> None:
56+ super().__init__(name=name)
57+ self._dispatch_updates_receiver: Receiver[DispatchUpdate] | None = None
58+ self._dry_run: bool = False
59+ self._options: dict[str, Any] = {}
60+
61+ @classmethod
62+ def new_with_dispatch(
63+ cls,
64+ initial_dispatch: DispatchUpdate,
65+ dispatch_updates_receiver: Receiver[DispatchUpdate],
66+ *,
67+ name: str | None = None,
68+ ) -> "Self":
69+ self = cls(name=name)
70+ self._dispatch_updates_receiver = dispatch_updates_receiver
71+ self._update_dispatch_information(initial_dispatch)
72+ return self
73+
74+ @override
5475 async def _run(self) -> None:
55- while True:
56- update = await self._updates_channel.receive()
57- print("Received update:", update)
58-
59- self.set_components(update.components)
60- self._dry_run = update.dry_run
61- self._options = update.options
76+ other_recv: Receiver[Any] = ...
6277
63- def set_components(self, components: TargetComponents) -> None:
64- match components:
78+ if self._dispatch_updates_receiver is None:
79+ async for msg in other_recv:
80+ # do stuff
81+ ...
82+ else:
83+ await self._run_with_dispatch(other_recv)
84+
85+ async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None:
86+ async for selected in select(self._dispatch_updates_receiver, other_recv):
87+ if selected_from(selected, self._dispatch_updates_receiver):
88+ self._update_dispatch_information(selected.message)
89+ elif selected_from(selected, other_recv):
90+ # do stuff
91+ ...
92+ else:
93+ assert False, f"Unexpected selected receiver: {selected}"
94+
95+ def _update_dispatch_information(self, dispatch_update: DispatchUpdate) -> None:
96+ print("Received update:", dispatch_update)
97+ self._dry_run = dispatch_update.dry_run
98+ self._options = dispatch_update.options
99+ match dispatch_update.components:
65100 case []:
66101 print("Dispatch: Using all components")
67102 case list() as ids if isinstance(ids[0], int):
@@ -75,7 +110,7 @@ def set_components(self, components: TargetComponents) -> None:
75110 unsupported,
76111 )
77112
78- async def run ():
113+ async def main ():
79114 url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
80115 key = os.getenv("DISPATCH_API_KEY", "some-key")
81116
@@ -86,64 +121,92 @@ async def run():
86121 server_url=url,
87122 key=key
88123 )
89-
90- # Create update channel to receive dispatch update events pre-start and mid-run
91- dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")
92-
93- # Start actor and give it an dispatch updates channel receiver
94- my_actor = MyActor(dispatch_updates_channel.new_receiver())
124+ dispatcher.start()
95125
96126 status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
97127
128+ def actor_factory(initial_dispatch, dispatch_receiver):
129+ return MyActor.new_with_dispatch(initial_dispatch, dispatch_receiver)
130+
98131 managing_actor = DispatchManagingActor(
99- actor=my_actor ,
132+ actor_factory=actor_factory ,
100133 running_status_receiver=status_receiver,
101- updates_sender=dispatch_updates_channel.new_sender(),
102134 )
103135
104- await asyncio.gather(dispatcher.start(), managing_actor.start() )
136+ await run( managing_actor)
105137 ```
106138 """
107139
108140 def __init__ (
109141 self ,
110- actor : Actor | Set [ Actor ],
142+ actor_factory : Callable [[ DispatchUpdate , Receiver [ DispatchUpdate ]], Actor ],
111143 running_status_receiver : Receiver [Dispatch ],
112- updates_sender : Sender [DispatchUpdate ] | None = None ,
113144 ) -> None :
114145 """Initialize the dispatch handler.
115146
116147 Args:
117- actor: A set of actors or a single actor to manage.
148+ actor_factory: A callable that creates an actor with some initial dispatch
149+ information.
118150 running_status_receiver: The receiver for dispatch running status changes.
119- updates_sender: The sender for dispatch events
120151 """
121152 super ().__init__ ()
122153 self ._dispatch_rx = running_status_receiver
123- self ._actors : frozenset [Actor ] = frozenset (
124- [actor ] if isinstance (actor , Actor ) else actor
154+ self ._actor_factory = actor_factory
155+ self ._actor : Actor | None = None
156+ self ._updates_channel = Broadcast [DispatchUpdate ](
157+ name = "dispatch_updates_channel" , resend_latest = True
125158 )
126- self ._updates_sender = updates_sender
159+ self ._updates_sender = self ._updates_channel .new_sender ()
160+
161+ def start (self ) -> None :
162+ """Start the background service."""
163+ self ._tasks .add (asyncio .create_task (self ._run ()))
164+
165+ def new_receiver (self ) -> Receiver [DispatchUpdate ]:
166+ """Create a new receiver for dispatch updates.
167+
168+ Returns:
169+ A new receiver for dispatch updates.
170+ """
171+ return self ._updates_channel .new_receiver ()
127172
128- def _start_actors (self ) -> None :
173+ async def _start_actor (self , dispatch : Dispatch ) -> None :
129174 """Start all actors."""
130- for actor in self ._actors :
131- if actor .is_running :
132- _logger .warning ("Actor %s is already running" , actor .name )
133- else :
134- actor .start ()
175+ dispatch_update = DispatchUpdate (
176+ components = dispatch .target ,
177+ dry_run = dispatch .dry_run ,
178+ options = dispatch .payload ,
179+ )
180+
181+ if self ._actor :
182+ sent_str = ""
183+ if self ._updates_sender is not None :
184+ sent_str = ", sent a dispatch update instead of creating a new actor"
185+ await self ._updates_sender .send (dispatch_update )
186+ _logger .warning (
187+ "Actor for dispatch type %r is already running%s" ,
188+ dispatch .type ,
189+ sent_str ,
190+ )
191+ else :
192+ _logger .info ("Starting actor for dispatch type %r" , dispatch .type )
193+ self ._actor = self ._actor_factory (dispatch_update , self .new_receiver ())
194+ self ._actor .start ()
135195
136- async def _stop_actors (self , msg : str ) -> None :
196+ async def _stop_actor (self , stopping_dispatch : Dispatch , msg : str ) -> None :
137197 """Stop all actors.
138198
139199 Args:
200+ stopping_dispatch: The dispatch that is stopping the actor.
140201 msg: The message to be passed to the actors being stopped.
141202 """
142- for actor in self ._actors :
143- if actor .is_running :
144- await actor .stop (msg )
145- else :
146- _logger .warning ("Actor %s is not running" , actor .name )
203+ if self ._actor is None :
204+ _logger .warning (
205+ "Actor for dispatch type %r is not running" , stopping_dispatch .type
206+ )
207+ else :
208+ await self ._actor .stop (msg )
209+ self ._actor = None
147210
148211 async def _run (self ) -> None :
149212 """Wait for dispatches and handle them."""
@@ -157,18 +220,6 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
157220 dispatch: The dispatch to handle.
158221 """
159222 if dispatch .started :
160- if self ._updates_sender is not None :
161- _logger .info ("Updated by dispatch %s" , dispatch .id )
162- await self ._updates_sender .send (
163- DispatchUpdate (
164- components = dispatch .target ,
165- dry_run = dispatch .dry_run ,
166- options = dispatch .payload ,
167- )
168- )
169-
170- _logger .info ("Started by dispatch %s" , dispatch .id )
171- self ._start_actors ()
223+ await self ._start_actor (dispatch )
172224 else :
173- _logger .info ("Stopped by dispatch %s" , dispatch .id )
174- await self ._stop_actors ("Dispatch stopped" )
225+ await self ._stop_actor (dispatch , "Dispatch stopped" )
0 commit comments