Skip to content

Commit 92028a8

Browse files
authored
Don't retry by default when the stream is exhausted (#91)
This change the default behaviour of `GrpcStreamBroadcaster` to not reconnect when the server closes the stream. This can be overridden by setting the `retry_on_exhausted_stream` parameter to `True`.
2 parents 93cd8df + 63d7425 commit 92028a8

File tree

3 files changed

+65
-2
lines changed

3 files changed

+65
-2
lines changed

RELEASE_NOTES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77
## Upgrading
88

99
- `GrpcStreamBroadcaster` now takes a `AsyncIterable` instead of a `AsyncIterator` as the `stream_method`. This is to match the type of streaming methods generated by `grpc`, so no conversion to an `AsyncIterator` is needed.
10+
11+
- `GrpcStreamBroadcaster` no longer tries to reconnect when a server closes a connection. This behaviour can be overridden by passing `retry_on_exhausted_stream=True` when constructing `GrpcStreamBroadcaster` instances.
12+
1013
- gRPC URLs don't have a default port anymore, unless a default is set via `ChannelOptions`. If you want to set a default port for URLs, please pass custom `ChannelOptions` as `defaults` to `parse_grpc_uri` or as `channel_defaults` to `BaseApiClient`.
14+
1115
* The `ExponentialBackoff` and `LinearBackoff` classes now require keyword arguments for their constructor. This change was made to make the classes easier to use and to avoid confusion with the order of the arguments.
1216

1317
## New Features

src/frequenz/client/base/streaming.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@
2727
class GrpcStreamBroadcaster(Generic[InputT, OutputT]):
2828
"""Helper class to handle grpc streaming methods."""
2929

30-
def __init__(
30+
def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments
3131
self,
3232
stream_name: str,
3333
stream_method: Callable[[], AsyncIterable[InputT]],
3434
transform: Callable[[InputT], OutputT],
3535
retry_strategy: retry.Strategy | None = None,
36+
retry_on_exhausted_stream: bool = False,
3637
):
3738
"""Initialize the streaming helper.
3839
@@ -43,13 +44,16 @@ def __init__(
4344
transform: A function to transform the input type to the output type.
4445
retry_strategy: The retry strategy to use, when the connection is lost. Defaults
4546
to retries every 3 seconds, with a jitter of 1 second, indefinitely.
47+
retry_on_exhausted_stream: Whether to retry when the stream is exhausted, i.e.
48+
when the server closes the stream. Defaults to False.
4649
"""
4750
self._stream_name = stream_name
4851
self._stream_method = stream_method
4952
self._transform = transform
5053
self._retry_strategy = (
5154
retry.LinearBackoff() if retry_strategy is None else retry_strategy.copy()
5255
)
56+
self._retry_on_exhausted_stream = retry_on_exhausted_stream
5357

5458
self._channel: channels.Broadcast[OutputT] = channels.Broadcast(
5559
name=f"GrpcStreamBroadcaster-{stream_name}"
@@ -67,6 +71,15 @@ def new_receiver(self, maxsize: int = 50) -> channels.Receiver[OutputT]:
6771
"""
6872
return self._channel.new_receiver(limit=maxsize)
6973

74+
@property
75+
def is_running(self) -> bool:
76+
"""Return whether the streaming helper is running.
77+
78+
Returns:
79+
Whether the streaming helper is running.
80+
"""
81+
return not self._task.done()
82+
7083
async def stop(self) -> None:
7184
"""Stop the streaming helper."""
7285
if self._task.done():
@@ -91,6 +104,12 @@ async def _run(self) -> None:
91104
await sender.send(self._transform(msg))
92105
except grpc.aio.AioRpcError as err:
93106
error = err
107+
if error is None and not self._retry_on_exhausted_stream:
108+
_logger.info(
109+
"%s: connection closed, stream exhausted", self._stream_name
110+
)
111+
await self._channel.close()
112+
break
94113
error_str = f"Error: {error}" if error else "Stream exhausted"
95114
interval = self._retry_strategy.next_interval()
96115
if interval is None:

tests/streaming/test_grpc_stream_broadcaster.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def no_retry() -> mock.MagicMock:
4040
async def ok_helper(
4141
no_retry: mock.MagicMock, # pylint: disable=redefined-outer-name
4242
receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name
43+
retry_on_exhausted_stream: bool,
4344
) -> AsyncIterator[streaming.GrpcStreamBroadcaster[int, str]]:
4445
"""Fixture for GrpcStreamBroadcaster."""
4546

@@ -55,6 +56,7 @@ async def asynciter(ready_event: asyncio.Event) -> AsyncIterator[int]:
5556
stream_method=lambda: asynciter(receiver_ready_event),
5657
transform=_transformer,
5758
retry_strategy=no_retry,
59+
retry_on_exhausted_stream=retry_on_exhausted_stream,
5860
)
5961
yield helper
6062
await helper.stop()
@@ -79,7 +81,8 @@ async def __anext__(self) -> int:
7981
return self._current
8082

8183

82-
async def test_streaming_success(
84+
@pytest.mark.parametrize("retry_on_exhausted_stream", [True])
85+
async def test_streaming_success_retry_on_exhausted(
8386
ok_helper: streaming.GrpcStreamBroadcaster[
8487
int, str
8588
], # pylint: disable=redefined-outer-name
@@ -113,6 +116,43 @@ async def test_streaming_success(
113116
]
114117

115118

119+
@pytest.mark.parametrize("retry_on_exhausted_stream", [False])
120+
async def test_streaming_success(
121+
ok_helper: streaming.GrpcStreamBroadcaster[
122+
int, str
123+
], # pylint: disable=redefined-outer-name
124+
no_retry: mock.MagicMock, # pylint: disable=redefined-outer-name
125+
receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name
126+
caplog: pytest.LogCaptureFixture,
127+
) -> None:
128+
"""Test streaming success."""
129+
caplog.set_level(logging.INFO)
130+
items: list[str] = []
131+
async with asyncio.timeout(1):
132+
receiver = ok_helper.new_receiver()
133+
receiver_ready_event.set()
134+
async for item in receiver:
135+
items.append(item)
136+
assert (
137+
no_retry.next_interval.call_count == 0
138+
), "next_interval should not be called when streaming is successful"
139+
140+
assert items == [
141+
"transformed_0",
142+
"transformed_1",
143+
"transformed_2",
144+
"transformed_3",
145+
"transformed_4",
146+
]
147+
assert caplog.record_tuples == [
148+
(
149+
"frequenz.client.base.streaming",
150+
logging.INFO,
151+
"test_helper: connection closed, stream exhausted",
152+
)
153+
]
154+
155+
116156
class _NamedMagicMock(mock.MagicMock):
117157
"""Mock with a name."""
118158

0 commit comments

Comments
 (0)