From f8a6fabc09d7abfcedca21f66dbd797dcf76cbd9 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 28 May 2025 11:43:45 +0200 Subject: [PATCH 01/10] Fix example indentation and blank line "Example:" is a section, so it cannot have a black line after it. Also it should be properly indented to be rendered correctly. Signed-off-by: Leandro Lucarella --- src/frequenz/client/base/streaming.py | 47 +++++++++++++-------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/src/frequenz/client/base/streaming.py b/src/frequenz/client/base/streaming.py index 2e95bcd..36f0ffd 100644 --- a/src/frequenz/client/base/streaming.py +++ b/src/frequenz/client/base/streaming.py @@ -46,8 +46,6 @@ class StreamStoppedEvent: """Type alias for the events that can be sent over the stream.""" -# Ignore D412: "No blank lines allowed between a section header and its content" -# flake8: noqa: D412 class GrpcStreamBroadcaster(Generic[InputT, OutputT]): """Helper class to handle grpc streaming methods. @@ -65,30 +63,29 @@ class GrpcStreamBroadcaster(Generic[InputT, OutputT]): state of the stream. Example: + ```python + from frequenz.client.base import GrpcStreamBroadcaster - ```python - from frequenz.client.base import GrpcStreamBroadcaster - - def async_range() -> AsyncIterable[int]: - yield from range(10) - - streamer = GrpcStreamBroadcaster( - stream_name="example_stream", - stream_method=async_range, - transform=lambda msg: msg, - ) - - recv = streamer.new_receiver() - - for msg in recv: - match msg: - case StreamStartedEvent(): - print("Stream started") - case StreamStoppedEvent() as event: - print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}") - case int() as output: - print(f"Received message: {output}") - ``` + def async_range() -> AsyncIterable[int]: + yield from range(10) + + streamer = GrpcStreamBroadcaster( + stream_name="example_stream", + stream_method=async_range, + transform=lambda msg: msg, + ) + + recv = streamer.new_receiver() + + for msg in recv: + match msg: + case StreamStartedEvent(): + print("Stream started") + case StreamStoppedEvent() as event: + print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}") + case int() as output: + print(f"Received message: {output}") + ``` """ def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments From fed6f0c2b5cfaa9d58ec081db708f9a3a128e0b2 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 28 May 2025 14:23:02 +0200 Subject: [PATCH 02/10] Unindent example code in RELEASE_NOTES.md There is no need for the extra indentation inside the code block. Signed-off-by: Leandro Lucarella --- RELEASE_NOTES.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index b2bc245..7b881e4 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -13,16 +13,16 @@ * The streaming client now also sends state change events out. Usage example: ```python - recv = streamer.new_receiver() - - for msg in recv: - match msg: - case StreamStartedEvent(): - print("Stream started") - case StreamStoppedEvent() as event: - print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}") - case int() as output: - print(f"Received message: {output}") + recv = streamer.new_receiver() + + for msg in recv: + match msg: + case StreamStartedEvent(): + print("Stream started") + case StreamStoppedEvent() as event: + print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}") + case int() as output: + print(f"Received message: {output}") ``` ## Bug Fixes From 03a1932684d54e7966a6c5dac6487913f3ff43a7 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 28 May 2025 11:49:16 +0200 Subject: [PATCH 03/10] Remove `Event` suffix from stream events The suffix doesn't add a lot of extra clarity and make names too long. Signed-off-by: Leandro Lucarella --- RELEASE_NOTES.md | 4 ++-- src/frequenz/client/base/streaming.py | 14 +++++++------- .../streaming/test_grpc_stream_broadcaster.py | 18 +++++++++--------- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 7b881e4..72c6720 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -17,9 +17,9 @@ for msg in recv: match msg: - case StreamStartedEvent(): + case StreamStarted(): print("Stream started") - case StreamStoppedEvent() as event: + case StreamStopped() as event: print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}") case int() as output: print(f"Received message: {output}") diff --git a/src/frequenz/client/base/streaming.py b/src/frequenz/client/base/streaming.py index 36f0ffd..df7030d 100644 --- a/src/frequenz/client/base/streaming.py +++ b/src/frequenz/client/base/streaming.py @@ -27,12 +27,12 @@ @dataclass(frozen=True, kw_only=True) -class StreamStartedEvent: +class StreamStarted: """Event indicating that the stream has started.""" @dataclass(frozen=True, kw_only=True) -class StreamStoppedEvent: +class StreamStopped: """Event indicating that the stream has stopped.""" retry_time: timedelta | None = None @@ -42,7 +42,7 @@ class StreamStoppedEvent: """The exception that caused the stream to stop, if any.""" -StreamEvent: TypeAlias = StreamStartedEvent | StreamStoppedEvent +StreamEvent: TypeAlias = StreamStarted | StreamStopped """Type alias for the events that can be sent over the stream.""" @@ -79,9 +79,9 @@ def async_range() -> AsyncIterable[int]: for msg in recv: match msg: - case StreamStartedEvent(): + case StreamStarted(): print("Stream started") - case StreamStoppedEvent() as event: + case StreamStopped() as event: print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}") case int() as output: print(f"Received message: {output}") @@ -168,7 +168,7 @@ async def _run(self) -> None: _logger.info("%s: starting to stream", self._stream_name) try: call = self._stream_method() - await sender.send(StreamStartedEvent()) + await sender.send(StreamStarted()) async for msg in call: await sender.send(self._transform(msg)) except grpc.aio.AioRpcError as err: @@ -177,7 +177,7 @@ async def _run(self) -> None: interval = self._retry_strategy.next_interval() await sender.send( - StreamStoppedEvent( + StreamStopped( retry_time=timedelta(seconds=interval) if interval else None, exception=error, ) diff --git a/tests/streaming/test_grpc_stream_broadcaster.py b/tests/streaming/test_grpc_stream_broadcaster.py index a9a0bef..bdaa7ae 100644 --- a/tests/streaming/test_grpc_stream_broadcaster.py +++ b/tests/streaming/test_grpc_stream_broadcaster.py @@ -17,8 +17,8 @@ from frequenz.client.base import retry, streaming from frequenz.client.base.streaming import ( StreamEvent, - StreamStartedEvent, - StreamStoppedEvent, + StreamStarted, + StreamStopped, ) @@ -95,7 +95,7 @@ async def _split_message( events: list[StreamEvent] = [] async for item in receiver: match item: - case StreamStartedEvent() | StreamStoppedEvent() as item: + case StreamStarted() | StreamStopped() as item: events.append(item) case str(): items.append(item) @@ -148,7 +148,7 @@ async def test_streaming_success_retry_on_exhausted( "transformed_4", ] assert events == [ - StreamStoppedEvent(exception=None, retry_time=None), + StreamStopped(exception=None, retry_time=None), ] assert caplog.record_tuples == [ @@ -190,7 +190,7 @@ async def test_streaming_success( "transformed_4", ] assert events == [ - StreamStoppedEvent(exception=None, retry_time=None), + StreamStopped(exception=None, retry_time=None), ] assert caplog.record_tuples == [ ( @@ -337,11 +337,11 @@ async def test_messages_on_retry( assert [type(e) for e in events] == [ type(e) for e in [ - StreamStartedEvent(), - StreamStoppedEvent( + StreamStarted(), + StreamStopped( exception=mock_error(), retry_time=timedelta(seconds=0.01) ), - StreamStartedEvent(), - StreamStoppedEvent(exception=mock_error(), retry_time=None), + StreamStarted(), + StreamStopped(exception=mock_error(), retry_time=None), ] ] From f0de54ce845fd4cdac214c77c1d317683c5376ae Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 28 May 2025 11:54:29 +0200 Subject: [PATCH 04/10] Make stream event dataclasses not keyword-only Since all attributes have different types, it is not too error prone to allow using positional arguments, and this makes the dataclass automatically provides a `__match__` dunder method that allows for less verbose match syntax. We also update examples to use the match syntax to extra the event information more easily. Signed-off-by: Leandro Lucarella --- RELEASE_NOTES.md | 4 ++-- src/frequenz/client/base/streaming.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 72c6720..1391f61 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -19,8 +19,8 @@ match msg: case StreamStarted(): print("Stream started") - case StreamStopped() as event: - print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}") + case StreamStopped(delay, error): + print(f"Stream stopped, reason {error}, retry in {delay}") case int() as output: print(f"Received message: {output}") ``` diff --git a/src/frequenz/client/base/streaming.py b/src/frequenz/client/base/streaming.py index df7030d..7053699 100644 --- a/src/frequenz/client/base/streaming.py +++ b/src/frequenz/client/base/streaming.py @@ -26,12 +26,12 @@ """The output type of the stream.""" -@dataclass(frozen=True, kw_only=True) +@dataclass(frozen=True) class StreamStarted: """Event indicating that the stream has started.""" -@dataclass(frozen=True, kw_only=True) +@dataclass(frozen=True) class StreamStopped: """Event indicating that the stream has stopped.""" @@ -81,8 +81,8 @@ def async_range() -> AsyncIterable[int]: match msg: case StreamStarted(): print("Stream started") - case StreamStopped() as event: - print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}") + case StreamStopped(delay, error): + print(f"Stream stopped, reason {error}, retry in {delay}") case int() as output: print(f"Received message: {output}") ``` From 5ffc7c9efe7fbe6028e9d100ae16f8f8ef462383 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 28 May 2025 11:59:39 +0200 Subject: [PATCH 05/10] Fix typo Signed-off-by: Leandro Lucarella --- src/frequenz/client/base/streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frequenz/client/base/streaming.py b/src/frequenz/client/base/streaming.py index 7053699..cdaea01 100644 --- a/src/frequenz/client/base/streaming.py +++ b/src/frequenz/client/base/streaming.py @@ -101,7 +101,7 @@ def __init__( # pylint: disable=too-many-arguments,too-many-positional-argument Args: stream_name: A name to identify the stream in the logs. stream_method: A function that returns the grpc stream. This function is - called everytime the connection is lost and we want to retry. + called every time the connection is lost and we want to retry. transform: A function to transform the input type to the output type. retry_strategy: The retry strategy to use, when the connection is lost. Defaults to retries every 3 seconds, with a jitter of 1 second, indefinitely. From c40283b19a205561630cdf38392e376e19112773 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 28 May 2025 12:53:12 +0200 Subject: [PATCH 06/10] Add a `StreamFatalError` event This event is sent when the stream is about to stop for good due to an error (this event is NOT sent when the stream stops due to normal stream termination). Signed-off-by: Leandro Lucarella --- RELEASE_NOTES.md | 2 ++ src/frequenz/client/base/streaming.py | 14 +++++++++++++- tests/streaming/test_grpc_stream_broadcaster.py | 4 +++- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 1391f61..bfe19a4 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -21,6 +21,8 @@ print("Stream started") case StreamStopped(delay, error): print(f"Stream stopped, reason {error}, retry in {delay}") + case StreamFatalError(error): + print(f"Stream will stop because of a fatal error: {error}") case int() as output: print(f"Received message: {output}") ``` diff --git a/src/frequenz/client/base/streaming.py b/src/frequenz/client/base/streaming.py index cdaea01..c7f227a 100644 --- a/src/frequenz/client/base/streaming.py +++ b/src/frequenz/client/base/streaming.py @@ -42,7 +42,15 @@ class StreamStopped: """The exception that caused the stream to stop, if any.""" -StreamEvent: TypeAlias = StreamStarted | StreamStopped +@dataclass(frozen=True) +class StreamFatalError: + """Event indicating that the stream has stopped due to an unrecoverable error.""" + + exception: Exception + """The exception that caused the stream to stop.""" + + +StreamEvent: TypeAlias = StreamStarted | StreamStopped | StreamFatalError """Type alias for the events that can be sent over the stream.""" @@ -83,6 +91,8 @@ def async_range() -> AsyncIterable[int]: print("Stream started") case StreamStopped(delay, error): print(f"Stream stopped, reason {error}, retry in {delay}") + case StreamFatalError(error): + print(f"Stream will stop because of a fatal error: {error}") case int() as output: print(f"Received message: {output}") ``` @@ -197,6 +207,8 @@ async def _run(self) -> None: self._retry_strategy.get_progress(), error_str, ) + if error is not None: + await sender.send(StreamFatalError(error)) await self._channel.close() break _logger.warning( diff --git a/tests/streaming/test_grpc_stream_broadcaster.py b/tests/streaming/test_grpc_stream_broadcaster.py index bdaa7ae..94a7abc 100644 --- a/tests/streaming/test_grpc_stream_broadcaster.py +++ b/tests/streaming/test_grpc_stream_broadcaster.py @@ -17,6 +17,7 @@ from frequenz.client.base import retry, streaming from frequenz.client.base.streaming import ( StreamEvent, + StreamFatalError, StreamStarted, StreamStopped, ) @@ -95,7 +96,7 @@ async def _split_message( events: list[StreamEvent] = [] async for item in receiver: match item: - case StreamStarted() | StreamStopped() as item: + case StreamStarted() | StreamStopped() | StreamFatalError(): events.append(item) case str(): items.append(item) @@ -343,5 +344,6 @@ async def test_messages_on_retry( ), StreamStarted(), StreamStopped(exception=mock_error(), retry_time=None), + StreamFatalError(mock_error()), ] ] From b023e923a3dc03c18b3b3a8a7e4f5e4a2727bd70 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 28 May 2025 13:03:08 +0200 Subject: [PATCH 07/10] Do a proper full comparison for events in tests Only the types of the events were checked by the tests, not their state, which doesn't really validate everything works properly. We couldn't do a proper full comparison because we were using mocks inside the error object, and mocks have non-deterministic representations, as they have a unique ID. To fix this we just create a full gRPC error instance. This is not enough though, we also need to use a specific instance because 2 gRPC error created the same are not equal. Finally, we also need to set the retry strategy jitter to 0 so we can make sure we have a predictable retry_time. Signed-off-by: Leandro Lucarella --- .../streaming/test_grpc_stream_broadcaster.py | 42 +++++++++---------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/tests/streaming/test_grpc_stream_broadcaster.py b/tests/streaming/test_grpc_stream_broadcaster.py index 94a7abc..93d8bb5 100644 --- a/tests/streaming/test_grpc_stream_broadcaster.py +++ b/tests/streaming/test_grpc_stream_broadcaster.py @@ -10,6 +10,7 @@ from datetime import timedelta from unittest import mock +import grpc import grpc.aio import pytest from frequenz.channels import Receiver @@ -44,12 +45,12 @@ def no_retry() -> mock.MagicMock: return mock_retry -def mock_error() -> grpc.aio.AioRpcError: +def make_error() -> grpc.aio.AioRpcError: """Mock error for testing.""" return grpc.aio.AioRpcError( - code=mock.MagicMock(name="mock grpc code"), - initial_metadata=mock.MagicMock(), - trailing_metadata=mock.MagicMock(), + code=grpc.StatusCode.UNAVAILABLE, + initial_metadata=grpc.aio.Metadata(), + trailing_metadata=grpc.aio.Metadata(), details="mock details", debug_error_string="mock debug_error_string", ) @@ -222,7 +223,7 @@ async def test_streaming_error( # pylint: disable=too-many-arguments """Test streaming errors.""" caplog.set_level(logging.INFO) - error = mock_error() + error = make_error() helper = streaming.GrpcStreamBroadcaster( stream_name="test_helper", @@ -269,7 +270,7 @@ async def test_retry_next_interval_zero( # pylint: disable=too-many-arguments ) -> None: """Test retry logic when next_interval returns 0.""" caplog.set_level(logging.WARNING) - error = mock_error() + error = make_error() mock_retry = mock.MagicMock(spec=retry.Strategy) mock_retry.next_interval.side_effect = [0, None] mock_retry.copy.return_value = mock_retry @@ -311,17 +312,19 @@ async def test_messages_on_retry( receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name ) -> None: """Test that messages are sent on retry.""" + # We need to use a specific instance for all the test here because 2 errors created + # with the same arguments don't compare equal (grpc.aio.AioRpcError doesn't seem to + # provide a __eq__ method). + error = make_error() + helper = streaming.GrpcStreamBroadcaster( stream_name="test_helper", stream_method=lambda: _ErroringAsyncIter( - mock_error(), + error, receiver_ready_event, ), transform=_transformer, - retry_strategy=retry.LinearBackoff( - limit=1, - interval=0.01, - ), + retry_strategy=retry.LinearBackoff(limit=1, interval=0.01, jitter=0.0), retry_on_exhausted_stream=True, ) @@ -335,15 +338,10 @@ async def test_messages_on_retry( items, events = await _split_message(receiver) assert items == [] - assert [type(e) for e in events] == [ - type(e) - for e in [ - StreamStarted(), - StreamStopped( - exception=mock_error(), retry_time=timedelta(seconds=0.01) - ), - StreamStarted(), - StreamStopped(exception=mock_error(), retry_time=None), - StreamFatalError(mock_error()), - ] + assert events == [ + StreamStarted(), + StreamStopped(timedelta(seconds=0.01), error), + StreamStarted(), + StreamStopped(None, error), + StreamFatalError(error), ] From 4cc449c89b94773a6ec5bc2176296aa4f21f0969 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 28 May 2025 13:09:16 +0200 Subject: [PATCH 08/10] Lower the retry interval to 0.0 in tests This is just to speed up tests, we don't really need to wait. This also uncovered a bug in the creation of the `StreamStopped` instance, which was converting `0.0` to `None`. Signed-off-by: Leandro Lucarella --- src/frequenz/client/base/streaming.py | 4 +++- tests/streaming/test_grpc_stream_broadcaster.py | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/frequenz/client/base/streaming.py b/src/frequenz/client/base/streaming.py index c7f227a..f75f7dc 100644 --- a/src/frequenz/client/base/streaming.py +++ b/src/frequenz/client/base/streaming.py @@ -188,7 +188,9 @@ async def _run(self) -> None: await sender.send( StreamStopped( - retry_time=timedelta(seconds=interval) if interval else None, + retry_time=( + timedelta(seconds=interval) if interval is not None else None + ), exception=error, ) ) diff --git a/tests/streaming/test_grpc_stream_broadcaster.py b/tests/streaming/test_grpc_stream_broadcaster.py index 93d8bb5..a1888ab 100644 --- a/tests/streaming/test_grpc_stream_broadcaster.py +++ b/tests/streaming/test_grpc_stream_broadcaster.py @@ -324,7 +324,7 @@ async def test_messages_on_retry( receiver_ready_event, ), transform=_transformer, - retry_strategy=retry.LinearBackoff(limit=1, interval=0.01, jitter=0.0), + retry_strategy=retry.LinearBackoff(limit=1, interval=0.0, jitter=0.0), retry_on_exhausted_stream=True, ) @@ -340,7 +340,7 @@ async def test_messages_on_retry( assert items == [] assert events == [ StreamStarted(), - StreamStopped(timedelta(seconds=0.01), error), + StreamStopped(timedelta(seconds=0.0), error), StreamStarted(), StreamStopped(None, error), StreamFatalError(error), From b4b97fe8298bdd4d6dcc03959523e9e387225587 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 28 May 2025 13:13:49 +0200 Subject: [PATCH 09/10] Send some valid messages in `test_messages_on_retry` This also ensures the stream is really being restarted. Signed-off-by: Leandro Lucarella --- tests/streaming/test_grpc_stream_broadcaster.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/streaming/test_grpc_stream_broadcaster.py b/tests/streaming/test_grpc_stream_broadcaster.py index a1888ab..503cd2f 100644 --- a/tests/streaming/test_grpc_stream_broadcaster.py +++ b/tests/streaming/test_grpc_stream_broadcaster.py @@ -320,8 +320,7 @@ async def test_messages_on_retry( helper = streaming.GrpcStreamBroadcaster( stream_name="test_helper", stream_method=lambda: _ErroringAsyncIter( - error, - receiver_ready_event, + error, receiver_ready_event, num_successes=2 ), transform=_transformer, retry_strategy=retry.LinearBackoff(limit=1, interval=0.0, jitter=0.0), @@ -337,7 +336,12 @@ async def test_messages_on_retry( receiver_ready_event.set() items, events = await _split_message(receiver) - assert items == [] + assert items == [ + "transformed_0", + "transformed_1", + "transformed_0", + "transformed_1", + ] assert events == [ StreamStarted(), StreamStopped(timedelta(seconds=0.0), error), From db56d5a2ce5462e8e300e08281433a1b01b5f234 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 28 May 2025 13:18:26 +0200 Subject: [PATCH 10/10] Replace `StreamStopped` with `StreamRetrying` The new event is sent only when the stream will retry reconnecting instead of every time the stream stops, as this information is less important. When the stream stops, 2 things can happen, it is retried or it finishes. The new event covers the retry case, and the case when it finishes is covered by the receiver being closed. With the current approach, we also needed to consume from the retry strategy even if no retry was going to be attempted, so with this change we can now only consume from the retry strategy when we actually want to retry. Signed-off-by: Leandro Lucarella --- RELEASE_NOTES.md | 4 +-- src/frequenz/client/base/streaming.py | 33 +++++++++---------- .../streaming/test_grpc_stream_broadcaster.py | 17 ++++------ 3 files changed, 23 insertions(+), 31 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index bfe19a4..9a57d26 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -19,8 +19,8 @@ match msg: case StreamStarted(): print("Stream started") - case StreamStopped(delay, error): - print(f"Stream stopped, reason {error}, retry in {delay}") + case StreamRetrying(delay, error): + print(f"Stream stopped and will retry in {delay}: {error or 'closed'}") case StreamFatalError(error): print(f"Stream will stop because of a fatal error: {error}") case int() as output: diff --git a/src/frequenz/client/base/streaming.py b/src/frequenz/client/base/streaming.py index f75f7dc..8c5533d 100644 --- a/src/frequenz/client/base/streaming.py +++ b/src/frequenz/client/base/streaming.py @@ -32,14 +32,18 @@ class StreamStarted: @dataclass(frozen=True) -class StreamStopped: +class StreamRetrying: """Event indicating that the stream has stopped.""" - retry_time: timedelta | None = None - """Time to wait before retrying the stream, if applicable.""" + delay: timedelta + """Time to wait before retrying to start the stream again.""" exception: Exception | None = None - """The exception that caused the stream to stop, if any.""" + """The exception that caused the stream to stop, if any. + + If `None`, the stream was stopped without an error, e.g. the server closed the + stream. + """ @dataclass(frozen=True) @@ -50,7 +54,7 @@ class StreamFatalError: """The exception that caused the stream to stop.""" -StreamEvent: TypeAlias = StreamStarted | StreamStopped | StreamFatalError +StreamEvent: TypeAlias = StreamStarted | StreamRetrying | StreamFatalError """Type alias for the events that can be sent over the stream.""" @@ -89,8 +93,8 @@ def async_range() -> AsyncIterable[int]: match msg: case StreamStarted(): print("Stream started") - case StreamStopped(delay, error): - print(f"Stream stopped, reason {error}, retry in {delay}") + case StreamRetrying(delay, error): + print(f"Stream stopped and will retry in {delay}: {error or 'closed'}") case StreamFatalError(error): print(f"Stream will stop because of a fatal error: {error}") case int() as output: @@ -184,23 +188,14 @@ async def _run(self) -> None: except grpc.aio.AioRpcError as err: error = err - interval = self._retry_strategy.next_interval() - - await sender.send( - StreamStopped( - retry_time=( - timedelta(seconds=interval) if interval is not None else None - ), - exception=error, - ) - ) - if error is None and not self._retry_on_exhausted_stream: _logger.info( "%s: connection closed, stream exhausted", self._stream_name ) await self._channel.close() break + + interval = self._retry_strategy.next_interval() error_str = f"Error: {error}" if error else "Stream exhausted" if interval is None: _logger.error( @@ -220,4 +215,6 @@ async def _run(self) -> None: interval, error_str, ) + + await sender.send(StreamRetrying(timedelta(seconds=interval), error)) await asyncio.sleep(interval) diff --git a/tests/streaming/test_grpc_stream_broadcaster.py b/tests/streaming/test_grpc_stream_broadcaster.py index 503cd2f..b96fadf 100644 --- a/tests/streaming/test_grpc_stream_broadcaster.py +++ b/tests/streaming/test_grpc_stream_broadcaster.py @@ -19,8 +19,8 @@ from frequenz.client.base.streaming import ( StreamEvent, StreamFatalError, + StreamRetrying, StreamStarted, - StreamStopped, ) @@ -97,7 +97,7 @@ async def _split_message( events: list[StreamEvent] = [] async for item in receiver: match item: - case StreamStarted() | StreamStopped() | StreamFatalError(): + case StreamStarted() | StreamRetrying() | StreamFatalError(): events.append(item) case str(): items.append(item) @@ -149,9 +149,7 @@ async def test_streaming_success_retry_on_exhausted( "transformed_3", "transformed_4", ] - assert events == [ - StreamStopped(exception=None, retry_time=None), - ] + assert events == [] assert caplog.record_tuples == [ ( @@ -182,7 +180,7 @@ async def test_streaming_success( receiver_ready_event.set() items, events = await _split_message(receiver) - no_retry.next_interval.assert_called_once_with() + no_retry.next_interval.assert_not_called() assert items == [ "transformed_0", @@ -191,9 +189,7 @@ async def test_streaming_success( "transformed_3", "transformed_4", ] - assert events == [ - StreamStopped(exception=None, retry_time=None), - ] + assert events == [] assert caplog.record_tuples == [ ( "frequenz.client.base.streaming", @@ -344,8 +340,7 @@ async def test_messages_on_retry( ] assert events == [ StreamStarted(), - StreamStopped(timedelta(seconds=0.0), error), + StreamRetrying(timedelta(seconds=0.0), error), StreamStarted(), - StreamStopped(None, error), StreamFatalError(error), ]