Skip to content

Commit 3694295

Browse files
committed
WIP: Re-create actors on dispatch updates
This commit modifies the `DispatchManagingActor` to re-create the actor when a new dispatch is received and the actor should start running instead of just start/stop the actor. To create the actor a factory function is used, which passes the initial dispatch information to the actor, so it can be properly initialized instead of having the initialization in 2 steps, the creation and the receiving of the dispatch update.
1 parent 1c88c58 commit 3694295

File tree

1 file changed

+118
-51
lines changed

1 file changed

+118
-51
lines changed

src/frequenz/dispatch/_managing_actor.py

Lines changed: 118 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@
44
"""Helper class to manage actors based on dispatches."""
55

66
import logging
7+
from abc import abstractmethod
8+
from collections.abc import Callable
79
from dataclasses import dataclass
8-
from typing import Any, Set
10+
from typing import Any
911

1012
from frequenz.channels import Receiver, Sender
1113
from frequenz.client.dispatch.types import TargetComponents
1214
from frequenz.sdk.actor import Actor
15+
from typing_extensions import override
1316

1417
from ._dispatch import Dispatch
1518

@@ -38,29 +41,62 @@ class DispatchManagingActor(Actor):
3841
```python
3942
import os
4043
import asyncio
41-
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate
44+
from typing import override
45+
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate, DispatchableActor
4246
from frequenz.client.dispatch.types import TargetComponents
4347
from frequenz.client.common.microgrid.components import ComponentCategory
44-
45-
from frequenz.channels import Receiver, Broadcast
48+
from frequenz.channels import Receiver, Broadcast, select, selected_from
49+
from frequenz.sdk.actor import Actor, run
4650
4751
class MyActor(Actor):
48-
def __init__(self, updates_channel: Receiver[DispatchUpdate]):
49-
super().__init__()
50-
self._updates_channel = updates_channel
51-
self._dry_run: bool
52-
self._options : dict[str, Any]
53-
52+
def __init__(
53+
self,
54+
*,
55+
name: str | None = None,
56+
) -> None:
57+
super().__init__(name=name)
58+
self._dispatch_updates_receiver: Receiver[DispatchUpdate] | None = None
59+
self._dry_run: bool = False
60+
self._options: dict[str, Any] = {}
61+
62+
@classmethod
63+
def new_with_dispatch(
64+
cls,
65+
initial_dispatch: DispatchUpdate,
66+
dispatch_updates_receiver: Receiver[DispatchUpdate],
67+
*,
68+
name: str | None = None,
69+
) -> Self:
70+
self = cls(name=name)
71+
self._dispatch_updates_receiver = dispatch_updates_receiver
72+
self._update_dispatch_information(initial_dispatch)
73+
return self
74+
75+
@override
5476
async def _run(self) -> None:
55-
while True:
56-
update = await self._updates_channel.receive()
57-
print("Received update:", update)
77+
other_recv: Receiver[Any] = ...
5878
59-
self.set_components(update.components)
60-
self._dry_run = update.dry_run
61-
self._options = update.options
62-
63-
def set_components(self, components: TargetComponents) -> None:
79+
if self._dispatch_updates_receiver is None:
80+
async for msg in other:
81+
# do stuff
82+
...
83+
else:
84+
await self._run_with_dispatch(other_recv)
85+
86+
async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None:
87+
async for selected in select(self._dispatch_updates_receiver, other_recv):
88+
if selected_from(selected, self._dispatch_updates_receiver):
89+
self._update_dispatch_information(selected.message)
90+
elif selected_from(selected, other_recv):
91+
# do stuff
92+
...
93+
else:
94+
assert False, f"Unexpected selected receiver: {selected}"
95+
96+
def _update_dispatch_information(self, dispatch_update: DispatchUpdate) -> None:
97+
print("Received update:", dispatch_update)
98+
self._dry_run = dispatch_update.dry_run
99+
self._options = dispatch_update.options
64100
match components:
65101
case [int(), *_] as component_ids:
66102
print("Dispatch: Setting components to %s", components)
@@ -84,6 +120,7 @@ async def run():
84120
server_url=url,
85121
key=key
86122
)
123+
dispatcher.start()
87124
88125
# Create update channel to receive dispatch update events pre-start and mid-run
89126
dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")
@@ -94,58 +131,70 @@ async def run():
94131
status_receiver = dispatcher.running_status_change.new_receiver()
95132
96133
managing_actor = DispatchManagingActor(
97-
actor=my_actor,
134+
actor_factory=labda initial_dispatch: MyActor.new_with_dispatch(
135+
initial_dispatch, dispatch_updates_channel.new_receiver(),
136+
),
98137
dispatch_type="EXAMPLE",
99138
running_status_receiver=status_receiver,
100139
updates_sender=dispatch_updates_channel.new_sender(),
101140
)
102141
103-
await asyncio.gather(dispatcher.start(), managing_actor.start())
142+
await run(managing_actor)
104143
```
105144
"""
106145

107146
def __init__(
108147
self,
109-
actor: Actor | Set[Actor],
148+
actor_factory: Callable[[DispatchUpdate], Actor],
110149
dispatch_type: str,
111150
running_status_receiver: Receiver[Dispatch],
112151
updates_sender: Sender[DispatchUpdate] | None = None,
113152
) -> None:
114153
"""Initialize the dispatch handler.
115154
116155
Args:
117-
actor: A set of actors or a single actor to manage.
156+
actor_factory: A callable that creates an actor with some initial dispatch
157+
information.
118158
dispatch_type: The type of dispatches to handle.
119159
running_status_receiver: The receiver for dispatch running status changes.
120160
updates_sender: The sender for dispatch events
121161
"""
122162
super().__init__()
123163
self._dispatch_rx = running_status_receiver
124-
self._actors: frozenset[Actor] = frozenset(
125-
[actor] if isinstance(actor, Actor) else actor
126-
)
164+
self._actor_factory = actor_factory
165+
self._actor: Actor | None = None
127166
self._dispatch_type = dispatch_type
128167
self._updates_sender = updates_sender
129168

130-
def _start_actors(self) -> None:
169+
async def _start_actor(self, dispatch_update: DispatchUpdate) -> None:
131170
"""Start all actors."""
132-
for actor in self._actors:
133-
if actor.is_running:
134-
_logger.warning("Actor %s is already running", actor.name)
135-
else:
136-
actor.start()
171+
if self._actor is None:
172+
sent_str = ""
173+
if self._updates_sender is not None:
174+
sent_str = ", sent a dispatch update instead of creating a new actor"
175+
await self._updates_sender.send(dispatch_update)
176+
_logger.warning(
177+
"Actor for dispatch type %r is already running%s",
178+
self._dispatch_type,
179+
sent_str,
180+
)
181+
else:
182+
self._actor = self._actor_factory(dispatch_update)
183+
self._actor.start()
137184

138-
async def _stop_actors(self, msg: str) -> None:
185+
async def _stop_actor(self, msg: str) -> None:
139186
"""Stop all actors.
140187
141188
Args:
142189
msg: The message to be passed to the actors being stopped.
143190
"""
144-
for actor in self._actors:
145-
if actor.is_running:
146-
await actor.stop(msg)
147-
else:
148-
_logger.warning("Actor %s is not running", actor.name)
191+
if self._actor is None:
192+
_logger.warning(
193+
"Actor for dispatch type %r is not running", self._dispatch_type
194+
)
195+
else:
196+
await self._actor.stop(msg)
197+
self._actor = None
149198

150199
async def _run(self) -> None:
151200
"""Wait for dispatches and handle them."""
@@ -159,22 +208,40 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None:
159208
dispatch: The dispatch to handle.
160209
"""
161210
if dispatch.type != self._dispatch_type:
162-
_logger.debug("Ignoring dispatch %s", dispatch.id)
211+
_logger.debug(
212+
"Ignoring dispatch %s, handled type is %r but received %r",
213+
dispatch.id,
214+
self._dispatch_type,
215+
dispatch.type,
216+
)
163217
return
164218

165219
if dispatch.started:
166-
if self._updates_sender is not None:
167-
_logger.info("Updated by dispatch %s", dispatch.id)
168-
await self._updates_sender.send(
169-
DispatchUpdate(
170-
components=dispatch.target,
171-
dry_run=dispatch.dry_run,
172-
options=dispatch.payload,
173-
)
220+
dispatch_update = DispatchUpdate(
221+
components=dispatch.target,
222+
dry_run=dispatch.dry_run,
223+
options=dispatch.payload,
224+
)
225+
if self._actor is None:
226+
_logger.info(
227+
"A new dispatch with ID %s became active for type %r and the "
228+
"actor was not running, starting...",
229+
dispatch.id,
230+
self._dispatch_type,
174231
)
175-
176-
_logger.info("Started by dispatch %s", dispatch.id)
177-
self._start_actors()
232+
self._actor = self._actor_factory(dispatch_update)
233+
elif self._updates_sender is not None:
234+
_logger.info(
235+
"A new dispatch with ID %s became active for type %r and the "
236+
"actor was running, sending update...",
237+
dispatch.id,
238+
self._dispatch_type,
239+
)
240+
await self._updates_sender.send(dispatch_update)
178241
else:
179-
_logger.info("Stopped by dispatch %s", dispatch.id)
180-
await self._stop_actors("Dispatch stopped")
242+
_logger.info(
243+
"Actor for dispatch type %r stopped by dispatch ID %s",
244+
self._dispatch_type,
245+
dispatch.id,
246+
)
247+
await self._stop_actor("Dispatch stopped")

0 commit comments

Comments
 (0)