@@ -28,7 +28,10 @@ class Dispatcher(BackgroundService):
2828 """A highlevel interface for the dispatch API.
2929
3030 This class provides a highlevel interface to the dispatch API.
31- It provides two receiver functions:
31+ It provides receivers for various events and management of actors based on
32+ dispatches.
33+
34+ The receivers shortly explained:
3235
3336 * [Lifecycle events receiver][frequenz.dispatch.Dispatcher.new_lifecycle_events_receiver]:
3437 Receives an event whenever a dispatch is created, updated or deleted.
@@ -42,6 +45,38 @@ class Dispatcher(BackgroundService):
4245 Any change that could potentially require the consumer to start, stop or
4346 reconfigure itself will cause a message to be sent.
4447
48+ Example: Managing an actor
49+ ```python
50+ import os
51+ from frequenz.dispatch import Dispatcher, MergeByType
52+ from unittest.mock import MagicMock
53+
54+ async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor:
55+ return MagicMock(dispatch=dispatch, receiver=receiver)
56+
57+ async def run():
58+ url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
59+ key = os.getenv("DISPATCH_API_KEY", "some-key")
60+
61+ microgrid_id = 1
62+
63+ dispatcher = Dispatcher(
64+ microgrid_id=microgrid_id,
65+ server_url=url,
66+ key=key,
67+ autostart=True
68+ )
69+
70+ dispatcher.start_dispatching(
71+ dispatch_type="DISPATCH_TYPE",
72+ actor_factory=create_actor,
73+ merge_strategy=MergeByType(),
74+ retry_interval=timedelta(seconds=60),
75+ )
76+
77+ await dispatcher
78+ ```
79+
4580 Example: Processing running state change dispatches
4681 ```python
4782 import os
@@ -57,9 +92,9 @@ async def run():
5792 dispatcher = Dispatcher(
5893 microgrid_id=microgrid_id,
5994 server_url=url,
60- key=key
95+ key=key,
96+ autostart=True
6197 )
62- await dispatcher.start()
6398
6499 actor = MagicMock() # replace with your actor
65100
@@ -104,9 +139,9 @@ async def run():
104139 dispatcher = Dispatcher(
105140 microgrid_id=microgrid_id,
106141 server_url=url,
107- key=key
142+ key=key,
143+ autostart=True
108144 )
109- await dispatcher.start() # this will start the actor
110145
111146 events_receiver = dispatcher.new_lifecycle_events_receiver("DISPATCH_TYPE")
112147
@@ -142,9 +177,9 @@ async def run():
142177 dispatcher = Dispatcher(
143178 microgrid_id=microgrid_id,
144179 server_url=url,
145- key=key
180+ key=key,
181+ autostart=True
146182 )
147- await dispatcher.start() # this will start the actor
148183
149184 # Create a new dispatch
150185 new_dispatch = await dispatcher.client.create(
@@ -250,13 +285,27 @@ async def start_dispatching(
250285 ) -> None :
251286 """Manage actors for a given dispatch type.
252287
253- Creates and manages an ActorDispatcher for the given type that will
288+ Creates and manages an
289+ [`ActorDispatcher`][frequenz.dispatch.ActorDispatcher] for the given type that will
254290 start, stop and reconfigure actors based on received dispatches.
255291
256292 You can await the `Dispatcher` instance to block until all types
257293 registered with `start_dispatching()` are stopped using
258294 `stop_dispatching()`
259295
296+ "Merging" means that when multiple dispatches are active at the same time,
297+ the intervals are merged into one.
298+
299+ This also decides how instances are mapped from dispatches to actors:
300+
301+ * [`MergeByType`][frequenz.dispatch.MergeByType] — All dispatches map to
302+ one single instance identified by the dispatch type.
303+ * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — A
304+ dispatch maps to an instance identified by the dispatch type and target.
305+ So different dispatches with equal type and target will map to the same
306+ instance.
307+ * `None` — No merging, each dispatch maps to a separate instance.
308+
260309 Args:
261310 dispatch_type: The type of the dispatch to manage.
262311 actor_factory: The factory to create actors.
0 commit comments