Skip to content

Commit cd6e1ce

Browse files
llucaxMarenz
authored andcommitted
Re-create actors on dispatch updates
This commit modifies the `DispatchManagingActor` to re-create the actor when a new dispatch is received and the actor should start running instead of just start/stop the actor. To create the actor a factory function is used, which passes the initial dispatch information to the actor, so it can be properly initialized instead of having the initialization in 2 steps, the creation and the receiving of the dispatch update. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 96c8bc8 commit cd6e1ce

File tree

2 files changed

+163
-87
lines changed

2 files changed

+163
-87
lines changed

src/frequenz/dispatch/_managing_actor.py

Lines changed: 115 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33

44
"""Helper class to manage actors based on dispatches."""
55

6+
import asyncio
67
import logging
8+
from collections.abc import Callable
79
from dataclasses import dataclass
8-
from typing import Any, Set
10+
from typing import Any
911

10-
from frequenz.channels import Receiver, Sender
12+
from frequenz.channels import Broadcast, Receiver
1113
from frequenz.client.dispatch.types import TargetComponents
12-
from frequenz.sdk.actor import Actor
14+
from frequenz.sdk.actor import Actor, BackgroundService
1315

1416
from ._dispatch import Dispatch
1517

@@ -30,38 +32,71 @@ class DispatchUpdate:
3032
"""Additional options."""
3133

3234

33-
class DispatchManagingActor(Actor):
35+
class DispatchManagingActor(BackgroundService):
3436
"""Helper class to manage actors based on dispatches.
3537
3638
Example usage:
3739
3840
```python
3941
import os
4042
import asyncio
43+
from typing import override
4144
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate
4245
from frequenz.client.dispatch.types import TargetComponents
4346
from frequenz.client.common.microgrid.components import ComponentCategory
44-
45-
from frequenz.channels import Receiver, Broadcast
47+
from frequenz.channels import Receiver, Broadcast, select, selected_from
48+
from frequenz.sdk.actor import Actor, run
4649
4750
class MyActor(Actor):
48-
def __init__(self, updates_channel: Receiver[DispatchUpdate]):
49-
super().__init__()
50-
self._updates_channel = updates_channel
51-
self._dry_run: bool
52-
self._options : dict[str, Any]
53-
51+
def __init__(
52+
self,
53+
*,
54+
name: str | None = None,
55+
) -> None:
56+
super().__init__(name=name)
57+
self._dispatch_updates_receiver: Receiver[DispatchUpdate] | None = None
58+
self._dry_run: bool = False
59+
self._options: dict[str, Any] = {}
60+
61+
@classmethod
62+
def new_with_dispatch(
63+
cls,
64+
initial_dispatch: DispatchUpdate,
65+
dispatch_updates_receiver: Receiver[DispatchUpdate],
66+
*,
67+
name: str | None = None,
68+
) -> "Self":
69+
self = cls(name=name)
70+
self._dispatch_updates_receiver = dispatch_updates_receiver
71+
self._update_dispatch_information(initial_dispatch)
72+
return self
73+
74+
@override
5475
async def _run(self) -> None:
55-
while True:
56-
update = await self._updates_channel.receive()
57-
print("Received update:", update)
58-
59-
self.set_components(update.components)
60-
self._dry_run = update.dry_run
61-
self._options = update.options
76+
other_recv: Receiver[Any] = ...
6277
63-
def set_components(self, components: TargetComponents) -> None:
64-
match components:
78+
if self._dispatch_updates_receiver is None:
79+
async for msg in other_recv:
80+
# do stuff
81+
...
82+
else:
83+
await self._run_with_dispatch(other_recv)
84+
85+
async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None:
86+
async for selected in select(self._dispatch_updates_receiver, other_recv):
87+
if selected_from(selected, self._dispatch_updates_receiver):
88+
self._update_dispatch_information(selected.message)
89+
elif selected_from(selected, other_recv):
90+
# do stuff
91+
...
92+
else:
93+
assert False, f"Unexpected selected receiver: {selected}"
94+
95+
def _update_dispatch_information(self, dispatch_update: DispatchUpdate) -> None:
96+
print("Received update:", dispatch_update)
97+
self._dry_run = dispatch_update.dry_run
98+
self._options = dispatch_update.options
99+
match dispatch_update.components:
65100
case []:
66101
print("Dispatch: Using all components")
67102
case list() as ids if isinstance(ids[0], int):
@@ -75,7 +110,7 @@ def set_components(self, components: TargetComponents) -> None:
75110
unsupported,
76111
)
77112
78-
async def run():
113+
async def main():
79114
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
80115
key = os.getenv("DISPATCH_API_KEY", "some-key")
81116
@@ -86,64 +121,92 @@ async def run():
86121
server_url=url,
87122
key=key
88123
)
89-
90-
# Create update channel to receive dispatch update events pre-start and mid-run
91-
dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")
92-
93-
# Start actor and give it an dispatch updates channel receiver
94-
my_actor = MyActor(dispatch_updates_channel.new_receiver())
124+
dispatcher.start()
95125
96126
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
97127
128+
def actor_factory(initial_dispatch, dispatch_receiver):
129+
return MyActor.new_with_dispatch(initial_dispatch, dispatch_receiver)
130+
98131
managing_actor = DispatchManagingActor(
99-
actor=my_actor,
132+
actor_factory=actor_factory,
100133
running_status_receiver=status_receiver,
101-
updates_sender=dispatch_updates_channel.new_sender(),
102134
)
103135
104-
await asyncio.gather(dispatcher.start(), managing_actor.start())
136+
await run(managing_actor)
105137
```
106138
"""
107139

108140
def __init__(
109141
self,
110-
actor: Actor | Set[Actor],
142+
actor_factory: Callable[[DispatchUpdate, Receiver[DispatchUpdate]], Actor],
111143
running_status_receiver: Receiver[Dispatch],
112-
updates_sender: Sender[DispatchUpdate] | None = None,
113144
) -> None:
114145
"""Initialize the dispatch handler.
115146
116147
Args:
117-
actor: A set of actors or a single actor to manage.
148+
actor_factory: A callable that creates an actor with some initial dispatch
149+
information.
118150
running_status_receiver: The receiver for dispatch running status changes.
119-
updates_sender: The sender for dispatch events
120151
"""
121152
super().__init__()
122153
self._dispatch_rx = running_status_receiver
123-
self._actors: frozenset[Actor] = frozenset(
124-
[actor] if isinstance(actor, Actor) else actor
154+
self._actor_factory = actor_factory
155+
self._actor: Actor | None = None
156+
self._updates_channel = Broadcast[DispatchUpdate](
157+
name="dispatch_updates_channel", resend_latest=True
125158
)
126-
self._updates_sender = updates_sender
159+
self._updates_sender = self._updates_channel.new_sender()
160+
161+
def start(self) -> None:
162+
"""Start the background service."""
163+
self._tasks.add(asyncio.create_task(self._run()))
164+
165+
def new_receiver(self) -> Receiver[DispatchUpdate]:
166+
"""Create a new receiver for dispatch updates.
167+
168+
Returns:
169+
A new receiver for dispatch updates.
170+
"""
171+
return self._updates_channel.new_receiver()
127172

128-
def _start_actors(self) -> None:
173+
async def _start_actor(self, dispatch: Dispatch) -> None:
129174
"""Start all actors."""
130-
for actor in self._actors:
131-
if actor.is_running:
132-
_logger.warning("Actor %s is already running", actor.name)
133-
else:
134-
actor.start()
175+
dispatch_update = DispatchUpdate(
176+
components=dispatch.target,
177+
dry_run=dispatch.dry_run,
178+
options=dispatch.payload,
179+
)
180+
181+
if self._actor:
182+
sent_str = ""
183+
if self._updates_sender is not None:
184+
sent_str = ", sent a dispatch update instead of creating a new actor"
185+
await self._updates_sender.send(dispatch_update)
186+
_logger.warning(
187+
"Actor for dispatch type %r is already running%s",
188+
dispatch.type,
189+
sent_str,
190+
)
191+
else:
192+
_logger.info("Starting actor for dispatch type %r", dispatch.type)
193+
self._actor = self._actor_factory(dispatch_update, self.new_receiver())
194+
self._actor.start()
135195

136-
async def _stop_actors(self, msg: str) -> None:
196+
async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
137197
"""Stop all actors.
138198
139199
Args:
200+
stopping_dispatch: The dispatch that is stopping the actor.
140201
msg: The message to be passed to the actors being stopped.
141202
"""
142-
for actor in self._actors:
143-
if actor.is_running:
144-
await actor.stop(msg)
145-
else:
146-
_logger.warning("Actor %s is not running", actor.name)
203+
if self._actor is None:
204+
_logger.warning(
205+
"Actor for dispatch type %r is not running", stopping_dispatch.type
206+
)
207+
else:
208+
await self._actor.stop(msg)
209+
self._actor = None
147210

148211
async def _run(self) -> None:
149212
"""Wait for dispatches and handle them."""
@@ -157,18 +220,6 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
157220
dispatch: The dispatch to handle.
158221
"""
159222
if dispatch.started:
160-
if self._updates_sender is not None:
161-
_logger.info("Updated by dispatch %s", dispatch.id)
162-
await self._updates_sender.send(
163-
DispatchUpdate(
164-
components=dispatch.target,
165-
dry_run=dispatch.dry_run,
166-
options=dispatch.payload,
167-
)
168-
)
169-
170-
_logger.info("Started by dispatch %s", dispatch.id)
171-
self._start_actors()
223+
await self._start_actor(dispatch)
172224
else:
173-
_logger.info("Stopped by dispatch %s", dispatch.id)
174-
await self._stop_actors("Dispatch stopped")
225+
await self._stop_actor(dispatch, "Dispatch stopped")

0 commit comments

Comments
 (0)