Skip to content

Conversation

@Marenz
Copy link
Contributor

@Marenz Marenz commented Jun 5, 2025

This allows us to use all the options available in the streamer.

Copilot AI review requested due to automatic review settings June 5, 2025 09:20
@Marenz Marenz requested review from a team as code owners June 5, 2025 09:20
@github-actions github-actions bot added part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests part:dispatcher labels Jun 5, 2025
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR updates DispatchApiClient.stream() to return a GrpcStreamBroadcaster (a streamer) instead of a simple receiver, allowing callers to spawn customizable receivers via new_receiver().
Key changes:

  • Refactored stream() implementation to return a broadcaster and inlined stream-management logic.
  • Updated tests to call stream.new_receiver() before receiving messages.
  • Documented the breaking change and example usage in RELEASE_NOTES.md.

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

File Description
tests/test_client.py Updated test to use stream.new_receiver().receive()
src/frequenz/client/dispatch/_client.py Changed return type of stream(), removed _get_stream, inlined logic
RELEASE_NOTES.md Added note about stream() now returning a streamer and example
Comments suppressed due to low confidence (3)

src/frequenz/client/dispatch/_client.py:239

  • The Returns section is outdated: update it to indicate this method returns a streamer (GrpcStreamBroadcaster), not a receiver.
    A receiver channel to receive the stream of dispatch events.

tests/test_client.py:313

  • [nitpick] Consider renaming the variable stream to something like streamer to reflect that it’s a broadcaster and avoid confusion with the receiver returned by new_receiver().
message = await stream.new_receiver().receive()

src/frequenz/client/dispatch/_client.py:213

  • The signature references GrpcStreamBroadcaster but it isn’t imported. Add from frequenz.client.base.channel import GrpcStreamBroadcaster to the top of the file.
def stream(

This allows us to use all the options available in the streamer.

Signed-off-by: Mathias L. Baumann <[email protected]>
Copy link
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced about this, GrpcStreamBroadcaster is a "low-level" class designed to be used internally by clients. Also the stream() might be misleading as a name, as it is now only getting you a GrpcStreamBroadcaster.

Another option would be to just accept the same arguments as GrpcStreamBroadcaster.new_receiver() in your stream() method.

On the other hand, if we return a streamer, it means less code duplication, and having a common interface to "configuring" the receiver for all streaming methods. If we want to go that route, maybe we can create a Protocol to only expose the parts of the GrpcStreamBroadcaster we want to be public. We even already have something like this, the ReceiverFetcher, but it more general, so I think the new_receiver() it takes no arguments (or maybe only the buffer size).

In any case, I think we need to talk about this more broadly, so all clients provide the same interface. If one client returns a GrpcStreamBroadcaster and another one returns a plain receiver, then our clients will become inconsistent.

@Marenz Marenz closed this Jun 23, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

part:dispatcher part:docs Affects the documentation part:tests Affects the unit, integration and performance (benchmarks) tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants