Skip to content

Commit b4c1883

Browse files
committed
Don't retry by default when the stream is exhausted
When the stream is exhausted and the server closes the stream, we no longer automatically reconnect. But this can be changed by setting the `retry_on_exhausted_stream` parameter to `True`. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 93cd8df commit b4c1883

File tree

1 file changed

+10
-0
lines changed

1 file changed

+10
-0
lines changed

src/frequenz/client/base/streaming.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def __init__(
3333
stream_method: Callable[[], AsyncIterable[InputT]],
3434
transform: Callable[[InputT], OutputT],
3535
retry_strategy: retry.Strategy | None = None,
36+
retry_on_exhausted_stream: bool = False,
3637
):
3738
"""Initialize the streaming helper.
3839
@@ -43,13 +44,16 @@ def __init__(
4344
transform: A function to transform the input type to the output type.
4445
retry_strategy: The retry strategy to use, when the connection is lost. Defaults
4546
to retries every 3 seconds, with a jitter of 1 second, indefinitely.
47+
retry_on_exhausted_stream: Whether to retry when the stream is exhausted, i.e.
48+
when the server closes the stream. Defaults to False.
4649
"""
4750
self._stream_name = stream_name
4851
self._stream_method = stream_method
4952
self._transform = transform
5053
self._retry_strategy = (
5154
retry.LinearBackoff() if retry_strategy is None else retry_strategy.copy()
5255
)
56+
self._retry_on_exhausted_stream = retry_on_exhausted_stream
5357

5458
self._channel: channels.Broadcast[OutputT] = channels.Broadcast(
5559
name=f"GrpcStreamBroadcaster-{stream_name}"
@@ -91,6 +95,12 @@ async def _run(self) -> None:
9195
await sender.send(self._transform(msg))
9296
except grpc.aio.AioRpcError as err:
9397
error = err
98+
if error is None and not self._retry_on_exhausted_stream:
99+
_logger.info(
100+
"%s: connection closed, stream exhausted", self._stream_name
101+
)
102+
await self._channel.close()
103+
break
94104
error_str = f"Error: {error}" if error else "Stream exhausted"
95105
interval = self._retry_strategy.next_interval()
96106
if interval is None:

0 commit comments

Comments
 (0)