From 7957bef7e22253e5a56cc78d9870c8842afd2b60 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Thu, 5 Jun 2025 10:34:09 +0200 Subject: [PATCH] Client.stream(): Return a streamer instead of a receiver This allows us to use all the options available in the streamer. Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 13 +++++++++++ src/frequenz/client/dispatch/_client.py | 29 +++++++++++-------------- tests/test_client.py | 2 +- 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index bfe26b86..0f316eb2 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -12,6 +12,19 @@ While the new TargetCategory class supports subtypes, only reading them is curre * `TargetIds(ComponentIds(1), ComponentIds(2), ComponentIds(3))` * `TargetCategories` can be used to specify one or more target categories: * `TargetCategories(ComponentCategory.BATTERY, ComponentCategory.INVERTER)` +* `DispatchApiClient.stream()` now returns a streamer instead of a receiver. This allows you to use the new `new_receiver` method to create a receiver with more options, such as `max_size`, `warn_on_overflow` and `include_events`. + + ```python + client = DispatchApiClient( + key="key", + server_url="grpc://dispatch.url.goes.here.example.com" + ) + receiver = client.stream(microgrid_id=1).new_receiver() + + async for message in receiver: + print(message.event, message.dispatch) + ``` + ## New Features diff --git a/src/frequenz/client/dispatch/_client.py b/src/frequenz/client/dispatch/_client.py index ab6fd8af..2cf095aa 100644 --- a/src/frequenz/client/dispatch/_client.py +++ b/src/frequenz/client/dispatch/_client.py @@ -29,7 +29,6 @@ UpdateMicrogridDispatchResponse, ) -from frequenz import channels from frequenz.client.base.channel import ChannelOptions, SslOptions from frequenz.client.base.client import BaseApiClient from frequenz.client.base.conversion import to_timestamp @@ -211,21 +210,25 @@ def to_interval( else: break - def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]: + def stream( + self, microgrid_id: int + ) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]: """Receive a stream of dispatch events. - This function returns a receiver channel that can be used to receive - dispatch events. + This function returns a streamer that can be used to receive dispatch + events. An event is one of [CREATE, UPDATE, DELETE]. Example usage: - ``` + ```python client = DispatchApiClient( key="key", server_url="grpc://dispatch.url.goes.here.example.com" ) - async for message in client.stream(microgrid_id=1): + receiver = client.stream(microgrid_id=1).new_receiver() + + async for message in receiver: print(message.event, message.dispatch) ``` @@ -233,18 +236,12 @@ def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]: microgrid_id: The microgrid_id to receive dispatches for. Returns: - A receiver channel to receive the stream of dispatch events. + A broadcaster that can be used to receive dispatch events. + The broadcaster will automatically reconnect if the connection is lost. + It will also handle backoff and retry logic. """ - return self._get_stream(microgrid_id).new_receiver() - - def _get_stream( - self, microgrid_id: int - ) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]: - """Get an instance to the streaming helper.""" broadcaster = self._streams.get(microgrid_id) - # pylint: disable=protected-access - if broadcaster is not None and broadcaster._channel.is_closed: - # pylint: enable=protected-access + if broadcaster is not None and not broadcaster.is_running: del self._streams[microgrid_id] broadcaster = None if broadcaster is None: diff --git a/tests/test_client.py b/tests/test_client.py index 8ee16eab..ba1189d4 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -310,7 +310,7 @@ async def test_dispatch_stream(client: FakeClient, sample: Dispatch) -> None: stream = client.stream(microgrid_id) async def expect(dispatch: Dispatch, event: Event) -> None: - message = await stream.receive() + message = await stream.new_receiver().receive() assert message.dispatch == dispatch assert message.event == event