Skip to content

Commit c40283b

Browse files
committed
Add a StreamFatalError event
This event is sent when the stream is about to stop for good due to an error (this event is NOT sent when the stream stops due to normal stream termination). Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 5ffc7c9 commit c40283b

File tree

3 files changed

+18
-2
lines changed

3 files changed

+18
-2
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
print("Stream started")
2222
case StreamStopped(delay, error):
2323
print(f"Stream stopped, reason {error}, retry in {delay}")
24+
case StreamFatalError(error):
25+
print(f"Stream will stop because of a fatal error: {error}")
2426
case int() as output:
2527
print(f"Received message: {output}")
2628
```

src/frequenz/client/base/streaming.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,15 @@ class StreamStopped:
4242
"""The exception that caused the stream to stop, if any."""
4343

4444

45-
StreamEvent: TypeAlias = StreamStarted | StreamStopped
45+
@dataclass(frozen=True)
46+
class StreamFatalError:
47+
"""Event indicating that the stream has stopped due to an unrecoverable error."""
48+
49+
exception: Exception
50+
"""The exception that caused the stream to stop."""
51+
52+
53+
StreamEvent: TypeAlias = StreamStarted | StreamStopped | StreamFatalError
4654
"""Type alias for the events that can be sent over the stream."""
4755

4856

@@ -83,6 +91,8 @@ def async_range() -> AsyncIterable[int]:
8391
print("Stream started")
8492
case StreamStopped(delay, error):
8593
print(f"Stream stopped, reason {error}, retry in {delay}")
94+
case StreamFatalError(error):
95+
print(f"Stream will stop because of a fatal error: {error}")
8696
case int() as output:
8797
print(f"Received message: {output}")
8898
```
@@ -197,6 +207,8 @@ async def _run(self) -> None:
197207
self._retry_strategy.get_progress(),
198208
error_str,
199209
)
210+
if error is not None:
211+
await sender.send(StreamFatalError(error))
200212
await self._channel.close()
201213
break
202214
_logger.warning(

tests/streaming/test_grpc_stream_broadcaster.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from frequenz.client.base import retry, streaming
1818
from frequenz.client.base.streaming import (
1919
StreamEvent,
20+
StreamFatalError,
2021
StreamStarted,
2122
StreamStopped,
2223
)
@@ -95,7 +96,7 @@ async def _split_message(
9596
events: list[StreamEvent] = []
9697
async for item in receiver:
9798
match item:
98-
case StreamStarted() | StreamStopped() as item:
99+
case StreamStarted() | StreamStopped() | StreamFatalError():
99100
events.append(item)
100101
case str():
101102
items.append(item)
@@ -343,5 +344,6 @@ async def test_messages_on_retry(
343344
),
344345
StreamStarted(),
345346
StreamStopped(exception=mock_error(), retry_time=None),
347+
StreamFatalError(mock_error()),
346348
]
347349
]

0 commit comments

Comments
 (0)