From 8b2af8b31ee4177fe16f7a123496d295346b46a7 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 4 Jun 2025 14:20:00 +0200 Subject: [PATCH 1/4] Reuse `GrpcStreamBroadcaster`s for aggregated data streams Signed-off-by: Sahas Subramanian --- src/frequenz/client/reporting/_client.py | 120 +++++++++++------- src/frequenz/client/reporting/cli/__main__.py | 2 +- 2 files changed, 74 insertions(+), 48 deletions(-) diff --git a/src/frequenz/client/reporting/_client.py b/src/frequenz/client/reporting/_client.py index 5c19037..d338870 100644 --- a/src/frequenz/client/reporting/_client.py +++ b/src/frequenz/client/reporting/_client.py @@ -3,7 +3,7 @@ """Client for requests to the Reporting API.""" -from collections.abc import AsyncIterable, AsyncIterator +from collections.abc import AsyncIterable from datetime import datetime, timedelta from typing import cast @@ -117,6 +117,17 @@ def __init__( PBReceiveMicrogridSensorsDataStreamResponse, SensorsDataBatch ], ] = {} + self._aggregated_data_streams: dict[ + tuple[ + int, # microgrid_id + str, # metric name + str, # aggregation_formula + float | None, # start_time timestamp + float | None, # end_time timestamp + int | None, # resampling period in seconds + ], + GrpcStreamBroadcaster[PBAggregatedStreamResponse, MetricSample], + ] = {} self._metadata = (("key", key),) if key else () @@ -509,7 +520,7 @@ async def receive_aggregated_data( start_time: datetime | None, end_time: datetime | None, resampling_period: timedelta, - ) -> AsyncIterator[MetricSample]: + ) -> Receiver[MetricSample]: """Iterate over aggregated data for a single metric using GrpcStreamBroadcaster. For now this only supports a single metric and aggregation formula. @@ -521,63 +532,78 @@ async def receive_aggregated_data( end_time: end datetime, if None starts streaming indefinitely from start_time resampling_period: The period for resampling the data. - Yields: - An iterator over the aggregated metric samples. + Returns: + A receiver of `MetricSample`s. 
Raises: ValueError: If the resampling_period is not provided. """ - if not resampling_period: - raise ValueError("resampling_period must be provided") - - aggregation_config = PBAggregationConfig( - microgrid_id=microgrid_id, - metric=metric.to_proto(), - aggregation_formula=aggregation_formula, + stream_key = ( + microgrid_id, + metric.name, + aggregation_formula, + start_time.timestamp() if start_time else None, + end_time.timestamp() if end_time else None, + round(resampling_period.total_seconds()) if resampling_period else None, ) + if ( + stream_key not in self._aggregated_data_streams + or not self._aggregated_data_streams[stream_key].is_running + ): - def dt2ts(dt: datetime) -> PBTimestamp: - ts = PBTimestamp() - ts.FromDatetime(dt) - return ts + if not resampling_period: + raise ValueError("resampling_period must be provided") - time_filter = PBTimeFilter( - start=dt2ts(start_time) if start_time else None, - end=dt2ts(end_time) if end_time else None, - ) + aggregation_config = PBAggregationConfig( + microgrid_id=microgrid_id, + metric=metric.to_proto(), + aggregation_formula=aggregation_formula, + ) - stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter( - time_filter=time_filter, - resampling_options=PBResamplingOptions( - resolution=round(resampling_period.total_seconds()) - ), - ) + def dt2ts(dt: datetime) -> PBTimestamp: + ts = PBTimestamp() + ts.FromDatetime(dt) + return ts - request = PBAggregatedStreamRequest( - aggregation_configs=[aggregation_config], - filter=stream_filter, - ) + time_filter = PBTimeFilter( + start=dt2ts(start_time) if start_time else None, + end=dt2ts(end_time) if end_time else None, + ) - def transform_response(response: PBAggregatedStreamResponse) -> MetricSample: - return AggregatedMetric(response).sample() + stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter( + time_filter=time_filter, + resampling_options=PBResamplingOptions( + resolution=round(resampling_period.total_seconds()) + ), + ) - async 
def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]: - call_iterator = self.stub.ReceiveAggregatedMicrogridComponentsDataStream( - request, metadata=self._metadata + request = PBAggregatedStreamRequest( + aggregation_configs=[aggregation_config], + filter=stream_filter, ) - async for response in cast( - AsyncIterable[PBAggregatedStreamResponse], call_iterator - ): - yield response + def transform_response( + response: PBAggregatedStreamResponse, + ) -> MetricSample: + return AggregatedMetric(response).sample() + + async def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]: + call_iterator = ( + self.stub.ReceiveAggregatedMicrogridComponentsDataStream( + request, metadata=self._metadata + ) + ) - broadcaster = GrpcStreamBroadcaster( - stream_name="aggregated-microgrid-data-stream", - stream_method=stream_method, - transform=transform_response, - retry_strategy=None, - ) + async for response in cast( + AsyncIterable[PBAggregatedStreamResponse], call_iterator + ): + yield response + + self._aggregated_data_streams[stream_key] = GrpcStreamBroadcaster( + stream_name="aggregated-microgrid-data-stream", + stream_method=stream_method, + transform=transform_response, + retry_strategy=None, + ) - receiver = broadcaster.new_receiver() - async for data in receiver: - yield data + return self._aggregated_data_streams[stream_key].new_receiver() diff --git a/src/frequenz/client/reporting/cli/__main__.py b/src/frequenz/client/reporting/cli/__main__.py index 88f94ae..b2dc56b 100644 --- a/src/frequenz/client/reporting/cli/__main__.py +++ b/src/frequenz/client/reporting/cli/__main__.py @@ -171,7 +171,7 @@ async def data_iter() -> AsyncIterator[MetricSample]: for formula in formulas: assert resampling_period is not None for metric in metrics: - async for sample in client.receive_aggregated_data( + async for sample in await client.receive_aggregated_data( microgrid_id=microgrid_id, metric=metric, aggregation_formula=formula, From 
d0fd5d6268de4a28a35331ca45ee7ecdf16caae6 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 4 Jun 2025 14:33:30 +0200 Subject: [PATCH 2/4] Remove redundant `async` loops and declarations Signed-off-by: Sahas Subramanian --- src/frequenz/client/reporting/_client.py | 23 ++++++++----------- src/frequenz/client/reporting/cli/__main__.py | 2 +- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/src/frequenz/client/reporting/_client.py b/src/frequenz/client/reporting/_client.py index d338870..6515256 100644 --- a/src/frequenz/client/reporting/_client.py +++ b/src/frequenz/client/reporting/_client.py @@ -305,17 +305,16 @@ def transform_response( ) -> ComponentsDataBatch: return ComponentsDataBatch(response) - async def stream_method() -> ( + def stream_method() -> ( AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse] ): call_iterator = self.stub.ReceiveMicrogridComponentsDataStream( request, metadata=self._metadata ) - async for response in cast( + return cast( AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse], call_iterator, - ): - yield response + ) self._components_data_streams[stream_key] = GrpcStreamBroadcaster( stream_name="microgrid-components-data-stream", @@ -491,17 +490,16 @@ def transform_response( ) -> SensorsDataBatch: return SensorsDataBatch(response) - async def stream_method() -> ( + def stream_method() -> ( AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse] ): call_iterator = self.stub.ReceiveMicrogridSensorsDataStream( request, metadata=self._metadata ) - async for response in cast( + return cast( AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse], call_iterator, - ): - yield response + ) self._sensors_data_streams[stream_key] = GrpcStreamBroadcaster( stream_name="microgrid-sensors-data-stream", @@ -511,7 +509,7 @@ async def stream_method() -> ( return self._sensors_data_streams[stream_key].new_receiver() - async def receive_aggregated_data( + def receive_aggregated_data( self, *, microgrid_id: 
int, @@ -587,17 +585,14 @@ def transform_response( ) -> MetricSample: return AggregatedMetric(response).sample() - async def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]: + def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]: call_iterator = ( self.stub.ReceiveAggregatedMicrogridComponentsDataStream( request, metadata=self._metadata ) ) - async for response in cast( - AsyncIterable[PBAggregatedStreamResponse], call_iterator - ): - yield response + return cast(AsyncIterable[PBAggregatedStreamResponse], call_iterator) self._aggregated_data_streams[stream_key] = GrpcStreamBroadcaster( stream_name="aggregated-microgrid-data-stream", diff --git a/src/frequenz/client/reporting/cli/__main__.py b/src/frequenz/client/reporting/cli/__main__.py index b2dc56b..88f94ae 100644 --- a/src/frequenz/client/reporting/cli/__main__.py +++ b/src/frequenz/client/reporting/cli/__main__.py @@ -171,7 +171,7 @@ async def data_iter() -> AsyncIterator[MetricSample]: for formula in formulas: assert resampling_period is not None for metric in metrics: - async for sample in await client.receive_aggregated_data( + async for sample in client.receive_aggregated_data( microgrid_id=microgrid_id, metric=metric, aggregation_formula=formula, From c4614b2d0febd5d3fa414529be651fe98741d70d Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 10 Jun 2025 11:15:02 +0200 Subject: [PATCH 3/4] Fix docstring typos Signed-off-by: Sahas Subramanian --- src/frequenz/client/reporting/_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/frequenz/client/reporting/_client.py b/src/frequenz/client/reporting/_client.py index 6515256..36537b9 100644 --- a/src/frequenz/client/reporting/_client.py +++ b/src/frequenz/client/reporting/_client.py @@ -230,7 +230,7 @@ def _receive_microgrid_components_data_batch( include_states: bool = False, include_bounds: bool = False, ) -> Receiver[ComponentsDataBatch]: - """Return a GrpcStreamBroadcaster for microgrid 
component data."""
+        """Return a Receiver for the microgrid component data stream."""
         stream_key = (
             tuple((mid, tuple(cids)) for mid, cids in microgrid_components),
             tuple(metric.name for metric in metrics),
@@ -349,7 +349,7 @@ def receive_single_sensor_data(
             include_states: Whether to include the state data.
 
         Returns:
-            A receiver of `MetricSample`s. 
+            A receiver of `MetricSample`s.
         """
         receiver = self._receive_microgrid_sensors_data_batch(
             microgrid_sensors=[(microgrid_id, [sensor_id])],
@@ -419,7 +419,7 @@ def _receive_microgrid_sensors_data_batch(
             include_states: Whether to include the state data.
 
         Returns:
-            A GrpcStreamBroadcaster that can be used to receive sensor data batches.
+            A receiver of `SensorsDataBatch`s.
         """
         stream_key = (
             tuple((mid, tuple(sids)) for mid, sids in microgrid_sensors),

From 8c2c93b50cb8dd15650b169fb93aedcc0d6350bf Mon Sep 17 00:00:00 2001
From: Sahas Subramanian
Date: Tue, 10 Jun 2025 11:30:44 +0200
Subject: [PATCH 4/4] Update release notes

Also, the note about some functions becoming non-async is moved to the
`Upgrading` section, because it is a breaking change.

Signed-off-by: Sahas Subramanian
---
 RELEASE_NOTES.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index adb9fa1..c426146 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -6,7 +6,8 @@
 
 ## Upgrading
 
-
+- The `receive_aggregated_data` method now returns a `Receiver`, which provides more flexibility than the previous `AsyncIterator`.
+- The `receive_microgrid_sensors_data`, `receive_single_sensor_data` and `receive_aggregated_data` methods are no longer `async`.
 
 ## New Features
 
@@ -14,4 +15,4 @@
 
 ## Bug Fixes
 
-* Take out 'async' from 'receive_microgrid_sensors_data' and 'receive_single_sensor_data'.
+