
Conversation

@llucax
Contributor

@llucax llucax commented May 28, 2025

After the streaming starts successfully, we should reset the retry strategy, so the next time there is a failure we can retry from the beginning of the retry strategy again.

Copilot AI review requested due to automatic review settings May 28, 2025 08:34
@llucax llucax requested a review from a team as a code owner May 28, 2025 08:34
@llucax llucax self-assigned this May 28, 2025
@github-actions github-actions bot added part:docs Affects the documentation part:code Affects the code in general labels May 28, 2025
@llucax llucax requested review from Marenz and shsms May 28, 2025 08:34
@llucax llucax added this to the v0.9.0 milestone May 28, 2025

This comment was marked as outdated.

@llucax llucax enabled auto-merge May 28, 2025 08:34
Marenz previously approved these changes May 28, 2025
Contributor

@Marenz Marenz left a comment


Can we make a test for that?

Comment on lines 173 to 193
call = self._stream_method()
self._retry_strategy.reset()
await sender.send(StreamStartedEvent())
async for msg in call:
    await sender.send(self._transform(msg))
Contributor
@shsms shsms May 28, 2025

I have a feeling we need to make a flag and put the StreamStartedEvent and the retry_strategy reset inside the loop.

Until we try to iterate over the call, we don't know if it was successful.
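Something along these lines (just a sketch to illustrate the flag idea, reusing the names from the snippet above; not the actual broadcaster code):

call = self._stream_method()
first_message_received = False
async for msg in call:
    if not first_message_received:
        # Only now do we know the stream actually delivered something.
        self._retry_strategy.reset()
        await sender.send(StreamStartedEvent())
        first_message_received = True
    await sender.send(self._transform(msg))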

Contributor

Especially for some kinds of issues, like the streaming method on the API is failing before sending any data.

Contributor

I mean, true, but for me this is good enough

Contributor

StreamStarted sounds like there were messages. StreamOpened might be better in this case.

Contributor Author

Mmm, it looks like we need to think a bit more about this, but I think it is very subtle when to consider a stream started. For me, if the stream request was sent, it started, and any error after that is handled as a normal retry. In any case, discussing this could take some time and effort, and I think that is a separate discussion and could be a future improvement, but it is a bug to never reset the retry strategy. So I suggest merging this and opening a different issue if we want to change it, also because I think I already spent more time than I should have improving the streaming events 😬, so unless someone else is willing to pick it up, I don't see much happening beyond this PR and #149.

Contributor Author

what do you mean?

Contributor

I thought it was my fault, a misunderstanding.

Contributor Author

I was thinking about this and I don't think putting the reset inside the loop (let's leave the event for a separate issue, as this PR is not touching that, but I think the arguments for the reset apply to the event too) is really an option. A stream could be idle for a long time until something is sent through it. If errors are really delayed until the returned iterator is awaited, then maybe the only option would be to try to get the first element with some very short timeout, but I'm not sure if we have any guarantees. For example, if the other end disappeared, I'm not sure any error will be raised until the connection really times out for good.
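For illustration, the "first element with a very short timeout" idea would look roughly like this (purely a sketch; whether errors actually surface this early, and whether cancelling the pending read is safe, is exactly what I'm unsure about):

import asyncio

call = self._stream_method()
try:
    # Try to pull the first message quickly to detect early failures.
    first_msg = await asyncio.wait_for(call.read(), timeout=0.1)
except asyncio.TimeoutError:
    # The stream might simply be idle, so a timeout tells us nothing, and
    # cancelling the read might leave the call in a bad state.
    first_msg = None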

Contributor Author

AI overlords suggest using await call.initial_metadata() to "survey for errors" (or wait_for_connection(), but that one is marked as experimental, and I'm not sure if it is only about the underlying channel connection or if it will also catch other errors that happen after the connection, like a NOT FOUND or something like that).
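Roughly, the suggested probe would look like this (just a sketch, assuming the call is a grpc.aio unary-stream call and reusing the names from the snippet above):

call = self._stream_method()
# Wait until the server sent its initial metadata, i.e. the RPC was at
# least accepted; per the suggestion above, failures before that point
# should surface here instead of inside the `async for`.
await call.initial_metadata()
await sender.send(StreamStartedEvent())
async for msg in call:
    await sender.send(self._transform(msg))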

Contributor Author

If the AI overlords are not hallucinating, it seems like it should work.

sequenceDiagram
    actor Client
    actor Server

    %% Phase 0: Client-Side Setup
    Note over Client: Phase 0: Client-Side Setup
    Client->>Client: Creates gRPC channel
    Client->>Client: Instantiates service stub

    %% Phase 1: RPC Invocation & Connection Establishment
    Note over Client: Phase 1: RPC Invocation & Connection Establishment
    Client->>Client: Calls unary_stream method (call = stub.Method())
    Client->>Client: Executes await call.initial_metadata()

    opt TCP/TLS Connection Not Established
        Client->>Server: TCP: SYN
        Server-->>Client: TCP: SYN-ACK
        Client->>Server: TCP: ACK
        Note over Client,Server: TCP Connection Established

        Client->>Server: TLS: ClientHello
        Server-->>Client: TLS: ServerHello, Certificate, etc.
        Client->>Server: TLS: KeyExchange, ChangeCipherSpec
        Server-->>Client: TLS: ChangeCipherSpec
        Note over Client,Server: TLS Handshake Complete

        Client->>Server: HTTP/2: Connection Preface (Magic + SETTINGS)
        Server-->>Client: HTTP/2: SETTINGS frame
        Note over Client,Server: HTTP/2 Connection Initialized
    end

    Note over Client,Server: RPC Request over established HTTP/2
    Client->>Server: HTTP/2: HEADERS (:method=POST, :path, metadata, content-type=application/grpc)
    Client->>Server: HTTP/2: DATA (serialized request_message, END_STREAM=true)
    Server-->>Client: HTTP/2: HEADERS (:status=200, initial server metadata, END_STREAM=false)
    Note over Client: await call.initial_metadata() completes

    %% Phase 2: Server Streaming Responses
    Note over Client,Server: Phase 2: Server Streaming Responses
    loop For each message in stream
        Note over Client: Enters/awaits in `async for msg in call`
        Server-->>Client: HTTP/2: DATA (serialized response_message_N, END_STREAM=false)
        Client->>Client: Receives and deserializes msg_N
    end

    %% Phase 3: Stream Termination (Normal)
    Note over Client,Server: Phase 3: Stream Termination (Normal)
    Note over Client: Awaits in `async for` (server finished sending)
    Server-->>Client: HTTP/2: HEADERS (Trailers: grpc-status=OK, END_STREAM=true)
    Note over Client: `async for` loop terminates
    Client->>Client: Accesses call.code(), call.details()

    %% Alternate Flow: Error During Stream
    alt Error During Stream
        Note over Client,Server: Alternate Flow: Error During Stream
        Note over Client: Awaits next message in `async for msg in call`
        Server-->>Client: HTTP/2: HEADERS (Trailers: grpc-status=ERROR_CODE, grpc-message, END_STREAM=true)
        Note over Client: AioRpcError raised
        Client->>Client: Accesses call.code(), call.details()
    end

@llucax llucax added the type:bug Something isn't working label May 28, 2025
@llucax llucax disabled auto-merge May 28, 2025 12:20
@llucax llucax modified the milestones: v0.9.0, v0.11.0 May 28, 2025
@llucax llucax marked this pull request as draft May 28, 2025 12:29
@github-actions github-actions bot added the part:tests Affects the unit, integration and performance (benchmarks) tests label Jun 2, 2025
@llucax
Contributor Author

llucax commented Jun 2, 2025

Updated, but the test still needs to be fixed, because to do this we can't take an arbitrary async iterator anymore; we need to take a proper UnaryStreamCall to be able to call initial_metadata().

Signed-off-by: Leandro Lucarella <[email protected]>
@github-actions github-actions bot added the part:tooling Affects the development tooling (CI, deployment, dependency management, etc.) label Aug 28, 2025
@llucax llucax marked this pull request as ready for review August 28, 2025 19:01
@llucax
Contributor Author

llucax commented Aug 28, 2025

OK, this is ready for review again.

After considering where to put the reset(), I think the best place is after the first message is received. Otherwise a stream that doesn't receive anything and is exhausted would still be considered successfully connected, and the retry strategy would be reset. That seems weird, particularly in the scenario where retry_on_exhausted_stream=True, but in general, if the server keeps saying the stream is over without us ever receiving any messages, it sounds like something is wrong and we should back off between tries (and eventually give up if the number of retries is limited) instead of resetting the strategy.

I didn't change when the StreamStarted event is emitted though, but I added an await for the initial metadata before sending the event. Again, I think this is probably the best trade-off: it gives some certainty that the stream successfully started without having to wait for the first message, which could take a long time for low-frequency streams (like dispatch updates).
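To make it concrete, the resulting loop looks roughly like this (a condensed sketch, not the literal code in streaming.py, which also handles retries, exceptions and stream exhaustion):

call = self._stream_method()
# Wait for the server's initial metadata before announcing the stream.
await call.initial_metadata()
await sender.send(StreamStartedEvent())
first_message_received = False
async for msg in call:
    if not first_message_received:
        # Only reset the retry strategy once we actually received data, so a
        # stream that is exhausted without ever delivering a message still
        # backs off between retries.
        self._retry_strategy.reset()
        first_message_received = True
    await sender.send(self._transform(msg))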

@llucax llucax requested a review from Marenz August 28, 2025 19:12
@llucax llucax requested review from Copilot and shsms August 28, 2025 19:12
Copilot AI left a comment

Pull Request Overview

This PR fixes the retry strategy behavior in the GrpcStreamBroadcaster by resetting the retry strategy after a successful stream start, and improves the timing of the StreamStarted event.

  • Reset retry strategy after receiving the first message to prevent accumulating delays across retries
  • Wait for initial metadata before firing StreamStarted event for better connection indication
  • Update channel closing methods and dependency version

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

Reviewed files
  • src/frequenz/client/base/streaming.py: Add retry strategy reset logic and improve StreamStarted event timing
  • tests/streaming/test_grpc_stream_broadcaster.py: Refactor test mocks and add test for retry strategy reset behavior
  • pyproject.toml: Update frequenz-channels dependency version requirement
  • RELEASE_NOTES.md: Document bug fixes and breaking changes


llucax added 12 commits August 28, 2025 21:22
After the streaming started successfully we should reset the retry
strategy, so next time there is a failure we can retry from the beginning
of the retry strategy again.

Signed-off-by: Leandro Lucarella <[email protected]>
Sending the `StreamStarted` event as soon as the streaming method is
called can give a false impression that the stream is active and working,
when in reality it might not be connected to the server at all.

Now we wait until we receive the initial metadata from the server before
firing the `StreamStarted` event. That should give users a better
indication that the stream is actually active and working without having
to wait for the first item to be received, which can take a long time
for some low-frequency streams.

Signed-off-by: Leandro Lucarella <[email protected]>
This removes the need for an extra adaptor lambda when passing the
generator as a callable.

Signed-off-by: Leandro Lucarella <[email protected]>
The `stream_method` is not a simple async iterator anymore, as we need
to also use the `initial_metadata()` method from gRPC calls. Because of
this we now need more complex mocking.

The new `unary_stream_call_mock()` facilitates the mocking of unary
stream gRPC calls.

Signed-off-by: Leandro Lucarella <[email protected]>
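(For context, such a mock helper could be shaped roughly like the sketch below, using unittest.mock; this is illustrative only and not necessarily the helper as implemented in the test suite.)

from unittest.mock import AsyncMock, MagicMock


def unary_stream_call_mock(messages):
    """Return a mock that behaves enough like a unary-stream gRPC call."""
    call = MagicMock()
    # MagicMock supports async iteration via __aiter__ since Python 3.8.
    call.__aiter__.return_value = list(messages)
    # The broadcaster awaits initial_metadata() before emitting StreamStarted.
    call.initial_metadata = AsyncMock(return_value=())
    return call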
Same as in the previous commit, we need to mock the `stream_method` with
a full `UnaryStreamCall` instead of an `AsyncIterator`.

Signed-off-by: Leandro Lucarella <[email protected]>
Before the changes to the errored stream tests, the generator used for error tests was not re-created each time the streamer reconnected. This meant that after the first error, the stream would resume correctly.

Previously this was not an issue because the retry strategy was never reset, so when retry was set to 2, it didn't matter that the stream resumed successfully: the next retry would still reduce the number of further retry attempts.

Now that the retry strategy is properly reset, we need to make sure to keep the errored channel in an error state, otherwise the retry counter will always be reset and the streamer will never give up.

Because of this, the test no longer receives messages after the first failure.

Signed-off-by: Leandro Lucarella <[email protected]>
The most common case will probably be that, after a failure while iterating, the stream will also fail at `initial_metadata()`. To test this case we add support for making the errored stream start failing `initial_metadata()` after it enters the error state.

We also keep the old test case where it only fails while iterating, since there might be cases where the server consistently sends some wrong/unexpected message that will only fail when the message is received and not during the initial metadata exchange.

Signed-off-by: Leandro Lucarella <[email protected]>
The overloads for `new_receiver()`'s `include_events` parameter used
`Literal[True]` and `Literal[False]` to differentiate the return types,
but this requires users to call the method only using literals, which is
a bit too restrictive.

This commit changes the type hints to use just `bool` for the parameter
for the general case. It also simplifies the type hints for the return
type.

Signed-off-by: Leandro Lucarella <[email protected]>
This makes sure it is clear what this argument is at the call site.

Signed-off-by: Leandro Lucarella <[email protected]>
Signed-off-by: Leandro Lucarella <[email protected]>
This requires bumping the minimum dependency version of
'frequenz-channels' to 1.8.0.

Signed-off-by: Leandro Lucarella <[email protected]>
@llucax
Copy link
Contributor Author

llucax commented Sep 2, 2025

@shsms do you want to give it a look too as you also reviewed in the past?

Contributor
@shsms shsms left a comment

Just a comment for future improvement.

  warn_on_overflow: bool = True,
- include_events: Literal[True],
+ include_events: bool,
  ) -> channels.Receiver[StreamEvent | OutputT]: ...
Contributor

But here they can pass False from a variable and still get typed StreamEvent | OutputT. And there's another overload above that would give OutputT when users send Literal[False]. I understand that it is unusable with literals and that in practice we're just adding support for variables while not providing any narrowing. But still, I feel it is a bad idea to do any overloads based on literals and maybe there's a type based solution to this, with sentinels, etc. But I can't think of a nice scheme at the moment, so happy to postpone.

Contributor Author

No, we are providing narrowing when using a literal, so x.new_receiver(include_events=False) (which is the default) should be typed -> OutputT. The problem is that we can't properly narrow the return type when a variable is passed. So I would say it is half-useful, but since this half will probably be used in 95% of the cases (the default), I think it is still pretty useful.

g: GrpcStreamBroadcaster[int, str] = GrpcStreamBroadcaster[int, str](
    stream_name="test",
    stream_method=None,  # type: ignore
)
reveal_type(g.new_receiver())

Gives:

Type of "g.new_receiver()" is "Receiver[str]"

Contributor

Oh that's true. It is a problem only when people start to mess with things. :D

Contributor Author

Yeah, I hit the issue only because I parametrized a test, so I started passing a variable to include_events, but I don't think it will be a common case in practice.

@llucax llucax added this pull request to the merge queue Sep 2, 2025
Merged via the queue into frequenz-floss:v0.x.x with commit 4afe440 Sep 2, 2025
5 checks passed
@llucax llucax deleted the fix-reset branch September 2, 2025 09:26
@llucax llucax added this to the v0.11.1 milestone Sep 2, 2025