Skip to content

Commit b651e17

Browse files
committed
Update and correct documentation, release-notes and README
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 22361c8 commit b651e17

File tree

4 files changed

+181
-116
lines changed

4 files changed

+181
-116
lines changed

README.md

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,47 +21,33 @@ The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-pyth
2121

2222
```python
2323
import os
24-
from frequenz.dispatch import Dispatcher
2524
from unittest.mock import MagicMock
25+
from datetime import timedelta
26+
27+
from frequenz.dispatch import Dispatcher, DispatchInfo, MergeByType
28+
29+
async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor:
30+
return MagicMock(dispatch=dispatch, receiver=receiver)
2631

2732
async def run():
2833
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
2934
key = os.getenv("DISPATCH_API_KEY", "some-key")
3035

3136
microgrid_id = 1
3237

33-
dispatcher = Dispatcher(
38+
async with Dispatcher(
3439
microgrid_id=microgrid_id,
3540
server_url=url,
36-
key=key
37-
)
38-
await dispatcher.start()
39-
40-
actor = MagicMock() # replace with your actor
41-
42-
changed_running_status_rx = dispatcher.new_running_state_event_receiver("MY_TYPE")
43-
44-
async for dispatch in changed_running_status_rx:
45-
if dispatch.started:
46-
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
47-
if actor.is_running:
48-
actor.reconfigure(
49-
components=dispatch.target,
50-
run_parameters=dispatch.payload, # custom actor parameters
51-
dry_run=dispatch.dry_run,
52-
until=dispatch.until,
53-
) # this will reconfigure the actor
54-
else:
55-
# this will start a new actor with the given components
56-
# and run it for the duration of the dispatch
57-
actor.start(
58-
components=dispatch.target,
59-
run_parameters=dispatch.payload, # custom actor parameters
60-
dry_run=dispatch.dry_run,
61-
until=dispatch.until,
62-
)
63-
else:
64-
actor.stop() # this will stop the actor
41+
key=key,
42+
) as dispatcher:
43+
await dispatcher.start_dispatching(
44+
dispatch_type="EXAMPLE_TYPE",
45+
actor_factory=create_actor,
46+
merge_strategy=MergeByType(),
47+
retry_interval=timedelta(seconds=10)
48+
)
49+
50+
await dispatcher
6551
```
6652

6753
## Supported Platforms

RELEASE_NOTES.md

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,51 @@ This release introduces a more flexible and powerful mechanism for managing disp
66

77
## Upgrading
88

9+
A new simplified way to manage actors has been introduced:
10+
11+
Change your code from:
12+
```python
13+
dispatcher = Dispatcher(
14+
microgrid_id=microgrid_id,
15+
server_url=url,
16+
key=key
17+
)
18+
dispatcher.start()
19+
20+
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
21+
22+
managing_actor = ActorDispatcher(
23+
actor_factory=MyActor.new_with_dispatch,
24+
running_status_receiver=status_receiver,
25+
)
26+
27+
await run(managing_actor)
28+
```
29+
30+
to
31+
32+
```python
33+
async with Dispatcher(
34+
microgrid_id=microgrid_id,
35+
server_url=url,
36+
key=key
37+
) as dispatcher:
38+
await dispatcher.start_dispatching(
39+
dispatch_type="EXAMPLE_TYPE",
40+
actor_factory=MyActor.new_with_dispatch, # now async factory!
41+
merge_strategy=MergeStrategy.MergeByType,
42+
retry_interval=10
43+
)
44+
await dispatcher
45+
```
46+
47+
Further changes:
48+
949
* `Dispatcher.start` is no longer `async`. Remove `await` when calling it.
1050
* Two properties have been replaced by methods that require a type as parameter.
1151
* `Dispatcher.lifecycle_events` has been replaced by the method `Dispatcher.new_lifecycle_events_receiver(self, dispatch_type: str)`.
1252
* `Dispatcher.running_status_change` has been replaced by the method `Dispatcher.new_running_state_event_receiver(self, dispatch_type: str, merge_strategy: MergeStrategy, initial_fetch_timeout: timedelta)`.
13-
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new-receiver function.
53+
* The managing actor constructor no longer requires the `dispatch_type` parameter. Instead you're expected to pass the type to the new_receiver function.
1454
* The `DispatchManagingActor` class has been renamed to `DispatchActorsService`.
1555
* It's interface has been simplified and now only requires an actor factory and a running status receiver.
1656
* It only starts/stops a single actor at a time now instead of a set of actors.

src/frequenz/dispatch/_actor_dispatcher.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -118,21 +118,19 @@ async def main():
118118
119119
microgrid_id = 1
120120
121-
dispatcher = Dispatcher(
121+
async with Dispatcher(
122122
microgrid_id=microgrid_id,
123123
server_url=url,
124124
key=key
125-
)
126-
dispatcher.start()
127-
128-
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
125+
) as dispatcher:
126+
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")
129127
130-
managing_actor = ActorDispatcher(
131-
actor_factory=MyActor.new_with_dispatch,
132-
running_status_receiver=status_receiver,
133-
)
128+
managing_actor = ActorDispatcher(
129+
actor_factory=MyActor.new_with_dispatch,
130+
running_status_receiver=status_receiver,
131+
)
134132
135-
await run(managing_actor)
133+
await run(managing_actor)
136134
```
137135
"""
138136

src/frequenz/dispatch/_dispatcher.py

Lines changed: 115 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ class Dispatcher(BackgroundService):
2828
"""A highlevel interface for the dispatch API.
2929
3030
This class provides a highlevel interface to the dispatch API.
31-
It provides two receiver functions:
31+
It provides receivers for various events and management of actors based on
32+
dispatches.
33+
34+
The receivers shortly explained:
3235
3336
* [Lifecycle events receiver][frequenz.dispatch.Dispatcher.new_lifecycle_events_receiver]:
3437
Receives an event whenever a dispatch is created, updated or deleted.
@@ -42,6 +45,36 @@ class Dispatcher(BackgroundService):
4245
Any change that could potentially require the consumer to start, stop or
4346
reconfigure itself will cause a message to be sent.
4447
48+
Example: Managing an actor
49+
```python
50+
import os
51+
from frequenz.dispatch import Dispatcher, MergeByType
52+
from unittest.mock import MagicMock
53+
54+
async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor:
55+
return MagicMock(dispatch=dispatch, receiver=receiver)
56+
57+
async def run():
58+
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
59+
key = os.getenv("DISPATCH_API_KEY", "some-key")
60+
61+
microgrid_id = 1
62+
63+
async with Dispatcher(
64+
microgrid_id=microgrid_id,
65+
server_url=url,
66+
key=key
67+
) as dispatcher:
68+
dispatcher.start_dispatching(
69+
dispatch_type="DISPATCH_TYPE",
70+
actor_factory=create_actor,
71+
merge_strategy=MergeByType(),
72+
retry_interval=timedelta(seconds=60),
73+
)
74+
75+
await dispatcher
76+
```
77+
4578
Example: Processing running state change dispatches
4679
```python
4780
import os
@@ -54,38 +87,36 @@ async def run():
5487
5588
microgrid_id = 1
5689
57-
dispatcher = Dispatcher(
90+
async with Dispatcher(
5891
microgrid_id=microgrid_id,
5992
server_url=url,
6093
key=key
61-
)
62-
await dispatcher.start()
63-
64-
actor = MagicMock() # replace with your actor
65-
66-
changed_running_status = dispatcher.new_running_state_event_receiver("DISPATCH_TYPE")
67-
68-
async for dispatch in changed_running_status:
69-
if dispatch.started:
70-
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
71-
if actor.is_running:
72-
actor.reconfigure(
73-
components=dispatch.target,
74-
run_parameters=dispatch.payload, # custom actor parameters
75-
dry_run=dispatch.dry_run,
76-
until=dispatch.until,
77-
) # this will reconfigure the actor
94+
) as dispatcher:
95+
actor = MagicMock() # replace with your actor
96+
97+
rs_receiver = dispatcher.new_running_state_event_receiver("DISPATCH_TYPE")
98+
99+
async for dispatch in rs_receiver:
100+
if dispatch.started:
101+
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
102+
if actor.is_running:
103+
actor.reconfigure(
104+
components=dispatch.target,
105+
run_parameters=dispatch.payload, # custom actor parameters
106+
dry_run=dispatch.dry_run,
107+
until=dispatch.until,
108+
) # this will reconfigure the actor
109+
else:
110+
# this will start a new actor with the given components
111+
# and run it for the duration of the dispatch
112+
actor.start(
113+
components=dispatch.target,
114+
run_parameters=dispatch.payload, # custom actor parameters
115+
dry_run=dispatch.dry_run,
116+
until=dispatch.until,
117+
)
78118
else:
79-
# this will start a new actor with the given components
80-
# and run it for the duration of the dispatch
81-
actor.start(
82-
components=dispatch.target,
83-
run_parameters=dispatch.payload, # custom actor parameters
84-
dry_run=dispatch.dry_run,
85-
until=dispatch.until,
86-
)
87-
else:
88-
actor.stop() # this will stop the actor
119+
actor.stop() # this will stop the actor
89120
```
90121
91122
Example: Getting notification about dispatch lifecycle events
@@ -101,25 +132,23 @@ async def run():
101132
102133
microgrid_id = 1
103134
104-
dispatcher = Dispatcher(
135+
async with Dispatcher(
105136
microgrid_id=microgrid_id,
106137
server_url=url,
107-
key=key
108-
)
109-
await dispatcher.start() # this will start the actor
110-
111-
events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE")
112-
113-
async for event in events_receiver:
114-
match event:
115-
case Created(dispatch):
116-
print(f"A dispatch was created: {dispatch}")
117-
case Deleted(dispatch):
118-
print(f"A dispatch was deleted: {dispatch}")
119-
case Updated(dispatch):
120-
print(f"A dispatch was updated: {dispatch}")
121-
case _ as unhandled:
122-
assert_never(unhandled)
138+
key=key,
139+
) as dispatcher:
140+
events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE")
141+
142+
async for event in events_receiver:
143+
match event:
144+
case Created(dispatch):
145+
print(f"A dispatch was created: {dispatch}")
146+
case Deleted(dispatch):
147+
print(f"A dispatch was deleted: {dispatch}")
148+
case Updated(dispatch):
149+
print(f"A dispatch was updated: {dispatch}")
150+
case _ as unhandled:
151+
assert_never(unhandled)
123152
```
124153
125154
Example: Creating a new dispatch and then modifying it.
@@ -139,35 +168,33 @@ async def run():
139168
140169
microgrid_id = 1
141170
142-
dispatcher = Dispatcher(
171+
async with Dispatcher(
143172
microgrid_id=microgrid_id,
144173
server_url=url,
145-
key=key
146-
)
147-
await dispatcher.start() # this will start the actor
148-
149-
# Create a new dispatch
150-
new_dispatch = await dispatcher.client.create(
151-
microgrid_id=microgrid_id,
152-
type="ECHO_FREQUENCY", # replace with your own type
153-
start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
154-
duration=timedelta(minutes=5),
155-
target=ComponentCategory.INVERTER,
156-
payload={"font": "Times New Roman"}, # Arbitrary payload data
157-
)
158-
159-
# Modify the dispatch
160-
await dispatcher.client.update(
161-
microgrid_id=microgrid_id,
162-
dispatch_id=new_dispatch.id,
163-
new_fields={"duration": timedelta(minutes=10)}
164-
)
165-
166-
# Validate the modification
167-
modified_dispatch = await dispatcher.client.get(
168-
microgrid_id=microgrid_id, dispatch_id=new_dispatch.id
169-
)
170-
assert modified_dispatch.duration == timedelta(minutes=10)
174+
key=key,
175+
) as dispatcher:
176+
# Create a new dispatch
177+
new_dispatch = await dispatcher.client.create(
178+
microgrid_id=microgrid_id,
179+
type="ECHO_FREQUENCY", # replace with your own type
180+
start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
181+
duration=timedelta(minutes=5),
182+
target=ComponentCategory.INVERTER,
183+
payload={"font": "Times New Roman"}, # Arbitrary payload data
184+
)
185+
186+
# Modify the dispatch
187+
await dispatcher.client.update(
188+
microgrid_id=microgrid_id,
189+
dispatch_id=new_dispatch.id,
190+
new_fields={"duration": timedelta(minutes=10)}
191+
)
192+
193+
# Validate the modification
194+
modified_dispatch = await dispatcher.client.get(
195+
microgrid_id=microgrid_id, dispatch_id=new_dispatch.id
196+
)
197+
assert modified_dispatch.duration == timedelta(minutes=10)
171198
```
172199
"""
173200

@@ -248,13 +275,27 @@ async def start_dispatching(
248275
) -> None:
249276
"""Manage actors for a given dispatch type.
250277
251-
Creates and manages an ActorDispatcher for the given type that will
278+
Creates and manages an
279+
[`ActorDispatcher`][frequenz.dispatch.ActorDispatcher] for the given type that will
252280
start, stop and reconfigure actors based on received dispatches.
253281
254282
You can await the `Dispatcher` instance to block until all types
255283
registered with `start_dispatching()` are stopped using
256284
`stop_dispatching()`
257285
286+
"Merging" means that when multiple dispatches are active at the same time,
287+
the intervals are merged into one.
288+
289+
This also decides how instances are mapped from dispatches to actors:
290+
291+
* [`MergeByType`][frequenz.dispatch.MergeByType] — All dispatches map to
292+
one single instance identified by the dispatch type.
293+
* [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — A
294+
dispatch maps to an instance identified by the dispatch type and target.
295+
So different dispatches with equal type and target will map to the same
296+
instance.
297+
* `None` — No merging, each dispatch maps to a separate instance.
298+
258299
Args:
259300
dispatch_type: The type of the dispatch to manage.
260301
actor_factory: The factory to create actors.

0 commit comments

Comments
 (0)