diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index f8640ca..55bd53f 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -9,6 +9,8 @@ ## New Features +* Adds new method `receive_aggregated_data` to receive microgrid component data + aggregated by user-defined formulae. ## Bug Fixes diff --git a/src/frequenz/client/reporting/_client.py b/src/frequenz/client/reporting/_client.py index 270e641..a90ca3c 100644 --- a/src/frequenz/client/reporting/_client.py +++ b/src/frequenz/client/reporting/_client.py @@ -15,10 +15,19 @@ from frequenz.api.common.v1.microgrid.microgrid_pb2 import ( MicrogridComponentIDs as PBMicrogridComponentIDs, ) +from frequenz.api.reporting.v1.reporting_pb2 import ( + AggregationConfig as PBAggregationConfig, +) from frequenz.api.reporting.v1.reporting_pb2 import IncludeOptions as PBIncludeOptions from frequenz.api.reporting.v1.reporting_pb2 import ( MetricConnections as PBMetricConnections, ) +from frequenz.api.reporting.v1.reporting_pb2 import ( + ReceiveAggregatedMicrogridComponentsDataStreamRequest as PBAggregatedStreamRequest, +) +from frequenz.api.reporting.v1.reporting_pb2 import ( + ReceiveAggregatedMicrogridComponentsDataStreamResponse as PBAggregatedStreamResponse, +) from frequenz.api.reporting.v1.reporting_pb2 import ( ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest, ) @@ -145,6 +154,24 @@ def __iter__(self) -> Iterator[MetricSample]: ) +@dataclass(frozen=True) +class AggregatedMetric: + """An aggregated metric sample returned by the Reporting service.""" + + _data_pb: PBAggregatedStreamResponse + """The underlying protobuf message.""" + + def sample(self) -> MetricSample: + """Return the aggregated metric sample.""" + return MetricSample( + timestamp=self._data_pb.sample.sampled_at.ToDatetime(), + microgrid_id=self._data_pb.aggregation_config.microgrid_id, + component_id=self._data_pb.aggregation_config.aggregation_formula, + metric=self._data_pb.aggregation_config.metric, + value=self._data_pb.sample.sample.value, + ) + + class ReportingApiClient(BaseApiClient[ReportingStub]): """A client for the Reporting service.""" @@ -353,3 +380,78 @@ def dt2ts(dt: datetime) -> PBTimestamp: except grpcaio.AioRpcError as e: print(f"RPC failed: {e}") return + + async def receive_aggregated_data( + self, + *, + microgrid_id: int, + metric: Metric, + aggregation_formula: str, + start: datetime | None, + end: datetime | None, + resampling_period: timedelta, + ) -> AsyncIterator[MetricSample]: + """Iterate over aggregated data for a single metric. + + For now this only supports a single metric and aggregation formula. + + Args: + microgrid_id: The microgrid ID. + metric: The metric name. + aggregation_formula: The aggregation formula. + start: start datetime, if None, the earliest available data will be used + end: end datetime, if None starts streaming indefinitely from start + resampling_period: The period for resampling the data. + + Yields: + An iterator over the aggregated metric samples. + + Raises: + ValueError: If the resampling_period is not provided. + """ + if not resampling_period: + raise ValueError("resampling_period must be provided") + + aggregation_config = PBAggregationConfig( + microgrid_id=microgrid_id, + metric=metric.to_proto(), + aggregation_formula=aggregation_formula, + ) + + def dt2ts(dt: datetime) -> PBTimestamp: + ts = PBTimestamp() + ts.FromDatetime(dt) + return ts + + time_filter = PBTimeFilter( + start=dt2ts(start) if start else None, + end=dt2ts(end) if end else None, + ) + + stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter( + time_filter=time_filter, + resampling_options=PBResamplingOptions( + resolution=round(resampling_period.total_seconds()) + ), + ) + + request = PBAggregatedStreamRequest( + aggregation_configs=[aggregation_config], + filter=stream_filter, + ) + + try: + stream = cast( + AsyncIterator[PBAggregatedStreamResponse], + self.stub.ReceiveAggregatedMicrogridComponentsDataStream( + request, metadata=self._metadata + ), + ) + async for response in stream: + if not response: + break + yield AggregatedMetric(response).sample() + + except grpcaio.AioRpcError as e: + print(f"RPC failed: {e}") + return