Skip to content

Commit 6bf83c4

Browse files
committed
Update and correct documentation, release-notes and README
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 2fc98c4 commit 6bf83c4

File tree

4 files changed

+179
-116
lines changed

4 files changed

+179
-116
lines changed

README.md

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,47 +21,33 @@ The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-pyth
2121

2222
```python
2323
import os
24-
from frequenz.dispatch import Dispatcher
2524
from unittest.mock import MagicMock
25+
from datetime import timedelta
26+
27+
from frequenz.dispatch import Dispatcher, DispatchInfo, MergeByType
28+
29+
async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor:
30+
return MagicMock(dispatch=dispatch, receiver=receiver)
2631

2732
async def run():
2833
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
2934
key = os.getenv("DISPATCH_API_KEY", "some-key")
3035

3136
microgrid_id = 1
3237

33-
dispatcher = Dispatcher(
38+
async with Dispatcher(
3439
microgrid_id=microgrid_id,
3540
server_url=url,
36-
key=key
37-
)
38-
await dispatcher.start()
39-
40-
actor = MagicMock() # replace with your actor
41-
42-
changed_running_status_rx = dispatcher.new_running_state_event_receiver("MY_TYPE")
43-
44-
async for dispatch in changed_running_status_rx:
45-
if dispatch.started:
46-
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
47-
if actor.is_running:
48-
actor.reconfigure(
49-
components=dispatch.target,
50-
run_parameters=dispatch.payload, # custom actor parameters
51-
dry_run=dispatch.dry_run,
52-
until=dispatch.until,
53-
) # this will reconfigure the actor
54-
else:
55-
# this will start a new actor with the given components
56-
# and run it for the duration of the dispatch
57-
actor.start(
58-
components=dispatch.target,
59-
run_parameters=dispatch.payload, # custom actor parameters
60-
dry_run=dispatch.dry_run,
61-
until=dispatch.until,
62-
)
63-
else:
64-
actor.stop() # this will stop the actor
41+
key=key,
42+
) as dispatcher:
43+
await dispatcher.start_dispatching(
44+
dispatch_type="EXAMPLE_TYPE",
45+
actor_factory=create_actor,
46+
merge_strategy=MergeByType(),
47+
retry_interval=timedelta(seconds=10)
48+
)
49+
50+
await dispatcher
6551
```
6652

6753
## Supported Platforms

RELEASE_NOTES.md

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,50 @@ This release introduces a more flexible and powerful mechanism for managing disp
66

77
## Upgrading
88

9+
A new simplified way to manage actors has been introduced:
10+
11+
Change your code from:
12+
```python
13+
dispatcher = Dispatcher(
14+
microgrid_id=microgrid_id,
15+
server_url=url,
16+
key=key
17+
)
18+
dispatcher.start()
19+
20+
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
21+
22+
managing_actor = ActorDispatcher(
23+
actor_factory=MyActor.new_with_dispatch,
24+
running_status_receiver=status_receiver,
25+
)
26+
27+
await run(managing_actor)
28+
```
29+
30+
to
31+
32+
```python
33+
async with Dispatcher(
34+
microgrid_id=microgrid_id,
35+
server_url=url,
36+
key=key
37+
) as dispatcher:
38+
await dispatcher.start_dispatching(
39+
dispatch_type="EXAMPLE_TYPE",
40+
actor_factory=MyActor.new_with_dispatch, # now async factory!
41+
merge_strategy=MergeStrategy.MergeByType,
42+
)
43+
await dispatcher
44+
```
45+
46+
Further changes:
47+
948
* `Dispatcher.start` is no longer `async`. Remove `await` when calling it.
1049
* Two properties have been replaced by methods that require a type as parameter.
1150
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
1251
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, merge_strategy: MergeStrategy)`.
13-
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function.
52+
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new_receiver function.
1453
* The `DispatchManagingActor` class has been renamed to `DispatchActorsService`.
1554
* It's interface has been simplified and now only requires an actor factory and a running status receiver.
1655
* It only starts/stops a single actor at a time now instead of a set of actors.

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -116,21 +116,19 @@ async def main():
116116
117117
microgrid_id = 1
118118
119-
dispatcher = Dispatcher(
119+
async with Dispatcher(
120120
microgrid_id=microgrid_id,
121121
server_url=url,
122122
key=key
123-
)
124-
dispatcher.start()
125-
126-
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
123+
) as dispatcher:
124+
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
127125
128-
managing_actor = ActorDispatcher(
129-
actor_factory=MyActor.new_with_dispatch,
130-
running_status_receiver=status_receiver,
131-
)
126+
managing_actor = ActorDispatcher(
127+
actor_factory=MyActor.new_with_dispatch,
128+
running_status_receiver=status_receiver,
129+
)
132130
133-
await run(managing_actor)
131+
await run(managing_actor)
134132
```
135133
"""
136134

src/frequenz/dispatch/_dispatcher.py

Lines changed: 114 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ class Dispatcher(BackgroundService):
2727
"""A highlevel interface for the dispatch API.
2828
2929
This class provides a highlevel interface to the dispatch API.
30-
It provides two receiver functions:
30+
It provides receivers for various events and management of actors based on
31+
dispatches.
32+
33+
The receivers shortly explained:
3134
3235
* [Lifecycle events receiver][frequenz.dispatch.Dispatcher.new_lifecycle_events_receiver]:
3336
Receives an event whenever a dispatch is created, updated or deleted.
@@ -41,6 +44,35 @@ class Dispatcher(BackgroundService):
4144
Any change that could potentially require the consumer to start, stop or
4245
reconfigure itself will cause a message to be sent.
4346
47+
Example: Managing an actor
48+
```python
49+
import os
50+
from frequenz.dispatch import Dispatcher, MergeByType
51+
from unittest.mock import MagicMock
52+
53+
async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor:
54+
return MagicMock(dispatch=dispatch, receiver=receiver)
55+
56+
async def run():
57+
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
58+
key = os.getenv("DISPATCH_API_KEY", "some-key")
59+
60+
microgrid_id = 1
61+
62+
async with Dispatcher(
63+
microgrid_id=microgrid_id,
64+
server_url=url,
65+
key=key
66+
) as dispatcher:
67+
dispatcher.start_dispatching(
68+
dispatch_type="DISPATCH_TYPE",
69+
actor_factory=create_actor,
70+
merge_strategy=MergeByType(),
71+
)
72+
73+
await dispatcher
74+
```
75+
4476
Example: Processing running state change dispatches
4577
```python
4678
import os
@@ -53,38 +85,36 @@ async def run():
5385
5486
microgrid_id = 1
5587
56-
dispatcher = Dispatcher(
88+
async with Dispatcher(
5789
microgrid_id=microgrid_id,
5890
server_url=url,
5991
key=key
60-
)
61-
await dispatcher.start()
62-
63-
actor = MagicMock() # replace with your actor
64-
65-
changed_running_status = dispatcher.new_running_state_event_receiver("DISPATCH_TYPE")
66-
67-
async for dispatch in changed_running_status:
68-
if dispatch.started:
69-
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
70-
if actor.is_running:
71-
actor.reconfigure(
72-
components=dispatch.target,
73-
run_parameters=dispatch.payload, # custom actor parameters
74-
dry_run=dispatch.dry_run,
75-
until=dispatch.until,
76-
) # this will reconfigure the actor
92+
) as dispatcher:
93+
actor = MagicMock() # replace with your actor
94+
95+
rs_receiver = dispatcher.new_running_state_event_receiver("DISPATCH_TYPE")
96+
97+
async for dispatch in rs_receiver:
98+
if dispatch.started:
99+
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
100+
if actor.is_running:
101+
actor.reconfigure(
102+
components=dispatch.target,
103+
run_parameters=dispatch.payload, # custom actor parameters
104+
dry_run=dispatch.dry_run,
105+
until=dispatch.until,
106+
) # this will reconfigure the actor
107+
else:
108+
# this will start a new actor with the given components
109+
# and run it for the duration of the dispatch
110+
actor.start(
111+
components=dispatch.target,
112+
run_parameters=dispatch.payload, # custom actor parameters
113+
dry_run=dispatch.dry_run,
114+
until=dispatch.until,
115+
)
77116
else:
78-
# this will start a new actor with the given components
79-
# and run it for the duration of the dispatch
80-
actor.start(
81-
components=dispatch.target,
82-
run_parameters=dispatch.payload, # custom actor parameters
83-
dry_run=dispatch.dry_run,
84-
until=dispatch.until,
85-
)
86-
else:
87-
actor.stop() # this will stop the actor
117+
actor.stop() # this will stop the actor
88118
```
89119
90120
Example: Getting notification about dispatch lifecycle events
@@ -100,25 +130,23 @@ async def run():
100130
101131
microgrid_id = 1
102132
103-
dispatcher = Dispatcher(
133+
async with Dispatcher(
104134
microgrid_id=microgrid_id,
105135
server_url=url,
106-
key=key
107-
)
108-
await dispatcher.start() # this will start the actor
109-
110-
events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE")
111-
112-
async for event in events_receiver:
113-
match event:
114-
case Created(dispatch):
115-
print(f"A dispatch was created: {dispatch}")
116-
case Deleted(dispatch):
117-
print(f"A dispatch was deleted: {dispatch}")
118-
case Updated(dispatch):
119-
print(f"A dispatch was updated: {dispatch}")
120-
case _ as unhandled:
121-
assert_never(unhandled)
136+
key=key,
137+
) as dispatcher:
138+
events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE")
139+
140+
async for event in events_receiver:
141+
match event:
142+
case Created(dispatch):
143+
print(f"A dispatch was created: {dispatch}")
144+
case Deleted(dispatch):
145+
print(f"A dispatch was deleted: {dispatch}")
146+
case Updated(dispatch):
147+
print(f"A dispatch was updated: {dispatch}")
148+
case _ as unhandled:
149+
assert_never(unhandled)
122150
```
123151
124152
Example: Creating a new dispatch and then modifying it.
@@ -138,35 +166,33 @@ async def run():
138166
139167
microgrid_id = 1
140168
141-
dispatcher = Dispatcher(
169+
async with Dispatcher(
142170
microgrid_id=microgrid_id,
143171
server_url=url,
144-
key=key
145-
)
146-
await dispatcher.start() # this will start the actor
147-
148-
# Create a new dispatch
149-
new_dispatch = await dispatcher.client.create(
150-
microgrid_id=microgrid_id,
151-
type="ECHO_FREQUENCY", # replace with your own type
152-
start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
153-
duration=timedelta(minutes=5),
154-
target=ComponentCategory.INVERTER,
155-
payload={"font": "Times New Roman"}, # Arbitrary payload data
156-
)
157-
158-
# Modify the dispatch
159-
await dispatcher.client.update(
160-
microgrid_id=microgrid_id,
161-
dispatch_id=new_dispatch.id,
162-
new_fields={"duration": timedelta(minutes=10)}
163-
)
164-
165-
# Validate the modification
166-
modified_dispatch = await dispatcher.client.get(
167-
microgrid_id=microgrid_id, dispatch_id=new_dispatch.id
168-
)
169-
assert modified_dispatch.duration == timedelta(minutes=10)
172+
key=key,
173+
) as dispatcher:
174+
# Create a new dispatch
175+
new_dispatch = await dispatcher.client.create(
176+
microgrid_id=microgrid_id,
177+
type="ECHO_FREQUENCY", # replace with your own type
178+
start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
179+
duration=timedelta(minutes=5),
180+
target=ComponentCategory.INVERTER,
181+
payload={"font": "Times New Roman"}, # Arbitrary payload data
182+
)
183+
184+
# Modify the dispatch
185+
await dispatcher.client.update(
186+
microgrid_id=microgrid_id,
187+
dispatch_id=new_dispatch.id,
188+
new_fields={"duration": timedelta(minutes=10)}
189+
)
190+
191+
# Validate the modification
192+
modified_dispatch = await dispatcher.client.get(
193+
microgrid_id=microgrid_id, dispatch_id=new_dispatch.id
194+
)
195+
assert modified_dispatch.duration == timedelta(minutes=10)
170196
```
171197
"""
172198

@@ -247,13 +273,27 @@ async def start_dispatching(
247273
) -> None:
248274
"""Manage actors for a given dispatch type.
249275
250-
Creates and manages an ActorDispatcher for the given type that will
276+
Creates and manages an
277+
[`ActorDispatcher`][frequenz.dispatch.ActorDispatcher] for the given type that will
251278
start, stop and reconfigure actors based on received dispatches.
252279
253280
You can await the `Dispatcher` instance to block until all types
254281
registered with `start_dispatching()` are stopped using
255282
`stop_dispatching()`
256283
284+
"Merging" means that when multiple dispatches are active at the same time,
285+
the intervals are merged into one.
286+
287+
This also decides how instances are mapped from dispatches to actors:
288+
289+
* [`MergeByType`][frequenz.dispatch.MergeByType] — All dispatches map to
290+
one single instance identified by the dispatch type.
291+
* [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — A
292+
dispatch maps to an instance identified by the dispatch type and target.
293+
So different dispatches with equal type and target will map to the same
294+
instance.
295+
* `None` — No merging, each dispatch maps to a separate instance.
296+
257297
Args:
258298
dispatch_type: The type of the dispatch to manage.
259299
actor_factory: The factory to create actors.

0 commit comments

Comments
 (0)