diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 9bde965..41fa9f5 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -13,6 +13,8 @@ * Introduced 'GrpcStreamBroadcaster' from the base client to enable keep-alive options for gRPC streaming. * The 'ChannelOptions' are currently set to the base client default, but can be change as an input. +* Add receiver method (optional through flag) in addition to the iterator. + ## Bug Fixes diff --git a/src/frequenz/client/reporting/_client.py b/src/frequenz/client/reporting/_client.py index 3bc8737..53373b3 100644 --- a/src/frequenz/client/reporting/_client.py +++ b/src/frequenz/client/reporting/_client.py @@ -222,6 +222,7 @@ async def list_single_component_data( resampling_period: timedelta | None, include_states: bool = False, include_bounds: bool = False, + stream_as_receiver: bool = False, ) -> AsyncIterator[MetricSample]: """Iterate over the data for a single metric. @@ -234,6 +235,7 @@ async def list_single_component_data( resampling_period: The period for resampling the data. include_states: Whether to include the state data. include_bounds: Whether to include the bound data. + stream_as_receiver: Whether to return a receiver instead of using an iterator. Yields: A named tuple with the following fields: @@ -248,6 +250,7 @@ async def list_single_component_data( resampling_period=resampling_period, include_states=include_states, include_bounds=include_bounds, + stream_as_receiver=stream_as_receiver, ): for entry in batch: yield entry @@ -263,6 +266,7 @@ async def list_microgrid_components_data( resampling_period: timedelta | None, include_states: bool = False, include_bounds: bool = False, + stream_as_receiver: bool = False, ) -> AsyncIterator[MetricSample]: """Iterate over the data for multiple microgrids and components. @@ -275,6 +279,7 @@ async def list_microgrid_components_data( resampling_period: The period for resampling the data. include_states: Whether to include the state data. include_bounds: Whether to include the bound data. + stream_as_receiver: Whether to return a receiver instead of using an iterator. Yields: A named tuple with the following fields: @@ -292,6 +297,7 @@ async def list_microgrid_components_data( resampling_period=resampling_period, include_states=include_states, include_bounds=include_bounds, + stream_as_receiver=stream_as_receiver, ): for entry in batch: yield entry @@ -308,6 +314,7 @@ async def _list_microgrid_components_data_batch( resampling_period: timedelta | None, include_states: bool = False, include_bounds: bool = False, + stream_as_receiver: bool = False, ) -> AsyncIterator[ComponentsDataBatch]: """Iterate over the component data batches in the stream using GrpcStreamBroadcaster. @@ -319,9 +326,11 @@ async def _list_microgrid_components_data_batch( resampling_period: The period for resampling the data. include_states: Whether to include the state data. include_bounds: Whether to include the bound data. + stream_as_receiver: Whether to return a receiver instead of using an iterator. - Yields: - A ComponentsDataBatch object of microgrid components data. + Returns: + AsyncIterator[ComponentsDataBatch]: An asynchronous iterator yielding + `ComponentsDataBatch` objects. """ microgrid_components_pb = [ PBMicrogridComponentIDs(microgrid_id=mid, component_ids=cids) @@ -393,7 +402,7 @@ async def stream_method() -> ( async for response in cast( AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse], call_iterator, - ): + ): # call_iterator: yield response broadcaster = GrpcStreamBroadcaster( @@ -404,8 +413,15 @@ async def stream_method() -> ( ) receiver = broadcaster.new_receiver() - async for data in receiver: - yield data + + if stream_as_receiver: + # Yield from receiver instead of using an iterator + async for data in receiver: + yield data + else: + # Default: Use the original iterator approach + async for data in broadcaster.new_receiver(): + yield data async def receive_aggregated_data( self, @@ -416,6 +432,7 @@ async def receive_aggregated_data( start: datetime | None, end: datetime | None, resampling_period: timedelta, + stream_as_receiver: bool = False, ) -> AsyncIterator[MetricSample]: """Iterate over aggregated data for a single metric using GrpcStreamBroadcaster. @@ -427,9 +444,11 @@ async def receive_aggregated_data( start: start datetime, if None, the earliest available data will be used end: end datetime, if None starts streaming indefinitely from start resampling_period: The period for resampling the data. + stream_as_receiver: Whether to return a receiver instead of using an iterator. - Yields: - An iterator over the aggregated metric samples. + Returns: + AsyncIterator[MetricSample]: An asynchronous iterator yielding + aggregated metric samples. Raises: ValueError: If the resampling_period is not provided. @@ -486,5 +505,12 @@ async def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]: ) receiver = broadcaster.new_receiver() - async for data in receiver: - yield data + + if stream_as_receiver: + # Yield from receiver instead of using an iterator + async for data in receiver: + yield data + else: + # Default: Use the original iterator approach + async for data in broadcaster.new_receiver(): + yield data