Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@ While the new TargetCategory class supports subtypes, only reading them is curre
* `TargetIds(ComponentIds(1), ComponentIds(2), ComponentIds(3))`
* `TargetCategories` can be used to specify one or more target categories:
* `TargetCategories(ComponentCategory.BATTERY, ComponentCategory.INVERTER)`
* `DispatchApiClient.stream()` now returns a streamer instead of a receiver. This allows you to use the new `new_receiver` method to create a receiver with more options, such as `max_size`, `warn_on_overflow` and `include_events`.

```python
client = DispatchApiClient(
key="key",
server_url="grpc://dispatch.url.goes.here.example.com"
)
receiver = client.stream(microgrid_id=1).new_receiver()

async for message in receiver:
print(message.event, message.dispatch)
```


## New Features

Expand Down
29 changes: 13 additions & 16 deletions src/frequenz/client/dispatch/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
UpdateMicrogridDispatchResponse,
)

from frequenz import channels
from frequenz.client.base.channel import ChannelOptions, SslOptions
from frequenz.client.base.client import BaseApiClient
from frequenz.client.base.conversion import to_timestamp
Expand Down Expand Up @@ -211,40 +210,38 @@ def to_interval(
else:
break

def stream(self, microgrid_id: int) -> channels.Receiver[DispatchEvent]:
def stream(
self, microgrid_id: int
) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]:
"""Receive a stream of dispatch events.

This function returns a receiver channel that can be used to receive
dispatch events.
This function returns a streamer that can be used to receive dispatch
events.
An event is one of [CREATE, UPDATE, DELETE].

Example usage:

```
```python
client = DispatchApiClient(
key="key",
server_url="grpc://dispatch.url.goes.here.example.com"
)
async for message in client.stream(microgrid_id=1):
receiver = client.stream(microgrid_id=1).new_receiver()

async for message in receiver:
print(message.event, message.dispatch)
```

Args:
microgrid_id: The microgrid_id to receive dispatches for.

Returns:
A receiver channel to receive the stream of dispatch events.
A broadcaster that can be used to receive dispatch events.
The broadcaster will automatically reconnect if the connection is lost.
It will also handle backoff and retry logic.
"""
return self._get_stream(microgrid_id).new_receiver()

def _get_stream(
self, microgrid_id: int
) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]:
"""Get an instance to the streaming helper."""
broadcaster = self._streams.get(microgrid_id)
# pylint: disable=protected-access
if broadcaster is not None and broadcaster._channel.is_closed:
# pylint: enable=protected-access
if broadcaster is not None and not broadcaster.is_running:
del self._streams[microgrid_id]
broadcaster = None
if broadcaster is None:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async def test_dispatch_stream(client: FakeClient, sample: Dispatch) -> None:
stream = client.stream(microgrid_id)

async def expect(dispatch: Dispatch, event: Event) -> None:
message = await stream.receive()
message = await stream.new_receiver().receive()
assert message.dispatch == dispatch
assert message.event == event

Expand Down
Loading