Merged
39 changes: 6 additions & 33 deletions RELEASE_NOTES.md
@@ -1,42 +1,15 @@
# Frequenz Client Base Library Release Notes

## Summary

<!-- Here goes a general summary of what this release is about -->

## Upgrading

* Updated the interface and behavior for HMAC authentication.

This introduces a new positional argument to `parse_grpc_uri`.
If you call this function manually and pass `ChannelOptions`, it is recommended
to switch to passing `ChannelOptions` as a keyword argument.
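The risk with positional arguments can be sketched with stand-in functions (the signatures below are illustrative, not the real frequenz API): when a new positional parameter is inserted before an optional one, positional call sites silently shift, while keyword call sites keep working.

```python
def parse_v1(uri, channel_options=None):
    # old signature: options are the second parameter
    return (uri, None, channel_options)

def parse_v2(uri, auth_key=None, channel_options=None):
    # new signature: a positional argument was inserted before the options
    return (uri, auth_key, channel_options)

opts = {"ssl": True}

# Positional call: in v2 the options silently land in the new `auth_key` slot.
assert parse_v2("grpc://host", opts) == ("grpc://host", opts, None)

# Keyword call: unaffected by the inserted parameter.
assert parse_v1("grpc://host", channel_options=opts) == ("grpc://host", None, opts)
assert parse_v2("grpc://host", channel_options=opts) == ("grpc://host", None, opts)
```
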

* All parameters of the Streamers `new_receiver` method are now keyword-only arguments. This means that you must specify them by name when calling the method, e.g.:
* There is a very minor breaking change in this release: `GrpcStreamBroadcaster` now requires a `grpc.aio.UnaryStreamCall` instead of an `AsyncIterable` for the `stream_method` argument. In practice, all users should already be passing a `grpc.aio.UnaryStreamCall`, so this should not affect anyone unless they are doing something very unusual.

```python
recv = streamer.new_receiver(max_size=50, warn_on_overflow=True)
```

## New Features

* The streaming client, when using `new_receiver(include_events=True)`, will now return a receiver that yields stream notification events, such as `StreamStarted`, `StreamRetrying`, and `StreamFatalError`. This allows you to monitor the state of the stream:
## Bug Fixes

```python
recv = streamer.new_receiver(include_events=True)
* `GrpcStreamBroadcaster`: Fix potentially long delays on retries, and giving up too early when the number of retries is limited.

for msg in recv:
match msg:
case StreamStarted():
print("Stream started")
case StreamRetrying(delay, error):
print(f"Stream stopped and will retry in {delay}: {error or 'closed'}")
case StreamFatalError(error):
print(f"Stream will stop because of a fatal error: {error}")
case int() as output:
print(f"Received message: {output}")
```
The retry strategy was not reset after a successful start of the stream, so back-off delays would accumulate across retries, and if the number of retries was limited, the broadcaster would eventually give up even when a successful start had happened in between. The retry strategy is now reset after each successful start of the stream (i.e. after the first item is successfully received).
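A minimal sketch of the corrected behavior (stand-in classes, not the real implementation): the back-off state is reset once a stream start succeeds, so earlier failures no longer count against later ones.

```python
class LinearBackoff:
    """Toy retry strategy: delays grow linearly, with a retry limit."""

    def __init__(self, step=1.0, limit=3):
        self.step = step
        self.limit = limit
        self.attempts = 0

    def next_interval(self):
        """Return the next delay, or None when retries are exhausted."""
        self.attempts += 1
        if self.attempts > self.limit:
            return None
        return self.step * self.attempts

    def reset(self):
        self.attempts = 0

def run(stream_attempts, strategy):
    """Each attempt is a list of items; an empty list means the attempt
    failed before the first item arrived."""
    delays = []
    for items in stream_attempts:
        first_received = False
        for _ in items:
            first_received = True
        if first_received:
            strategy.reset()  # the fix: a successful start wipes the back-off
        interval = strategy.next_interval()
        if interval is None:
            break
        delays.append(interval)
    return delays

# Two failures, a success, then two more failures: the delays restart at
# 1.0 after the successful attempt instead of accumulating toward the limit.
assert run([[], [], ["item"], [], []], LinearBackoff()) == [1.0, 2.0, 1.0, 2.0, 3.0]
```

Without the `reset()` call, the fifth attempt would exceed the retry limit and the broadcaster would give up despite the successful attempt in between.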

## Bug Fixes
* `GrpcStreamBroadcaster`: Fix `StreamStarted` event firing too soon.

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
The `StreamStarted` event was fired as soon as the streaming method was called, but that does not mean a streaming connection was actually established with the server, which could give the false impression that the stream is active and working. Now we wait until the initial metadata is received from the server before firing the `StreamStarted` event. This gives users a better indication that the stream is actually active, without having to wait for the first item to be received, which can take a long time for low-frequency streams.
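The new ordering can be sketched with a stand-in call object (assumed shapes, not the real `grpc.aio` classes): the started event is only emitted after `initial_metadata()` resolves, which is the earliest server-side confirmation that the call is live.

```python
import asyncio

class FakeStreamCall:
    """Mimics the two parts of grpc.aio.UnaryStreamCall used here."""

    async def initial_metadata(self):
        await asyncio.sleep(0)  # the server has answered at this point
        return {"server": "ok"}

    def __aiter__(self):
        async def gen():
            yield 1
            yield 2
        return gen()

async def consume(call):
    events = []
    await call.initial_metadata()   # wait for server confirmation first...
    events.append("StreamStarted")  # ...then report the stream as started
    async for msg in call:
        events.append(msg)
    return events

assert asyncio.run(consume(FakeStreamCall())) == ["StreamStarted", 1, 2]
```
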
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -26,7 +26,7 @@ classifiers = [
]
requires-python = ">= 3.11, < 4"
dependencies = [
"frequenz-channels >= 1.6.1, < 2",
"frequenz-channels >= 1.8.0, < 2",
"grpcio >= 1.59, < 2",
"protobuf >= 5.29.2, < 7",
"typing-extensions >= 4.6.0, < 5",
55 changes: 30 additions & 25 deletions src/frequenz/client/base/streaming.py
@@ -8,7 +8,7 @@
from collections.abc import Callable
from dataclasses import dataclass
from datetime import timedelta
from typing import AsyncIterable, Generic, Literal, TypeAlias, TypeVar, overload
from typing import Generic, Literal, TypeAlias, TypeVar, overload

import grpc.aio

@@ -19,6 +19,10 @@
_logger = logging.getLogger(__name__)


RequestT = TypeVar("RequestT")
"""The request type of the stream."""


InputT = TypeVar("InputT")
"""The input type of the stream."""

@@ -76,31 +80,20 @@ class GrpcStreamBroadcaster(Generic[InputT, OutputT]):

Example:
```python
from typing import Any
from frequenz.client.base import (
GrpcStreamBroadcaster,
StreamFatalError,
StreamRetrying,
StreamStarted,
)
from frequenz.channels import Receiver # Assuming Receiver is available

# Dummy async iterable for demonstration
async def async_range(fail_after: int = -1) -> AsyncIterable[int]:
for i in range(10):
if fail_after != -1 and i >= fail_after:
raise grpc.aio.AioRpcError(
code=grpc.StatusCode.UNAVAILABLE,
initial_metadata=grpc.aio.Metadata(),
trailing_metadata=grpc.aio.Metadata(),
details="Simulated error"
)
yield i
await asyncio.sleep(0.1)
from frequenz.channels import Receiver

async def main():
stub: Any = ... # The gRPC stub
streamer = GrpcStreamBroadcaster(
stream_name="example_stream",
stream_method=lambda: async_range(fail_after=3),
stream_method=stub.MyStreamingMethod,
transform=lambda msg: msg * 2, # transform messages
retry_on_exhausted_stream=False,
)
@@ -156,7 +149,7 @@ async def consume_data():
def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments
self,
stream_name: str,
stream_method: Callable[[], AsyncIterable[InputT]],
stream_method: Callable[[], grpc.aio.UnaryStreamCall[RequestT, InputT]],
transform: Callable[[InputT], OutputT],
retry_strategy: retry.Strategy | None = None,
retry_on_exhausted_stream: bool = False,
@@ -206,7 +199,7 @@ def new_receiver(
*,
maxsize: int = 50,
warn_on_overflow: bool = True,
include_events: Literal[True],
include_events: bool,
) -> channels.Receiver[StreamEvent | OutputT]: ...
Contributor:
But here they can pass `False` from a variable and still get typed `StreamEvent | OutputT`. And there's another overload above that would give `OutputT` when users send `Literal[False]`. I understand that it is unusable with literals and that in practice we're just adding support for variables while not providing any narrowing. But still, I feel it is a bad idea to do any overloads based on literals, and maybe there's a type-based solution to this, with sentinels, etc. But I can't think of a nice scheme at the moment, so happy to postpone.

Contributor Author:
No, we are providing narrowing when using a literal, so `x.new_receiver(include_events=False)` (which is the default) should be typed `-> OutputT`. The problem is we can't properly narrow the return type when a variable is passed. So I would say it is half-useful, but since this half will probably be used in 95% of cases (the default), I think it is still pretty useful.

g: GrpcStreamBroadcaster[int, str] = GrpcStreamBroadcaster[int, str](
    stream_name="test",
    stream_method=None,  # type: ignore
)
reveal_type(g.new_receiver())

Gives:

Type of "g.new_receiver()" is "Receiver[str]"

Contributor:
Oh that's true. It is a problem only when people start to mess with things. :D

Contributor Author:
Yeah, I hit the issue only because I parametrized a test, so I started passing a variable to include_events, but I don't think it will be a common case in practice.


def new_receiver(
Expand All @@ -215,7 +208,7 @@ def new_receiver(
maxsize: int = 50,
warn_on_overflow: bool = True,
include_events: bool = False,
) -> channels.Receiver[OutputT] | channels.Receiver[StreamEvent | OutputT]:
) -> channels.Receiver[StreamEvent | OutputT]:
"""Create a new receiver for the stream.

Args:
Expand Down Expand Up @@ -272,35 +265,47 @@ async def stop(self) -> None:
await self._task
except asyncio.CancelledError:
pass
await self._data_channel.close()
await self._data_channel.aclose()
if self._event_channel is not None:
await self._event_channel.close()
await self._event_channel.aclose()

async def _run(self) -> None:
"""Run the streaming helper."""
data_sender = self._data_channel.new_sender()

while True:
error: Exception | None = None
first_message_received = False
_logger.info("%s: starting to stream", self._stream_name)
try:
call = self._stream_method()

# We await for the initial metadata before sending a
# StreamStarted event. This is the best indication we have of a
# successful connection without delaying it until the first
# message is received, which might happen a long time after the
# "connection" was established.
await call.initial_metadata()
if self._event_sender:
await self._event_sender.send(StreamStarted())

async for msg in call:
first_message_received = True
await data_sender.send(self._transform(msg))

except grpc.aio.AioRpcError as err:
error = err

if first_message_received:
self._retry_strategy.reset()

if error is None and not self._retry_on_exhausted_stream:
_logger.info(
"%s: connection closed, stream exhausted", self._stream_name
)
await self._data_channel.close()
await self._data_channel.aclose()
if self._event_channel is not None:
await self._event_channel.close()
await self._event_channel.aclose()
break

interval = self._retry_strategy.next_interval()
Expand All @@ -314,9 +319,9 @@ async def _run(self) -> None:
)
if error is not None and self._event_sender:
await self._event_sender.send(StreamFatalError(error))
await self._data_channel.close()
await self._data_channel.aclose()
if self._event_channel is not None:
await self._event_channel.close()
await self._event_channel.aclose()
break
_logger.warning(
"%s: connection ended, retrying %s in %0.3f seconds. %s.",