Skip to content

Commit 3e6ed34

Browse files
committed
Streamer: Write out events on the channels as well
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 1eafda5 commit 3e6ed34

File tree

3 files changed

+194
-51
lines changed

3 files changed

+194
-51
lines changed

RELEASE_NOTES.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,19 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
* The streaming client now also sends state change events out. Usage example:
14+
```python
15+
recv = streamer.new_receiver()
16+
17+
for msg in recv:
18+
match msg:
19+
case StreamStartedEvent():
20+
print("Stream started")
21+
case StreamStoppedEvent() as event:
22+
print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}")
23+
case int() as output:
24+
print(f"Received message: {output}")
25+
```
1426

1527
## Bug Fixes
1628

src/frequenz/client/base/streaming.py

Lines changed: 78 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import asyncio
77
import logging
88
from collections.abc import Callable
9-
from typing import AsyncIterable, Generic, TypeVar
9+
from dataclasses import dataclass
10+
from datetime import timedelta
11+
from typing import AsyncIterable, Generic, TypeAlias, TypeVar
1012

1113
import grpc.aio
1214

@@ -24,8 +26,70 @@
2426
"""The output type of the stream."""
2527

2628

29+
@dataclass(frozen=True, kw_only=True)
30+
class StreamStartedEvent:
31+
"""Event indicating that the stream has started."""
32+
33+
34+
@dataclass(frozen=True, kw_only=True)
35+
class StreamStoppedEvent:
36+
"""Event indicating that the stream has stopped."""
37+
38+
retry_time: timedelta | None = None
39+
"""Time to wait before retrying the stream, if applicable."""
40+
41+
exception: Exception | None = None
42+
"""The exception that caused the stream to stop, if any."""
43+
44+
45+
StreamEvent: TypeAlias = StreamStartedEvent | StreamStoppedEvent
46+
"""Type alias for the events that can be sent over the stream."""
47+
48+
49+
# Ignore D412: "No blank lines allowed between a section header and its content"
50+
# flake8: noqa: D412
2751
class GrpcStreamBroadcaster(Generic[InputT, OutputT]):
28-
"""Helper class to handle grpc streaming methods."""
52+
"""Helper class to handle grpc streaming methods.
53+
54+
This class handles the grpc streaming methods, automatically reconnecting
55+
when the connection is lost, and broadcasting the received messages to
56+
multiple receivers.
57+
58+
The stream is started when the class is initialized, and can be stopped
59+
with the `stop` method. New receivers can be created with the
60+
`new_receiver` method, which will receive the streamed messages.
61+
62+
Additionally to the transformed messages, the broadcaster will also send
63+
state change messages indicating whether the stream is connecting,
64+
connected, or disconnected. These messages can be used to monitor the
65+
state of the stream.
66+
67+
Example:
68+
69+
```python
70+
from frequenz.client.base import GrpcStreamBroadcaster
71+
72+
def async_range() -> AsyncIterable[int]:
73+
yield from range(10)
74+
75+
streamer = GrpcStreamBroadcaster(
76+
stream_name="example_stream",
77+
stream_method=async_range,
78+
transform=lambda msg: msg,
79+
)
80+
81+
recv = streamer.new_receiver()
82+
83+
for msg in recv:
84+
match msg:
85+
case StreamStartedEvent():
86+
print("Stream started")
87+
case StreamStoppedEvent() as event:
88+
print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}")
89+
case int() as output:
90+
print(f"Received message: {output}")
91+
```
92+
"""
2993

3094
def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments
3195
self,
@@ -55,14 +119,14 @@ def __init__( # pylint: disable=too-many-arguments,too-many-positional-argument
55119
)
56120
self._retry_on_exhausted_stream = retry_on_exhausted_stream
57121

58-
self._channel: channels.Broadcast[OutputT] = channels.Broadcast(
122+
self._channel: channels.Broadcast[StreamEvent | OutputT] = channels.Broadcast(
59123
name=f"GrpcStreamBroadcaster-{stream_name}"
60124
)
61125
self._task = asyncio.create_task(self._run())
62126

63127
def new_receiver(
64128
self, maxsize: int = 50, warn_on_overflow: bool = True
65-
) -> channels.Receiver[OutputT]:
129+
) -> channels.Receiver[StreamEvent | OutputT]:
66130
"""Create a new receiver for the stream.
67131
68132
Args:
@@ -107,24 +171,28 @@ async def _run(self) -> None:
107171
_logger.info("%s: starting to stream", self._stream_name)
108172
try:
109173
call = self._stream_method()
174+
await sender.send(StreamStartedEvent())
110175
async for msg in call:
111176
await sender.send(self._transform(msg))
112177
except grpc.aio.AioRpcError as err:
113178
error = err
114-
except Exception as err: # pylint: disable=broad-except
115-
_logger.exception(
116-
"%s: raise an unexpected exception",
117-
self._stream_name,
179+
180+
interval = self._retry_strategy.next_interval()
181+
182+
await sender.send(
183+
StreamStoppedEvent(
184+
retry_time=timedelta(seconds=interval) if interval else None,
185+
exception=error,
118186
)
119-
error = err
187+
)
188+
120189
if error is None and not self._retry_on_exhausted_stream:
121190
_logger.info(
122191
"%s: connection closed, stream exhausted", self._stream_name
123192
)
124193
await self._channel.close()
125194
break
126195
error_str = f"Error: {error}" if error else "Stream exhausted"
127-
interval = self._retry_strategy.next_interval()
128196
if interval is None:
129197
_logger.error(
130198
"%s: connection ended, retry limit exceeded (%s), giving up. %s.",

0 commit comments

Comments
 (0)