@@ -28,7 +28,10 @@ class Dispatcher(BackgroundService):
2828 """A highlevel interface for the dispatch API.
2929
3030 This class provides a highlevel interface to the dispatch API.
31- It provides two receiver functions:
31+ It provides receivers for various events and management of actors based on
32+ dispatches.
33+
34+ The receivers shortly explained:
3235
3336 * [Lifecycle events receiver][frequenz.dispatch.Dispatcher.new_lifecycle_events_receiver]:
3437 Receives an event whenever a dispatch is created, updated or deleted.
@@ -42,6 +45,36 @@ class Dispatcher(BackgroundService):
4245 Any change that could potentially require the consumer to start, stop or
4346 reconfigure itself will cause a message to be sent.
4447
48+ Example: Managing an actor
49+ ```python
50+ import os
51+ from frequenz.dispatch import Dispatcher, MergeByType
52+ from unittest.mock import MagicMock
53+
54+ async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor:
55+ return MagicMock(dispatch=dispatch, receiver=receiver)
56+
57+ async def run():
58+ url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
59+ key = os.getenv("DISPATCH_API_KEY", "some-key")
60+
61+ microgrid_id = 1
62+
63+ async with Dispatcher(
64+ microgrid_id=microgrid_id,
65+ server_url=url,
66+ key=key
67+ ) as dispatcher:
68+ dispatcher.start_dispatching(
69+ dispatch_type="DISPATCH_TYPE",
70+ actor_factory=create_actor,
71+ merge_strategy=MergeByType(),
72+ retry_interval=timedelta(seconds=60),
73+ )
74+
75+ await dispatcher
76+ ```
77+
4578 Example: Processing running state change dispatches
4679 ```python
4780 import os
@@ -54,38 +87,36 @@ async def run():
5487
5588 microgrid_id = 1
5689
57- dispatcher = Dispatcher(
90+ async with Dispatcher(
5891 microgrid_id=microgrid_id,
5992 server_url=url,
6093 key=key
61- )
62- await dispatcher.start()
63-
64- actor = MagicMock() # replace with your actor
65-
66- changed_running_status = dispatcher.new_running_state_event_receiver("DISPATCH_TYPE")
67-
68- async for dispatch in changed_running_status:
69- if dispatch.started:
70- print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
71- if actor.is_running:
72- actor.reconfigure(
73- components=dispatch.target,
74- run_parameters=dispatch.payload, # custom actor parameters
75- dry_run=dispatch.dry_run,
76- until=dispatch.until,
77- ) # this will reconfigure the actor
94+ ) as dispatcher:
95+ actor = MagicMock() # replace with your actor
96+
97+ rs_receiver = dispatcher.new_running_state_event_receiver("DISPATCH_TYPE")
98+
99+ async for dispatch in rs_receiver:
100+ if dispatch.started:
101+ print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
102+ if actor.is_running:
103+ actor.reconfigure(
104+ components=dispatch.target,
105+ run_parameters=dispatch.payload, # custom actor parameters
106+ dry_run=dispatch.dry_run,
107+ until=dispatch.until,
108+ ) # this will reconfigure the actor
109+ else:
110+ # this will start a new actor with the given components
111+ # and run it for the duration of the dispatch
112+ actor.start(
113+ components=dispatch.target,
114+ run_parameters=dispatch.payload, # custom actor parameters
115+ dry_run=dispatch.dry_run,
116+ until=dispatch.until,
117+ )
78118 else:
79- # this will start a new actor with the given components
80- # and run it for the duration of the dispatch
81- actor.start(
82- components=dispatch.target,
83- run_parameters=dispatch.payload, # custom actor parameters
84- dry_run=dispatch.dry_run,
85- until=dispatch.until,
86- )
87- else:
88- actor.stop() # this will stop the actor
119+ actor.stop() # this will stop the actor
89120 ```
90121
91122 Example: Getting notification about dispatch lifecycle events
@@ -101,25 +132,23 @@ async def run():
101132
102133 microgrid_id = 1
103134
104- dispatcher = Dispatcher(
135+ async with Dispatcher(
105136 microgrid_id=microgrid_id,
106137 server_url=url,
107- key=key
108- )
109- await dispatcher.start() # this will start the actor
110-
111- events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE")
112-
113- async for event in events_receiver:
114- match event:
115- case Created(dispatch):
116- print(f"A dispatch was created: {dispatch}")
117- case Deleted(dispatch):
118- print(f"A dispatch was deleted: {dispatch}")
119- case Updated(dispatch):
120- print(f"A dispatch was updated: {dispatch}")
121- case _ as unhandled:
122- assert_never(unhandled)
138+ key=key,
139+ ) as dispatcher:
140+ events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE")
141+
142+ async for event in events_receiver:
143+ match event:
144+ case Created(dispatch):
145+ print(f"A dispatch was created: {dispatch}")
146+ case Deleted(dispatch):
147+ print(f"A dispatch was deleted: {dispatch}")
148+ case Updated(dispatch):
149+ print(f"A dispatch was updated: {dispatch}")
150+ case _ as unhandled:
151+ assert_never(unhandled)
123152 ```
124153
125154 Example: Creating a new dispatch and then modifying it.
@@ -139,35 +168,33 @@ async def run():
139168
140169 microgrid_id = 1
141170
142- dispatcher = Dispatcher(
171+ async with Dispatcher(
143172 microgrid_id=microgrid_id,
144173 server_url=url,
145- key=key
146- )
147- await dispatcher.start() # this will start the actor
148-
149- # Create a new dispatch
150- new_dispatch = await dispatcher.client.create(
151- microgrid_id=microgrid_id,
152- type="ECHO_FREQUENCY", # replace with your own type
153- start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
154- duration=timedelta(minutes=5),
155- target=ComponentCategory.INVERTER,
156- payload={"font": "Times New Roman"}, # Arbitrary payload data
157- )
158-
159- # Modify the dispatch
160- await dispatcher.client.update(
161- microgrid_id=microgrid_id,
162- dispatch_id=new_dispatch.id,
163- new_fields={"duration": timedelta(minutes=10)}
164- )
165-
166- # Validate the modification
167- modified_dispatch = await dispatcher.client.get(
168- microgrid_id=microgrid_id, dispatch_id=new_dispatch.id
169- )
170- assert modified_dispatch.duration == timedelta(minutes=10)
174+ key=key,
175+ ) as dispatcher:
176+ # Create a new dispatch
177+ new_dispatch = await dispatcher.client.create(
178+ microgrid_id=microgrid_id,
179+ type="ECHO_FREQUENCY", # replace with your own type
180+ start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
181+ duration=timedelta(minutes=5),
182+ target=ComponentCategory.INVERTER,
183+ payload={"font": "Times New Roman"}, # Arbitrary payload data
184+ )
185+
186+ # Modify the dispatch
187+ await dispatcher.client.update(
188+ microgrid_id=microgrid_id,
189+ dispatch_id=new_dispatch.id,
190+ new_fields={"duration": timedelta(minutes=10)}
191+ )
192+
193+ # Validate the modification
194+ modified_dispatch = await dispatcher.client.get(
195+ microgrid_id=microgrid_id, dispatch_id=new_dispatch.id
196+ )
197+ assert modified_dispatch.duration == timedelta(minutes=10)
171198 ```
172199 """
173200
@@ -248,13 +275,27 @@ async def start_dispatching(
248275 ) -> None :
249276 """Manage actors for a given dispatch type.
250277
251- Creates and manages an ActorDispatcher for the given type that will
278+ Creates and manages an
279+ [`ActorDispatcher`][frequenz.dispatch.ActorDispatcher] for the given type that will
252280 start, stop and reconfigure actors based on received dispatches.
253281
254282 You can await the `Dispatcher` instance to block until all types
255283 registered with `start_dispatching()` are stopped using
256284 `stop_dispatching()`
257285
286+ "Merging" means that when multiple dispatches are active at the same time,
287+ the intervals are merged into one.
288+
289+ This also decides how instances are mapped from dispatches to actors:
290+
291+ * [`MergeByType`][frequenz.dispatch.MergeByType] — All dispatches map to
292+ one single instance identified by the dispatch type.
293+ * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — A
294+ dispatch maps to an instance identified by the dispatch type and target.
295+ So different dispatches with equal type and target will map to the same
296+ instance.
297+ * `None` — No merging, each dispatch maps to a separate instance.
298+
258299 Args:
259300 dispatch_type: The type of the dispatch to manage.
260301 actor_factory: The factory to create actors.
0 commit comments