Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,19 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* The streaming client now also sends state change events out. Usage example:
```python
recv = streamer.new_receiver()
Comment on lines +13 to +15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: For this to be better rendered it should be:

* The streaming client now also sends state change events out. Usage example:

    ```python
        recv = streamer.new_receiver()
        ...
    ```

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will create pr wtih fix


for msg in recv:
match msg:
case StreamStartedEvent():
print("Stream started")
case StreamStoppedEvent() as event:
print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}")
case int() as output:
print(f"Received message: {output}")
```

## Bug Fixes

Expand Down
88 changes: 78 additions & 10 deletions src/frequenz/client/base/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import asyncio
import logging
from collections.abc import Callable
from typing import AsyncIterable, Generic, TypeVar
from dataclasses import dataclass
from datetime import timedelta
from typing import AsyncIterable, Generic, TypeAlias, TypeVar

import grpc.aio

Expand All @@ -24,8 +26,70 @@
"""The output type of the stream."""


@dataclass(frozen=True, kw_only=True)
class StreamStartedEvent:
"""Event indicating that the stream has started."""


@dataclass(frozen=True, kw_only=True)
class StreamStoppedEvent:
"""Event indicating that the stream has stopped."""

retry_time: timedelta | None = None
"""Time to wait before retrying the stream, if applicable."""

exception: Exception | None = None
"""The exception that caused the stream to stop, if any."""


StreamEvent: TypeAlias = StreamStartedEvent | StreamStoppedEvent
"""Type alias for the events that can be sent over the stream."""


# Ignore D412: "No blank lines allowed between a section header and its content"
# flake8: noqa: D412
Comment on lines +49 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strange, are you sure you need this? I never seen it before except for function bodies.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think black was always adding the line and flake didn't like it?

class GrpcStreamBroadcaster(Generic[InputT, OutputT]):
"""Helper class to handle grpc streaming methods."""
"""Helper class to handle grpc streaming methods.

This class handles the grpc streaming methods, automatically reconnecting
when the connection is lost, and broadcasting the received messages to
multiple receivers.

The stream is started when the class is initialized, and can be stopped
with the `stop` method. New receivers can be created with the
`new_receiver` method, which will receive the streamed messages.

Additionally to the transformed messages, the broadcaster will also send
state change messages indicating whether the stream is connecting,
connected, or disconnected. These messages can be used to monitor the
state of the stream.

Example:

```python
from frequenz.client.base import GrpcStreamBroadcaster

def async_range() -> AsyncIterable[int]:
yield from range(10)

streamer = GrpcStreamBroadcaster(
stream_name="example_stream",
stream_method=async_range,
transform=lambda msg: msg,
)

recv = streamer.new_receiver()

for msg in recv:
match msg:
case StreamStartedEvent():
print("Stream started")
case StreamStoppedEvent() as event:
print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}")
case int() as output:
print(f"Received message: {output}")
```
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

"""

def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments
self,
Expand Down Expand Up @@ -55,14 +119,14 @@ def __init__( # pylint: disable=too-many-arguments,too-many-positional-argument
)
self._retry_on_exhausted_stream = retry_on_exhausted_stream

self._channel: channels.Broadcast[OutputT] = channels.Broadcast(
self._channel: channels.Broadcast[StreamEvent | OutputT] = channels.Broadcast(
name=f"GrpcStreamBroadcaster-{stream_name}"
)
self._task = asyncio.create_task(self._run())

def new_receiver(
self, maxsize: int = 50, warn_on_overflow: bool = True
) -> channels.Receiver[OutputT]:
) -> channels.Receiver[StreamEvent | OutputT]:
"""Create a new receiver for the stream.

Args:
Expand Down Expand Up @@ -107,24 +171,28 @@ async def _run(self) -> None:
_logger.info("%s: starting to stream", self._stream_name)
try:
call = self._stream_method()
await sender.send(StreamStartedEvent())
async for msg in call:
await sender.send(self._transform(msg))
except grpc.aio.AioRpcError as err:
error = err
except Exception as err: # pylint: disable=broad-except
_logger.exception(
"%s: raise an unexpected exception",
self._stream_name,

interval = self._retry_strategy.next_interval()

await sender.send(
StreamStoppedEvent(
retry_time=timedelta(seconds=interval) if interval else None,
exception=error,
)
error = err
)

if error is None and not self._retry_on_exhausted_stream:
_logger.info(
"%s: connection closed, stream exhausted", self._stream_name
)
await self._channel.close()
break
error_str = f"Error: {error}" if error else "Stream exhausted"
interval = self._retry_strategy.next_interval()
if interval is None:
_logger.error(
"%s: connection ended, retry limit exceeded (%s), giving up. %s.",
Expand Down
Loading
Loading