Skip to content

Commit 1c73538

Browse files
committed
Client.stream(): Return a streamer instead of a receiver
This allows us to use all the options available in the streamer. Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent b83e6fa commit 1c73538

File tree

3 files changed

+24
-16
lines changed

3 files changed

+24
-16
lines changed

RELEASE_NOTES.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,19 @@ While the new TargetCategory class supports subtypes, only reading them is curre
1212
* `TargetIds(ComponentIds(1), ComponentIds(2), ComponentIds(3))`
1313
* `TargetCategories` can be used to specify one or more target categories:
1414
* `TargetCategories(ComponentCategory.BATTERY, ComponentCategory.INVERTER)`
15+
* `DispatchApiClient.stream()` now returns a streamer instead of a receiver. This allows you to use the new `new_receiver` method to create a receiver with more options, such as `max_size`, `warn_on_overflow` and `include_events`.
16+
17+
```python
18+
client = DispatchApiClient(
19+
key="key",
20+
server_url="grpc://dispatch.url.goes.here.example.com"
21+
)
22+
receiver = client.stream(microgrid_id=1).new_receiver()
23+
24+
async for message in receiver:
25+
print(message.event, message.dispatch)
26+
```
27+
1528

1629
## New Features
1730

src/frequenz/client/dispatch/_client.py

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
UpdateMicrogridDispatchResponse,
3030
)
3131

32-
from frequenz import channels
3332
from frequenz.client.base.channel import ChannelOptions, SslOptions
3433
from frequenz.client.base.client import BaseApiClient
3534
from frequenz.client.base.conversion import to_timestamp
@@ -211,21 +210,25 @@ def to_interval(
211210
else:
212211
break
213212

214-
def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]:
213+
def stream(
214+
self, microgrid_id: int
215+
) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]:
215216
"""Receive a stream of dispatch events.
216217
217-
This function returns a receiver channel that can be used to receive
218-
dispatch events.
218+
This function returns a streamer that can be used to receive dispatch
219+
events.
219220
An event is one of [CREATE, UPDATE, DELETE].
220221
221222
Example usage:
222223
223-
```
224+
```python
224225
client = DispatchApiClient(
225226
key="key",
226227
server_url="grpc://dispatch.url.goes.here.example.com"
227228
)
228-
async for message in client.stream(microgrid_id=1):
229+
receiver = client.stream(microgrid_id=1).new_receiver()
230+
231+
async for message in receiver:
229232
print(message.event, message.dispatch)
230233
```
231234
@@ -235,16 +238,8 @@ def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]:
235238
Returns:
236239
A receiver channel to receive the stream of dispatch events.
237240
"""
238-
return self._get_stream(microgrid_id).new_receiver()
239-
240-
def _get_stream(
241-
self, microgrid_id: int
242-
) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]:
243-
"""Get an instance to the streaming helper."""
244241
broadcaster = self._streams.get(microgrid_id)
245-
# pylint: disable=protected-access
246-
if broadcaster is not None and broadcaster._channel.is_closed:
247-
# pylint: enable=protected-access
242+
if broadcaster is not None and not broadcaster.is_running:
248243
del self._streams[microgrid_id]
249244
broadcaster = None
250245
if broadcaster is None:

tests/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ async def test_dispatch_stream(client: FakeClient, sample: Dispatch) -> None:
310310
stream = client.stream(microgrid_id)
311311

312312
async def expect(dispatch: Dispatch, event: Event) -> None:
313-
message = await stream.receive()
313+
message = await stream.new_receiver().receive()
314314
assert message.dispatch == dispatch
315315
assert message.event == event
316316

0 commit comments

Comments
 (0)