Streamer: Write out events on the channels as well #146
Pull Request Overview
Adds support for emitting structured state change events alongside regular stream messages, updates tests to validate these events, and records the change in the release notes.
- Introduces a `State` enum and `Message` dataclass for connection lifecycle events.
- Modifies `GrpcStreamBroadcaster` to send `CONNECTED`, `DISCONNECTED`, and `CONNECTING` messages and updates its channel types.
- Extends existing tests and adds a new test to assert state events; documents the feature in `RELEASE_NOTES.md`.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/frequenz/client/base/streaming.py | Define State/Message, emit state events in _run, adjust channel and receiver types, add reconnect method. |
| tests/streaming/test_grpc_stream_broadcaster.py | Capture and assert on Message events in multiple tests; add test_messages_on_retry. |
| RELEASE_NOTES.md | Note the new streaming state events feature. |
Comments suppressed due to low confidence (3)
src/frequenz/client/base/streaming.py:156
- The generic exception handler was removed, so non-gRPC exceptions will now escape the loop and crash the task. Consider reintegrating a broad exception catch (with logging and a corresponding state message) to ensure unexpected errors are reported.
except Exception as err: # pylint: disable=broad-except
src/frequenz/client/base/streaming.py:109
- The new `reconnect` method isn’t exercised by any tests; consider adding a unit test that calls `reconnect()` and verifies the previous streaming task is cancelled and a new one starts.
def reconnect(self) -> None:
tests/streaming/test_grpc_stream_broadcaster.py:97
- [nitpick] The test variable `events` mixes message and state events; renaming it to `state_messages` or `message_events` could clarify its purpose.
events: list[Message] = []
Pull Request Overview
This PR adds functionality to emit state change events on the streaming channels while updating tests to validate both data items and state messages.
- Introduces a new helper function (_split_message) to separate string items from Message objects.
- Updates the GrpcStreamBroadcaster to send state change events (CONNECTED, DISCONNECTED, CONNECTING) during the stream lifecycle.
- Updates tests and release notes to reflect the new event broadcasting feature.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| tests/streaming/test_grpc_stream_broadcaster.py | Refactored tests to capture both transformed items and state Message events. |
| src/frequenz/client/base/streaming.py | Updated _run logic to emit new state events along with the transformed stream data. |
| RELEASE_NOTES.md | Updated release notes to highlight new state change event emission. |
I like it in general, but I find a few things confusing. For example, I find the State values confusing: streams are not really connected or disconnected, connections are. To me this might look like the underlying gRPC channel is connected or disconnected, but it doesn't have anything to do with that. I would just use started/stopped. I would also rename "connecting" to "waiting for retry" or something like that; "connecting" sounds like you sent the SYN and are waiting for the ACK, but it is nothing like that in this case.
Also, the Message class name is way too general. I would call it StreamEvent, and use new_state instead of just state (renaming State to StreamEventType might be another option).
I would also add an event to indicate that we have given up, as the alternative is seeing a stopped event and then waiting to see whether a retrying event follows. Actually, this is wrong: the iterator will stop when the stream gives up, so we already have an event for that. What we are not doing is bubbling up the exception if it stopped due to an exception, which would probably be a good idea.
Also, since we'll always emit stopped + waiting for retry together, it might make more sense to join them instead of emitting two events.
And now, while I'm trying to put it all together: maybe it would be better to split this into a few different classes, as we are already using type unions.
Now, putting it all together:
```python
# Imports needed by the new event types (the module's existing imports are omitted).
from dataclasses import dataclass
from datetime import timedelta
from typing import TypeAlias


@dataclass(frozen=True, kw_only=True)  # for consistency, so all events are dataclasses
class StreamStartedEvent:
    """The broadcaster started streaming."""


@dataclass(frozen=True, kw_only=True)
class StreamStoppedEvent:
    """The broadcaster stopped streaming and it is waiting to do a retry."""

    retry_time: timedelta
    """The time the broadcaster will wait until trying again."""

    # `None` when the stream was exhausted without an error (see `_run` below).
    exception: Exception | None
    """The exception that made the broadcaster stop."""


StreamEvent: TypeAlias = StreamStartedEvent | StreamStoppedEvent


class GrpcStreamBroadcaster(Generic[InputT, OutputT]):
    async def _run(self) -> None:
        """Run the streaming helper."""
        sender = self._channel.new_sender()
        while True:
            error: Exception | None = None
            _logger.info("%s: starting to stream", self._stream_name)
            try:
                call = self._stream_method()
                await sender.send(StreamStartedEvent())
                # Totally unrelated to this PR, but I just realized we have a bug, we never
                # reset the retry strategy, this mostly works because we have unlimited
                # restarts, but we should probably add the reset() here.
                self._retry_strategy.reset()  # <---- NEW
                async for msg in call:
                    await sender.send(self._transform(msg))
            except grpc.aio.AioRpcError as err:
                error = err
            except Exception as err:  # pylint: disable=broad-except
                _logger.exception(
                    "%s: raise an unexpected exception",
                    self._stream_name,
                )
                error = err
            if error is None and not self._retry_on_exhausted_stream:
                _logger.info(
                    "%s: connection closed, stream exhausted", self._stream_name
                )
                # No event here, the async iterator will stop, so the user will know
                await self._channel.close()
                break
            error_str = f"Error: {error}" if error else "Stream exhausted"
            interval = self._retry_strategy.next_interval()
            if interval is None:
                _logger.error(
                    "%s: connection ended, retry limit exceeded (%s), giving up. %s.",
                    self._stream_name,
                    self._retry_strategy.get_progress(),
                    error_str,
                )
                # If we exhausted the retries but there is an error, just send the error
                if error:
                    await sender.send(error)  # We need to add `| Exception` to the receiver type
                await self._channel.close()
                break
            _logger.warning(
                "%s: connection ended, retrying %s in %0.3f seconds. %s.",
                self._stream_name,
                self._retry_strategy.get_progress(),
                interval,
                error_str,
            )
            await sender.send(
                StreamStoppedEvent(retry_time=timedelta(seconds=interval), exception=error)
            )
            await asyncio.sleep(interval)
```

```python
class State(StrEnum):
    """State of the GrpcStreamBroadcaster."""

    CONNECTING = "connecting"
```
Why not just a regular enum using auto? I think this kind of enum allows for free conversion between string and the enum, which is probably something we don't need nor want, right? Also the repr will be overly verbose.
```python
except Exception as err:  # pylint: disable=broad-except
    _logger.exception(
        "%s: raise an unexpected exception",
        self._stream_name,
    )
    error = err
```
Why are you removing this? This is a bug fix I merged very recently.
Rebase artifact. I guess I thought it had been removed upstream when I saw it.
Force-pushed from 519d162 to a9d961e.
Pull Request Overview
This PR extends the streaming functionality by broadcasting state change events (started and stopped) on the channels.
- Added new event classes (StreamStartedEvent and StreamStoppedEvent) to signal stream state changes.
- Updated tests to split out messages and events, and added a retry-related test.
- Revised error message formatting and provided documentation updates in both code and release notes.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| tests/streaming/test_grpc_stream_broadcaster.py | Updated tests to differentiate between messages and events; added _split_message helper and new retry-related test. |
| src/frequenz/client/base/streaming.py | Modified GrpcStreamBroadcaster to broadcast StreamStartedEvent and StreamStoppedEvent; adjusted error logging and typing. |
| RELEASE_NOTES.md | Added release notes for the new state change events, with an example snippet. |
Signed-off-by: Mathias L. Baumann <[email protected]>
I see this is missing the sending of exceptions when retries are exhausted. Arguably this is a separate feature, so it is OK to implement it separately. Also, my approach of sending a raw Exception is wrong, because it can overlap if the user wants, for some weird reason, to send Exceptions via the channel themselves (the transform function could return exceptions), so we would need to wrap it in a StreamEvent too.
I will give this a go.
````markdown
* The streaming client now also sends state change events out. Usage example:
```python
recv = streamer.new_receiver()
````
Nitpick: For this to be better rendered it should be:
````markdown
* The streaming client now also sends state change events out. Usage example:
    ```python
    recv = streamer.new_receiver()
    ...
    ```
````
Will create a PR with the fix.
````markdown
    print(f"Stream stopped, reason {event.exception}, retry in {event.retry_time}")
case int() as output:
    print(f"Received message: {output}")
```
````
💯
```python
# Ignore D412: "No blank lines allowed between a section header and its content"
# flake8: noqa: D412
```
Strange, are you sure you need this? I have never seen it before except for function bodies.
I think black was always adding the line and flake didn't like it?
Oh, no, now I see you actually merged the plain error without retry together with the retry case again. I will think a bit about that. If you usually need to do different things depending on whether the stream really ended or not, then it would be better to split them; otherwise the way to tell is a 2-step check:

```python
match msg:
    case StreamStoppedEvent(retry_time=retry_time):
        if retry_time is None:
            print("No more retries now, do X")
        else:
            print("We are retrying, do Y")
```

Which feels a bit weird and unidiomatic compared to:

```python
match msg:
    case StreamStoppedEvent():
        print("We are retrying, do Y")
    case StreamEndedWithErrorEvent():
        print("No more retries now, do X")
```

When doing the first review I was unsure if we should call