Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* Introduced 'GrpcStreamBroadcaster' from the base client to enable keep-alive options for gRPC streaming.
* The 'ChannelOptions' are currently set to the base client default, but can be changed as an input.

* Add receiver method (optional through flag) in addition to the iterator.

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
44 changes: 35 additions & 9 deletions src/frequenz/client/reporting/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ async def list_single_component_data(
resampling_period: timedelta | None,
include_states: bool = False,
include_bounds: bool = False,
stream_as_receiver: bool = False,
) -> AsyncIterator[MetricSample]:
"""Iterate over the data for a single metric.

Expand All @@ -234,6 +235,7 @@ async def list_single_component_data(
resampling_period: The period for resampling the data.
include_states: Whether to include the state data.
include_bounds: Whether to include the bound data.
stream_as_receiver: Whether to return a receiver instead of using an iterator.

Yields:
A named tuple with the following fields:
Expand All @@ -248,6 +250,7 @@ async def list_single_component_data(
resampling_period=resampling_period,
include_states=include_states,
include_bounds=include_bounds,
stream_as_receiver=stream_as_receiver,
):
for entry in batch:
yield entry
Expand All @@ -263,6 +266,7 @@ async def list_microgrid_components_data(
resampling_period: timedelta | None,
include_states: bool = False,
include_bounds: bool = False,
stream_as_receiver: bool = False,
) -> AsyncIterator[MetricSample]:
"""Iterate over the data for multiple microgrids and components.

Expand All @@ -275,6 +279,7 @@ async def list_microgrid_components_data(
resampling_period: The period for resampling the data.
include_states: Whether to include the state data.
include_bounds: Whether to include the bound data.
stream_as_receiver: Whether to return a receiver instead of using an iterator.

Yields:
A named tuple with the following fields:
Expand All @@ -292,6 +297,7 @@ async def list_microgrid_components_data(
resampling_period=resampling_period,
include_states=include_states,
include_bounds=include_bounds,
stream_as_receiver=stream_as_receiver,
):
for entry in batch:
yield entry
Expand All @@ -308,6 +314,7 @@ async def _list_microgrid_components_data_batch(
resampling_period: timedelta | None,
include_states: bool = False,
include_bounds: bool = False,
stream_as_receiver: bool = False,
) -> AsyncIterator[ComponentsDataBatch]:
"""Iterate over the component data batches in the stream using GrpcStreamBroadcaster.

Expand All @@ -319,9 +326,11 @@ async def _list_microgrid_components_data_batch(
resampling_period: The period for resampling the data.
include_states: Whether to include the state data.
include_bounds: Whether to include the bound data.
stream_as_receiver: Whether to return a receiver instead of using an iterator.

Yields:
A ComponentsDataBatch object of microgrid components data.
Returns:
AsyncIterator[ComponentsDataBatch]: An asynchronous iterator yielding
`ComponentsDataBatch` objects.
"""
microgrid_components_pb = [
PBMicrogridComponentIDs(microgrid_id=mid, component_ids=cids)
Expand Down Expand Up @@ -393,7 +402,7 @@ async def stream_method() -> (
async for response in cast(
AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse],
call_iterator,
):
): # call_iterator:
yield response

broadcaster = GrpcStreamBroadcaster(
Expand All @@ -404,8 +413,15 @@ async def stream_method() -> (
)

receiver = broadcaster.new_receiver()
async for data in receiver:
yield data

if stream_as_receiver:
# Yield from receiver instead of using an iterator
async for data in receiver:
yield data
else:
# Default: Use the original iterator approach
async for data in broadcaster.new_receiver():
yield data

async def receive_aggregated_data(
self,
Expand All @@ -416,6 +432,7 @@ async def receive_aggregated_data(
start: datetime | None,
end: datetime | None,
resampling_period: timedelta,
stream_as_receiver: bool = False,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the method should be split into two: one returning the receiver, and the other one calling the same method but looping over the receiver and yielding an iterator.

Probably this method should return the receiver and we add a new method, e.g. iterate_aggregated_data, that yields the iterator. There is a big drawback though, which is that this would be a breaking change. Maybe there are better options?

) -> AsyncIterator[MetricSample]:
"""Iterate over aggregated data for a single metric using GrpcStreamBroadcaster.

Expand All @@ -427,9 +444,11 @@ async def receive_aggregated_data(
start: start datetime, if None, the earliest available data will be used
end: end datetime, if None starts streaming indefinitely from start
resampling_period: The period for resampling the data.
stream_as_receiver: Whether to return a receiver instead of using an iterator.

Yields:
An iterator over the aggregated metric samples.
Returns:
AsyncIterator[MetricSample]: An asynchronous iterator yielding
aggregated metric samples.

Raises:
ValueError: If the resampling_period is not provided.
Expand Down Expand Up @@ -486,5 +505,12 @@ async def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]:
)

receiver = broadcaster.new_receiver()
async for data in receiver:
yield data

if stream_as_receiver:
# Yield from receiver instead of using an iterator
async for data in receiver:
yield data
else:
# Default: Use the original iterator approach
async for data in broadcaster.new_receiver():
yield data