From d5dbc254fccfb78700de0770eca7fdb7698ec5ee Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Thu, 26 Sep 2024 13:48:11 +0200 Subject: [PATCH] Raise exception in stream() on connection loss Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 2 +- src/frequenz/client/dispatch/_client.py | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 676fcdfc..98e9160d 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,7 +6,7 @@ ## Upgrading - +* `Client.stream()` will now raise an Exception when the connection is lost. ## New Features diff --git a/src/frequenz/client/dispatch/_client.py b/src/frequenz/client/dispatch/_client.py index 5bac9cc4..8f24b788 100644 --- a/src/frequenz/client/dispatch/_client.py +++ b/src/frequenz/client/dispatch/_client.py @@ -33,6 +33,7 @@ from frequenz.client.base.channel import ChannelOptions, SslOptions from frequenz.client.base.client import BaseApiClient from frequenz.client.base.conversion import to_timestamp +from frequenz.client.base.retry import LinearBackoff from frequenz.client.base.streaming import GrpcStreamBroadcaster from ._internal_types import DispatchCreateRequest @@ -208,6 +209,11 @@ def _get_stream( ) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]: """Get an instance to the streaming helper.""" broadcaster = self.streams.get(microgrid_id) + # pylint: disable=protected-access + if broadcaster is not None and broadcaster._channel.is_closed: + # pylint: enable=protected-access + del self.streams[microgrid_id] + broadcaster = None if broadcaster is None: request = StreamMicrogridDispatchesRequest(microgrid_id=microgrid_id) broadcaster = GrpcStreamBroadcaster( @@ -219,6 +225,7 @@ def _get_stream( ), ), transform=DispatchEvent.from_protobuf, + retry_strategy=LinearBackoff(interval=1, limit=0), ) self.streams[microgrid_id] = broadcaster