Skip to content

Commit 66c64a9

Browse files
authored
Client.stream: Raise exception on connection loss (#94)
2 parents aec339d + d5dbc25 commit 66c64a9

File tree

2 files changed

+8
-1
lines changed

2 files changed

+8
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
9+
* `Client.stream()` will now raise an Exception when the connection is lost.
1010

1111
## New Features
1212

src/frequenz/client/dispatch/_client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from frequenz.client.base.channel import ChannelOptions, SslOptions
3434
from frequenz.client.base.client import BaseApiClient
3535
from frequenz.client.base.conversion import to_timestamp
36+
from frequenz.client.base.retry import LinearBackoff
3637
from frequenz.client.base.streaming import GrpcStreamBroadcaster
3738

3839
from ._internal_types import DispatchCreateRequest
@@ -208,6 +209,11 @@ def _get_stream(
208209
) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]:
209210
"""Get an instance to the streaming helper."""
210211
broadcaster = self.streams.get(microgrid_id)
212+
# pylint: disable=protected-access
213+
if broadcaster is not None and broadcaster._channel.is_closed:
214+
# pylint: enable=protected-access
215+
del self.streams[microgrid_id]
216+
broadcaster = None
211217
if broadcaster is None:
212218
request = StreamMicrogridDispatchesRequest(microgrid_id=microgrid_id)
213219
broadcaster = GrpcStreamBroadcaster(
@@ -219,6 +225,7 @@ def _get_stream(
219225
),
220226
),
221227
transform=DispatchEvent.from_protobuf,
228+
retry_strategy=LinearBackoff(interval=1, limit=0),
222229
)
223230
self.streams[microgrid_id] = broadcaster
224231

0 commit comments

Comments
 (0)