@@ -51,47 +51,61 @@ class Dispatcher:
5151
5252 Example: Processing running state change dispatches
5353 ```python
54+ import os
5455 import grpc.aio
5556 from unittest.mock import MagicMock
5657
5758 async def run():
58- grpc_channel = grpc.aio.insecure_channel("localhost:50051")
59- microgrid_id = 1
60- service_address = "localhost:50051"
59+ host = os.getenv("DISPATCH_API_HOST", "localhost")
60+ port = os.getenv("DISPATCH_API_PORT", "50051")
6161
62+ service_address = f"{host}:{port}"
63+ grpc_channel = grpc.aio.insecure_channel(service_address)
64+ microgrid_id = 1
6265 dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
66+ await dispatcher.start()
67+
6368 actor = MagicMock() # replace with your actor
6469
6570 changed_running_status_rx = dispatcher.running_status_change.new_receiver()
6671
6772 async for dispatch in changed_running_status_rx:
73+ if dispatch.type != "DEMO_TYPE":
74+ continue
75+
6876 print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
6977 if dispatch.running:
7078 if actor.is_running:
7179 actor.reconfigure(
72- components=dispatch.selector,
73- run_parameters=dispatch.payload
7480 ) # this will reconfigure the actor
7581 else:
76- # this will start the actor
82+ # this will start a new or reconfigure a running actor
7783 # and run it for the duration of the dispatch
78- actor.start(duration=dispatch.duration, dry_run=dispatch.dry_run)
84+ actor.start_or_reconfigure(
85+ components=dispatch.selector,
86+ run_parameters=dispatch.payload, # custom actor parameters
87+ dry_run=dispatch.dry_run,
88+ until=dispatch.until,
89+ )
7990 else:
8091 actor.stop() # this will stop the actor
8192 ```
8293
8394 Example: Getting notification about dispatch lifecycle events
8495 ```python
96+ import os
8597 from typing import assert_never
8698
8799 import grpc.aio
88100 from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
89101
90-
91102 async def run():
92- grpc_channel = grpc.aio.insecure_channel("localhost:50051")
103+ host = os.getenv("DISPATCH_API_HOST", "localhost")
104+ port = os.getenv("DISPATCH_API_PORT", "50051")
105+
106+ service_address = f"{host}:{port}"
107+ grpc_channel = grpc.aio.insecure_channel(service_address)
93108 microgrid_id = 1
94- service_address = "localhost:50051"
95109 dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
96110 dispatcher.start() # this will start the actor
97111
@@ -108,6 +122,46 @@ async def run():
108122 case _ as unhandled:
109123 assert_never(unhandled)
110124 ```
125+ Example: Creating a new dispatch and then modifying it. Note that this uses
126+ the lower-level `Client` class to create and update the dispatch.
127+ ```python
128+ import os
129+ from datetime import datetime, timedelta, timezone
130+
131+ import grpc.aio
132+ from frequenz.client.common.microgrid.components import ComponentCategory
133+
134+ from frequenz.dispatch import Dispatcher
135+
136+ async def run():
137+ host = os.getenv("DISPATCH_API_HOST", "localhost")
138+ port = os.getenv("DISPATCH_API_PORT", "50051")
139+
140+ service_address = f"{host}:{port}"
141+ grpc_channel = grpc.aio.insecure_channel(service_address)
142+ microgrid_id = 1
143+ dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
144+ await dispatcher.start() # this will start the actor
145+
146+ # Create a new dispatch
147+ new_dispatch = await dispatcher.client.create(
148+ microgrid_id=microgrid_id,
149+ _type="ECHO_FREQUENCY", # replace with your own type
150+ start_time=datetime.now(tz=timezone.utc) + timedelta(minutes=10),
151+ duration=timedelta(minutes=5),
152+ selector=ComponentCategory.INVERTER,
153+ payload={"font": "Times New Roman"}, # Arbitrary payload data
154+ )
155+
156+ # Modify the dispatch
157+ await dispatcher.client.update(
158+ dispatch_id=new_dispatch.id, new_fields={"duration": timedelta(minutes=10)}
159+ )
160+
161+ # Validate the modification
162+ modified_dispatch = await dispatcher.client.get(new_dispatch.id)
163+ assert modified_dispatch.duration == timedelta(minutes=10)
164+ ```
111165 """
112166
113167 def __init__ (
0 commit comments