Skip to content

Commit 59efdb5

Browse files
authored
Improve streamer events (frequenz-floss#149)
This PR contains several improvements to streamer events: - **Remove `Event` suffix from stream events** (for shorter names) - **Make stream event dataclasses not keyword-only** (for more compact pattern matching) - **Add a `StreamFatalError` event** (for errors when the stream stops for good and can't be retried) - **Replace `StreamStopped` with `StreamRetrying`** (only sent when the stream is retrying, not when it stops for good) It also includes some minor improvements and fixes: - **Send some valid messages in `test_messages_on_retry`** - **Lower the retry interval to 0.0 in tests** - **Do a proper full comparison for events in tests** - **Fix example indentation and blank line** - **Fix typo** - **Improve release notes formatting**
2 parents 2b3baec + db56d5a commit 59efdb5

File tree

3 files changed

+96
-87
lines changed

3 files changed

+96
-87
lines changed

RELEASE_NOTES.md

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,18 @@
1313
* The streaming client now also sends state change events out. Usage example:
1414

1515
```python
16-
recv = streamer.new_receiver()
17-
18-
for msg in recv:
19-
match msg:
20-
case StreamStartedEvent():
21-
print("Stream started")
22-
case StreamStoppedEvent() as event:
23-
print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}")
24-
case int() as output:
25-
print(f"Received message: {output}")
16+
recv = streamer.new_receiver()
17+
18+
async for msg in recv:
19+
match msg:
20+
case StreamStarted():
21+
print("Stream started")
22+
case StreamRetrying(delay, error):
23+
print(f"Stream stopped and will retry in {delay}: {error or 'closed'}")
24+
case StreamFatalError(error):
25+
print(f"Stream will stop because of a fatal error: {error}")
26+
case int() as output:
27+
print(f"Received message: {output}")
2628
```
2729

2830
## Bug Fixes

src/frequenz/client/base/streaming.py

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -26,28 +26,38 @@
2626
"""The output type of the stream."""
2727

2828

29-
@dataclass(frozen=True, kw_only=True)
30-
class StreamStartedEvent:
29+
@dataclass(frozen=True)
30+
class StreamStarted:
3131
"""Event indicating that the stream has started."""
3232

3333

34-
@dataclass(frozen=True, kw_only=True)
35-
class StreamStoppedEvent:
34+
@dataclass(frozen=True)
35+
class StreamRetrying:
3636
"""Event indicating that the stream has stopped."""
3737

38-
retry_time: timedelta | None = None
39-
"""Time to wait before retrying the stream, if applicable."""
38+
delay: timedelta
39+
"""Time to wait before retrying to start the stream again."""
4040

4141
exception: Exception | None = None
42-
"""The exception that caused the stream to stop, if any."""
42+
"""The exception that caused the stream to stop, if any.
4343
44+
If `None`, the stream was stopped without an error, e.g. the server closed the
45+
stream.
46+
"""
47+
48+
49+
@dataclass(frozen=True)
50+
class StreamFatalError:
51+
"""Event indicating that the stream has stopped due to an unrecoverable error."""
52+
53+
exception: Exception
54+
"""The exception that caused the stream to stop."""
4455

45-
StreamEvent: TypeAlias = StreamStartedEvent | StreamStoppedEvent
56+
57+
StreamEvent: TypeAlias = StreamStarted | StreamRetrying | StreamFatalError
4658
"""Type alias for the events that can be sent over the stream."""
4759

4860

49-
# Ignore D412: "No blank lines allowed between a section header and its content"
50-
# flake8: noqa: D412
5161
class GrpcStreamBroadcaster(Generic[InputT, OutputT]):
5262
"""Helper class to handle grpc streaming methods.
5363
@@ -65,30 +75,31 @@ class GrpcStreamBroadcaster(Generic[InputT, OutputT]):
6575
state of the stream.
6676
6777
Example:
78+
```python
79+
from frequenz.client.base import GrpcStreamBroadcaster
80+
81+
def async_range() -> AsyncIterable[int]:
82+
yield from range(10)
83+
84+
streamer = GrpcStreamBroadcaster(
85+
stream_name="example_stream",
86+
stream_method=async_range,
87+
transform=lambda msg: msg,
88+
)
6889
69-
```python
70-
from frequenz.client.base import GrpcStreamBroadcaster
71-
72-
def async_range() -> AsyncIterable[int]:
73-
yield from range(10)
74-
75-
streamer = GrpcStreamBroadcaster(
76-
stream_name="example_stream",
77-
stream_method=async_range,
78-
transform=lambda msg: msg,
79-
)
80-
81-
recv = streamer.new_receiver()
82-
83-
for msg in recv:
84-
match msg:
85-
case StreamStartedEvent():
86-
print("Stream started")
87-
case StreamStoppedEvent() as event:
88-
print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}")
89-
case int() as output:
90-
print(f"Received message: {output}")
91-
```
90+
recv = streamer.new_receiver()
91+
92+
async for msg in recv:
93+
match msg:
94+
case StreamStarted():
95+
print("Stream started")
96+
case StreamRetrying(delay, error):
97+
print(f"Stream stopped and will retry in {delay}: {error or 'closed'}")
98+
case StreamFatalError(error):
99+
print(f"Stream will stop because of a fatal error: {error}")
100+
case int() as output:
101+
print(f"Received message: {output}")
102+
```
92103
"""
93104

94105
def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments
@@ -104,7 +115,7 @@ def __init__( # pylint: disable=too-many-arguments,too-many-positional-argument
104115
Args:
105116
stream_name: A name to identify the stream in the logs.
106117
stream_method: A function that returns the grpc stream. This function is
107-
called everytime the connection is lost and we want to retry.
118+
called every time the connection is lost and we want to retry.
108119
transform: A function to transform the input type to the output type.
109120
retry_strategy: The retry strategy to use, when the connection is lost. Defaults
110121
to retries every 3 seconds, with a jitter of 1 second, indefinitely.
@@ -171,27 +182,20 @@ async def _run(self) -> None:
171182
_logger.info("%s: starting to stream", self._stream_name)
172183
try:
173184
call = self._stream_method()
174-
await sender.send(StreamStartedEvent())
185+
await sender.send(StreamStarted())
175186
async for msg in call:
176187
await sender.send(self._transform(msg))
177188
except grpc.aio.AioRpcError as err:
178189
error = err
179190

180-
interval = self._retry_strategy.next_interval()
181-
182-
await sender.send(
183-
StreamStoppedEvent(
184-
retry_time=timedelta(seconds=interval) if interval else None,
185-
exception=error,
186-
)
187-
)
188-
189191
if error is None and not self._retry_on_exhausted_stream:
190192
_logger.info(
191193
"%s: connection closed, stream exhausted", self._stream_name
192194
)
193195
await self._channel.close()
194196
break
197+
198+
interval = self._retry_strategy.next_interval()
195199
error_str = f"Error: {error}" if error else "Stream exhausted"
196200
if interval is None:
197201
_logger.error(
@@ -200,6 +204,8 @@ async def _run(self) -> None:
200204
self._retry_strategy.get_progress(),
201205
error_str,
202206
)
207+
if error is not None:
208+
await sender.send(StreamFatalError(error))
203209
await self._channel.close()
204210
break
205211
_logger.warning(
@@ -209,4 +215,6 @@ async def _run(self) -> None:
209215
interval,
210216
error_str,
211217
)
218+
219+
await sender.send(StreamRetrying(timedelta(seconds=interval), error))
212220
await asyncio.sleep(interval)

tests/streaming/test_grpc_stream_broadcaster.py

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,17 @@
1010
from datetime import timedelta
1111
from unittest import mock
1212

13+
import grpc
1314
import grpc.aio
1415
import pytest
1516
from frequenz.channels import Receiver
1617

1718
from frequenz.client.base import retry, streaming
1819
from frequenz.client.base.streaming import (
1920
StreamEvent,
20-
StreamStartedEvent,
21-
StreamStoppedEvent,
21+
StreamFatalError,
22+
StreamRetrying,
23+
StreamStarted,
2224
)
2325

2426

@@ -43,12 +45,12 @@ def no_retry() -> mock.MagicMock:
4345
return mock_retry
4446

4547

46-
def mock_error() -> grpc.aio.AioRpcError:
48+
def make_error() -> grpc.aio.AioRpcError:
4749
"""Mock error for testing."""
4850
return grpc.aio.AioRpcError(
49-
code=mock.MagicMock(name="mock grpc code"),
50-
initial_metadata=mock.MagicMock(),
51-
trailing_metadata=mock.MagicMock(),
51+
code=grpc.StatusCode.UNAVAILABLE,
52+
initial_metadata=grpc.aio.Metadata(),
53+
trailing_metadata=grpc.aio.Metadata(),
5254
details="mock details",
5355
debug_error_string="mock debug_error_string",
5456
)
@@ -95,7 +97,7 @@ async def _split_message(
9597
events: list[StreamEvent] = []
9698
async for item in receiver:
9799
match item:
98-
case StreamStartedEvent() | StreamStoppedEvent() as item:
100+
case StreamStarted() | StreamRetrying() | StreamFatalError():
99101
events.append(item)
100102
case str():
101103
items.append(item)
@@ -147,9 +149,7 @@ async def test_streaming_success_retry_on_exhausted(
147149
"transformed_3",
148150
"transformed_4",
149151
]
150-
assert events == [
151-
StreamStoppedEvent(exception=None, retry_time=None),
152-
]
152+
assert events == []
153153

154154
assert caplog.record_tuples == [
155155
(
@@ -180,7 +180,7 @@ async def test_streaming_success(
180180
receiver_ready_event.set()
181181
items, events = await _split_message(receiver)
182182

183-
no_retry.next_interval.assert_called_once_with()
183+
no_retry.next_interval.assert_not_called()
184184

185185
assert items == [
186186
"transformed_0",
@@ -189,9 +189,7 @@ async def test_streaming_success(
189189
"transformed_3",
190190
"transformed_4",
191191
]
192-
assert events == [
193-
StreamStoppedEvent(exception=None, retry_time=None),
194-
]
192+
assert events == []
195193
assert caplog.record_tuples == [
196194
(
197195
"frequenz.client.base.streaming",
@@ -221,7 +219,7 @@ async def test_streaming_error( # pylint: disable=too-many-arguments
221219
"""Test streaming errors."""
222220
caplog.set_level(logging.INFO)
223221

224-
error = mock_error()
222+
error = make_error()
225223

226224
helper = streaming.GrpcStreamBroadcaster(
227225
stream_name="test_helper",
@@ -268,7 +266,7 @@ async def test_retry_next_interval_zero( # pylint: disable=too-many-arguments
268266
) -> None:
269267
"""Test retry logic when next_interval returns 0."""
270268
caplog.set_level(logging.WARNING)
271-
error = mock_error()
269+
error = make_error()
272270
mock_retry = mock.MagicMock(spec=retry.Strategy)
273271
mock_retry.next_interval.side_effect = [0, None]
274272
mock_retry.copy.return_value = mock_retry
@@ -310,17 +308,18 @@ async def test_messages_on_retry(
310308
receiver_ready_event: asyncio.Event, # pylint: disable=redefined-outer-name
311309
) -> None:
312310
"""Test that messages are sent on retry."""
311+
# We need to use one specific instance for the whole test because 2 errors created
312+
# with the same arguments don't compare equal (grpc.aio.AioRpcError doesn't seem to
313+
# provide an __eq__ method).
314+
error = make_error()
315+
313316
helper = streaming.GrpcStreamBroadcaster(
314317
stream_name="test_helper",
315318
stream_method=lambda: _ErroringAsyncIter(
316-
mock_error(),
317-
receiver_ready_event,
319+
error, receiver_ready_event, num_successes=2
318320
),
319321
transform=_transformer,
320-
retry_strategy=retry.LinearBackoff(
321-
limit=1,
322-
interval=0.01,
323-
),
322+
retry_strategy=retry.LinearBackoff(limit=1, interval=0.0, jitter=0.0),
324323
retry_on_exhausted_stream=True,
325324
)
326325

@@ -333,15 +332,15 @@ async def test_messages_on_retry(
333332
receiver_ready_event.set()
334333
items, events = await _split_message(receiver)
335334

336-
assert items == []
337-
assert [type(e) for e in events] == [
338-
type(e)
339-
for e in [
340-
StreamStartedEvent(),
341-
StreamStoppedEvent(
342-
exception=mock_error(), retry_time=timedelta(seconds=0.01)
343-
),
344-
StreamStartedEvent(),
345-
StreamStoppedEvent(exception=mock_error(), retry_time=None),
346-
]
335+
assert items == [
336+
"transformed_0",
337+
"transformed_1",
338+
"transformed_0",
339+
"transformed_1",
340+
]
341+
assert events == [
342+
StreamStarted(),
343+
StreamRetrying(timedelta(seconds=0.0), error),
344+
StreamStarted(),
345+
StreamFatalError(error),
347346
]

0 commit comments

Comments
 (0)