Skip to content

Commit db56d5a

Browse files
committed
Replace StreamStopped with StreamRetrying
The new event is sent only when the stream will retry reconnecting instead of every time the stream stops, as this information is less important. When the stream stops, 2 things can happen, it is retried or it finishes. The new event covers the retry case, and the case when it finishes is covered by the receiver being closed. With the current approach, we also needed to consume from the retry strategy even if no retry was going to be attempted, so with this change we can now only consume from the retry strategy when we actually want to retry. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent b4b97fe commit db56d5a

File tree

3 files changed

+23
-31
lines changed

3 files changed

+23
-31
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
match msg:
2020
case StreamStarted():
2121
print("Stream started")
22-
case StreamStopped(delay, error):
23-
print(f"Stream stopped, reason {error}, retry in {delay}")
22+
case StreamRetrying(delay, error):
23+
print(f"Stream stopped and will retry in {delay}: {error or 'closed'}")
2424
case StreamFatalError(error):
2525
print(f"Stream will stop because of a fatal error: {error}")
2626
case int() as output:

src/frequenz/client/base/streaming.py

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,18 @@ class StreamStarted:
3232

3333

3434
@dataclass(frozen=True)
35-
class StreamStopped:
35+
class StreamRetrying:
3636
"""Event indicating that the stream has stopped."""
3737

38-
retry_time: timedelta | None = None
39-
"""Time to wait before retrying the stream, if applicable."""
38+
delay: timedelta
39+
"""Time to wait before retrying to start the stream again."""
4040

4141
exception: Exception | None = None
42-
"""The exception that caused the stream to stop, if any."""
42+
"""The exception that caused the stream to stop, if any.
43+
44+
If `None`, the stream was stopped without an error, e.g. the server closed the
45+
stream.
46+
"""
4347

4448

4549
@dataclass(frozen=True)
@@ -50,7 +54,7 @@ class StreamFatalError:
5054
"""The exception that caused the stream to stop."""
5155

5256

53-
StreamEvent: TypeAlias = StreamStarted | StreamStopped | StreamFatalError
57+
StreamEvent: TypeAlias = StreamStarted | StreamRetrying | StreamFatalError
5458
"""Type alias for the events that can be sent over the stream."""
5559

5660

@@ -89,8 +93,8 @@ def async_range() -> AsyncIterable[int]:
8993
match msg:
9094
case StreamStarted():
9195
print("Stream started")
92-
case StreamStopped(delay, error):
93-
print(f"Stream stopped, reason {error}, retry in {delay}")
96+
case StreamRetrying(delay, error):
97+
print(f"Stream stopped and will retry in {delay}: {error or 'closed'}")
9498
case StreamFatalError(error):
9599
print(f"Stream will stop because of a fatal error: {error}")
96100
case int() as output:
@@ -184,23 +188,14 @@ async def _run(self) -> None:
184188
except grpc.aio.AioRpcError as err:
185189
error = err
186190

187-
interval = self._retry_strategy.next_interval()
188-
189-
await sender.send(
190-
StreamStopped(
191-
retry_time=(
192-
timedelta(seconds=interval) if interval is not None else None
193-
),
194-
exception=error,
195-
)
196-
)
197-
198191
if error is None and not self._retry_on_exhausted_stream:
199192
_logger.info(
200193
"%s: connection closed, stream exhausted", self._stream_name
201194
)
202195
await self._channel.close()
203196
break
197+
198+
interval = self._retry_strategy.next_interval()
204199
error_str = f"Error: {error}" if error else "Stream exhausted"
205200
if interval is None:
206201
_logger.error(
@@ -220,4 +215,6 @@ async def _run(self) -> None:
220215
interval,
221216
error_str,
222217
)
218+
219+
await sender.send(StreamRetrying(timedelta(seconds=interval), error))
223220
await asyncio.sleep(interval)

tests/streaming/test_grpc_stream_broadcaster.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
from frequenz.client.base.streaming import (
2020
StreamEvent,
2121
StreamFatalError,
22+
StreamRetrying,
2223
StreamStarted,
23-
StreamStopped,
2424
)
2525

2626

@@ -97,7 +97,7 @@ async def _split_message(
9797
events: list[StreamEvent] = []
9898
async for item in receiver:
9999
match item:
100-
case StreamStarted() | StreamStopped() | StreamFatalError():
100+
case StreamStarted() | StreamRetrying() | StreamFatalError():
101101
events.append(item)
102102
case str():
103103
items.append(item)
@@ -149,9 +149,7 @@ async def test_streaming_success_retry_on_exhausted(
149149
"transformed_3",
150150
"transformed_4",
151151
]
152-
assert events == [
153-
StreamStopped(exception=None, retry_time=None),
154-
]
152+
assert events == []
155153

156154
assert caplog.record_tuples == [
157155
(
@@ -182,7 +180,7 @@ async def test_streaming_success(
182180
receiver_ready_event.set()
183181
items, events = await _split_message(receiver)
184182

185-
no_retry.next_interval.assert_called_once_with()
183+
no_retry.next_interval.assert_not_called()
186184

187185
assert items == [
188186
"transformed_0",
@@ -191,9 +189,7 @@ async def test_streaming_success(
191189
"transformed_3",
192190
"transformed_4",
193191
]
194-
assert events == [
195-
StreamStopped(exception=None, retry_time=None),
196-
]
192+
assert events == []
197193
assert caplog.record_tuples == [
198194
(
199195
"frequenz.client.base.streaming",
@@ -344,8 +340,7 @@ async def test_messages_on_retry(
344340
]
345341
assert events == [
346342
StreamStarted(),
347-
StreamStopped(timedelta(seconds=0.0), error),
343+
StreamRetrying(timedelta(seconds=0.0), error),
348344
StreamStarted(),
349-
StreamStopped(None, error),
350345
StreamFatalError(error),
351346
]

0 commit comments

Comments
 (0)