33
44"""Helper class to manage actors based on dispatches."""
55
6+ import asyncio
67import logging
8+ from collections .abc import Callable
79from dataclasses import dataclass
8- from typing import Any , Set
10+ from typing import Any
911
10- from frequenz .channels import Receiver , Sender
12+ from frequenz .channels import Broadcast , Receiver
1113from frequenz .client .dispatch .types import TargetComponents
12- from frequenz .sdk .actor import Actor
14+ from frequenz .sdk .actor import Actor , BackgroundService
1315
1416from ._dispatch import Dispatch
1517
@@ -30,38 +32,71 @@ class DispatchUpdate:
3032 """Additional options."""
3133
3234
33- class DispatchManagingActor (Actor ):
35+ class DispatchManagingActor (BackgroundService ):
3436 """Helper class to manage actors based on dispatches.
3537
3638 Example usage:
3739
3840 ```python
3941 import os
4042 import asyncio
43+ from typing import override
4144 from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate
4245 from frequenz.client.dispatch.types import TargetComponents
4346 from frequenz.client.common.microgrid.components import ComponentCategory
44-
45- from frequenz.channels import Receiver, Broadcast
47+ from frequenz.channels import Receiver, Broadcast, select, selected_from
48+ from frequenz.sdk.actor import Actor, run
4649
4750 class MyActor(Actor):
48- def __init__(self, updates_channel: Receiver[DispatchUpdate]):
49- super().__init__()
50- self._updates_channel = updates_channel
51- self._dry_run: bool
52- self._options : dict[str, Any]
53-
51+ def __init__(
52+ self,
53+ *,
54+ name: str | None = None,
55+ ) -> None:
56+ super().__init__(name=name)
57+ self._dispatch_updates_receiver: Receiver[DispatchUpdate] | None = None
58+ self._dry_run: bool = False
59+ self._options: dict[str, Any] = {}
60+
61+ @classmethod
62+ def new_with_dispatch(
63+ cls,
64+ initial_dispatch: DispatchUpdate,
65+ dispatch_updates_receiver: Receiver[DispatchUpdate],
66+ *,
67+ name: str | None = None,
68+ ) -> "Self":
69+ self = cls(name=name)
70+ self._dispatch_updates_receiver = dispatch_updates_receiver
71+ self._update_dispatch_information(initial_dispatch)
72+ return self
73+
74+ @override
5475 async def _run(self) -> None:
55- while True:
56- update = await self._updates_channel.receive()
57- print("Received update:", update)
58-
59- self.set_components(update.components)
60- self._dry_run = update.dry_run
61- self._options = update.options
76+ other_recv: Receiver[Any] = ...
6277
63- def set_components(self, components: TargetComponents) -> None:
64- match components:
78+ if self._dispatch_updates_receiver is None:
79+ async for msg in other_recv:
80+ # do stuff
81+ ...
82+ else:
83+ await self._run_with_dispatch(other_recv)
84+
85+ async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None:
86+ async for selected in select(self._dispatch_updates_receiver, other_recv):
87+ if selected_from(selected, self._dispatch_updates_receiver):
88+ self._update_dispatch_information(selected.message)
89+ elif selected_from(selected, other_recv):
90+ # do stuff
91+ ...
92+ else:
93+ assert False, f"Unexpected selected receiver: {selected}"
94+
95+ def _update_dispatch_information(self, dispatch_update: DispatchUpdate) -> None:
96+ print("Received update:", dispatch_update)
97+ self._dry_run = dispatch_update.dry_run
98+ self._options = dispatch_update.options
99+ match dispatch_update.components:
65100 case []:
66101 print("Dispatch: Using all components")
67102 case list() as ids if isinstance(ids[0], int):
@@ -75,7 +110,7 @@ def set_components(self, components: TargetComponents) -> None:
75110 unsupported,
76111 )
77112
78- async def run ():
113+ async def main ():
79114 url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
80115 key = os.getenv("DISPATCH_API_KEY", "some-key")
81116
@@ -86,64 +121,83 @@ async def run():
86121 server_url=url,
87122 key=key
88123 )
89-
90- # Create update channel to receive dispatch update events pre-start and mid-run
91- dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")
92-
93- # Start actor and give it an dispatch updates channel receiver
94- my_actor = MyActor(dispatch_updates_channel.new_receiver())
124+ dispatcher.start()
95125
96126 status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
97127
98128 managing_actor = DispatchManagingActor(
99- actor=my_actor ,
129+ actor_factory=MyActor.new_with_dispatch ,
100130 running_status_receiver=status_receiver,
101- updates_sender=dispatch_updates_channel.new_sender(),
102131 )
103132
104- await asyncio.gather(dispatcher.start(), managing_actor.start() )
133+ await run( managing_actor)
105134 ```
106135 """
107136
108137 def __init__ (
109138 self ,
110- actor : Actor | Set [ Actor ],
139+ actor_factory : Callable [[ DispatchUpdate , Receiver [ DispatchUpdate ]], Actor ],
111140 running_status_receiver : Receiver [Dispatch ],
112- updates_sender : Sender [DispatchUpdate ] | None = None ,
113141 ) -> None :
114142 """Initialize the dispatch handler.
115143
116144 Args:
117- actor: A set of actors or a single actor to manage.
145+ actor_factory: A callable that creates an actor with some initial dispatch
146+ information.
118147 running_status_receiver: The receiver for dispatch running status changes.
119- updates_sender: The sender for dispatch events
120148 """
121149 super ().__init__ ()
122150 self ._dispatch_rx = running_status_receiver
123- self ._actors : frozenset [Actor ] = frozenset (
124- [actor ] if isinstance (actor , Actor ) else actor
151+ self ._actor_factory = actor_factory
152+ self ._actor : Actor | None = None
153+ self ._updates_channel = Broadcast [DispatchUpdate ](
154+ name = "dispatch_updates_channel" , resend_latest = True
125155 )
126- self ._updates_sender = updates_sender
156+ self ._updates_sender = self ._updates_channel .new_sender ()
157+
158+ def start (self ) -> None :
159+ """Start the background service."""
160+ self ._tasks .add (asyncio .create_task (self ._run ()))
127161
128- def _start_actors (self ) -> None :
162+ async def _start_actor (self , dispatch : Dispatch ) -> None :
129163 """Start all actors."""
130- for actor in self . _actors :
131- if actor . is_running :
132- _logger . warning ( "Actor %s is already running" , actor . name )
133- else :
134- actor . start ( )
164+ dispatch_update = DispatchUpdate (
165+ components = dispatch . target ,
166+ dry_run = dispatch . dry_run ,
167+ options = dispatch . payload ,
168+ )
135169
136- async def _stop_actors (self , msg : str ) -> None :
170+ if self ._actor :
171+ sent_str = ""
172+ if self ._updates_sender is not None :
173+ sent_str = ", sent a dispatch update instead of creating a new actor"
174+ await self ._updates_sender .send (dispatch_update )
175+ _logger .warning (
176+ "Actor for dispatch type %r is already running%s" ,
177+ dispatch .type ,
178+ sent_str ,
179+ )
180+ else :
181+ _logger .info ("Starting actor for dispatch type %r" , dispatch .type )
182+ self ._actor = self ._actor_factory (
183+ dispatch_update , self ._updates_channel .new_receiver ()
184+ )
185+ self ._actor .start ()
186+
187+ async def _stop_actor (self , stopping_dispatch : Dispatch , msg : str ) -> None :
137188 """Stop all actors.
138189
139190 Args:
191+ stopping_dispatch: The dispatch that is stopping the actor.
140192 msg: The message to be passed to the actors being stopped.
141193 """
142- for actor in self ._actors :
143- if actor .is_running :
144- await actor .stop (msg )
145- else :
146- _logger .warning ("Actor %s is not running" , actor .name )
194+ if self ._actor is None :
195+ _logger .warning (
196+ "Actor for dispatch type %r is not running" , stopping_dispatch .type
197+ )
198+ else :
199+ await self ._actor .stop (msg )
200+ self ._actor = None
147201
148202 async def _run (self ) -> None :
149203 """Wait for dispatches and handle them."""
@@ -157,18 +211,6 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
157211 dispatch: The dispatch to handle.
158212 """
159213 if dispatch .started :
160- if self ._updates_sender is not None :
161- _logger .info ("Updated by dispatch %s" , dispatch .id )
162- await self ._updates_sender .send (
163- DispatchUpdate (
164- components = dispatch .target ,
165- dry_run = dispatch .dry_run ,
166- options = dispatch .payload ,
167- )
168- )
169-
170- _logger .info ("Started by dispatch %s" , dispatch .id )
171- self ._start_actors ()
214+ await self ._start_actor (dispatch )
172215 else :
173- _logger .info ("Stopped by dispatch %s" , dispatch .id )
174- await self ._stop_actors ("Dispatch stopped" )
216+ await self ._stop_actor (dispatch , "Dispatch stopped" )
0 commit comments