Skip to content

Commit 232b3f3

Browse files
authored
Implement streaming (#84)
2 parents 4ae5d7b + 5be697f commit 232b3f3

File tree

4 files changed

+112
-1
lines changed

4 files changed

+112
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ This release includes a new feature for pagination support in the dispatch list
1616
- `Client.__init__`:
1717
- Has a new parameter `connect` which is a boolean that determines if the client should connect to the server on initialization.
1818
- Automatically sets up the channel for encrypted TLS communication.
19+
- A new method `stream()` to receive dispatch events in real-time.

src/frequenz/client/dispatch/__main__.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,15 @@ async def list_(ctx: click.Context, /, **filters: Any) -> None:
105105
click.echo(f"{num_dispatches} dispatches total.")
106106

107107

108+
@cli.command("stream")
109+
@click.pass_context
110+
@click.argument("microgrid-id", required=True, type=int)
111+
async def stream(ctx: click.Context, microgrid_id: int) -> None:
112+
"""Stream dispatches."""
113+
async for message in ctx.obj["client"].stream(microgrid_id=microgrid_id):
114+
click.echo(pformat(message, compact=True))
115+
116+
108117
def parse_recurrence(kwargs: dict[str, Any]) -> RecurrenceRule | None:
109118
"""Parse recurrence rule from kwargs."""
110119
interval = kwargs.pop("interval", 0)
@@ -387,7 +396,16 @@ async def interactive_mode(url: str, key: str) -> None:
387396
hist_file = os.path.expanduser("~/.dispatch_cli_history.txt")
388397
session: PromptSession[str] = PromptSession(history=FileHistory(filename=hist_file))
389398

390-
user_commands = ["list", "create", "update", "get", "delete", "exit", "help"]
399+
user_commands = [
400+
"list",
401+
"stream",
402+
"create",
403+
"update",
404+
"get",
405+
"delete",
406+
"exit",
407+
"help",
408+
]
391409

392410
async def display_help() -> None:
393411
await cli.main(args=["--help"], standalone_mode=False)

src/frequenz/client/dispatch/_client.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
GetMicrogridDispatchResponse,
1919
ListMicrogridDispatchesRequest,
2020
ListMicrogridDispatchesResponse,
21+
StreamMicrogridDispatchesRequest,
22+
StreamMicrogridDispatchesResponse,
2123
)
2224
from frequenz.api.dispatch.v1.dispatch_pb2 import (
2325
TimeIntervalFilter as PBTimeIntervalFilter,
@@ -27,14 +29,17 @@
2729
UpdateMicrogridDispatchResponse,
2830
)
2931

32+
from frequenz import channels
3033
from frequenz.client.base.channel import ChannelOptions, SslOptions
3134
from frequenz.client.base.client import BaseApiClient
3235
from frequenz.client.base.conversion import to_timestamp
36+
from frequenz.client.base.streaming import GrpcStreamBroadcaster
3337

3438
from ._internal_types import DispatchCreateRequest
3539
from .types import (
3640
ComponentSelector,
3741
Dispatch,
42+
DispatchEvent,
3843
RecurrenceRule,
3944
component_selector_to_protobuf,
4045
)
@@ -46,6 +51,11 @@
4651
class Client(BaseApiClient[dispatch_pb2_grpc.MicrogridDispatchServiceStub]):
4752
"""Dispatch API client."""
4853

54+
streams: dict[
55+
int, GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]
56+
] = {}
57+
"""A dictionary of streamers, keyed by microgrid_id."""
58+
4959
def __init__(
5060
self,
5161
*,
@@ -170,6 +180,50 @@ def to_interval(
170180
else:
171181
break
172182

183+
def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]:
184+
"""Receive a stream of dispatch events.
185+
186+
This function returns a receiver channel that can be used to receive
187+
dispatch events.
188+
An event is one of [CREATE, UPDATE, DELETE].
189+
190+
Example usage:
191+
192+
```
193+
client = Client(key="key", server_url="grpc://fz-0004.frequenz.io")
194+
async for message in client.stream(microgrid_id=1):
195+
print(message.event, message.dispatch)
196+
```
197+
198+
Args:
199+
microgrid_id: The microgrid_id to receive dispatches for.
200+
201+
Returns:
202+
A receiver channel to receive the stream of dispatch events.
203+
"""
204+
return self._get_stream(microgrid_id).new_receiver()
205+
206+
def _get_stream(
207+
self, microgrid_id: int
208+
) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]:
209+
"""Get an instance to the streaming helper."""
210+
broadcaster = self.streams.get(microgrid_id)
211+
if broadcaster is None:
212+
request = StreamMicrogridDispatchesRequest(microgrid_id=microgrid_id)
213+
broadcaster = GrpcStreamBroadcaster(
214+
stream_name="StreamMicrogridDispatches",
215+
stream_method=lambda: cast(
216+
AsyncIterator[StreamMicrogridDispatchesResponse],
217+
self.stub.StreamMicrogridDispatches(
218+
request, metadata=self._metadata
219+
),
220+
),
221+
transform=DispatchEvent.from_protobuf,
222+
)
223+
self.streams[microgrid_id] = broadcaster
224+
225+
return broadcaster
226+
173227
async def create(
174228
self,
175229
microgrid_id: int,

src/frequenz/client/dispatch/types.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from frequenz.api.dispatch.v1.dispatch_pb2 import Dispatch as PBDispatch
1717
from frequenz.api.dispatch.v1.dispatch_pb2 import RecurrenceRule as PBRecurrenceRule
18+
from frequenz.api.dispatch.v1.dispatch_pb2 import StreamMicrogridDispatchesResponse
1819
from google.protobuf.json_format import MessageToDict
1920

2021
from frequenz.client.base.conversion import to_datetime, to_timestamp
@@ -348,3 +349,40 @@ def to_protobuf(self) -> PBDispatch:
348349
pb_dispatch.data.recurrence.CopyFrom(self.recurrence.to_protobuf())
349350

350351
return pb_dispatch
352+
353+
354+
class Event(IntEnum):
355+
"""Enum representing the type of event that occurred during a dispatch operation."""
356+
357+
UNSPECIFIED = StreamMicrogridDispatchesResponse.Event.EVENT_UNSPECIFIED
358+
CREATED = StreamMicrogridDispatchesResponse.Event.EVENT_CREATED
359+
UPDATED = StreamMicrogridDispatchesResponse.Event.EVENT_UPDATED
360+
DELETED = StreamMicrogridDispatchesResponse.Event.EVENT_DELETED
361+
362+
363+
@dataclass(kw_only=True, frozen=True)
364+
class DispatchEvent:
365+
"""Represents an event that occurred during a dispatch operation."""
366+
367+
dispatch: Dispatch
368+
"""The dispatch associated with the event."""
369+
370+
event: Event
371+
"""The type of event that occurred."""
372+
373+
@classmethod
374+
def from_protobuf(
375+
cls, pb_object: StreamMicrogridDispatchesResponse
376+
) -> "DispatchEvent":
377+
"""Convert a protobuf dispatch event to a dispatch event.
378+
379+
Args:
380+
pb_object: The protobuf dispatch event to convert.
381+
382+
Returns:
383+
The converted dispatch event.
384+
"""
385+
return DispatchEvent(
386+
dispatch=Dispatch.from_protobuf(pb_object.dispatch),
387+
event=Event(pb_object.event),
388+
)

0 commit comments

Comments
 (0)