15 | 15 | from frequenz.api.common.v1.microgrid.microgrid_pb2 import ( |
16 | 16 | MicrogridComponentIDs as PBMicrogridComponentIDs, |
17 | 17 | ) |
| 18 | +from frequenz.api.reporting.v1.reporting_pb2 import ( |
| 19 | + AggregationConfig as PBAggregationConfig, |
| 20 | +) |
18 | 21 | from frequenz.api.reporting.v1.reporting_pb2 import IncludeOptions as PBIncludeOptions |
19 | 22 | from frequenz.api.reporting.v1.reporting_pb2 import ( |
20 | 23 | MetricConnections as PBMetricConnections, |
21 | 24 | ) |
| 25 | +from frequenz.api.reporting.v1.reporting_pb2 import ( |
| 26 | + ReceiveAggregatedMicrogridComponentsDataStreamRequest as PBAggregatedStreamRequest, |
| 27 | +) |
| 28 | +from frequenz.api.reporting.v1.reporting_pb2 import ( |
| 29 | + ReceiveAggregatedMicrogridComponentsDataStreamResponse as PBAggregatedStreamResponse, |
| 30 | +) |
22 | 31 | from frequenz.api.reporting.v1.reporting_pb2 import ( |
23 | 32 | ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest, |
24 | 33 | ) |
@@ -145,6 +154,24 @@ def __iter__(self) -> Iterator[MetricSample]: |
145 | 154 | ) |
146 | 155 |  |
147 | 156 |  |
| 157 | +@dataclass(frozen=True) |
| 158 | +class AggregatedMetric: |
| 159 | + """An aggregated metric sample returned by the Reporting service.""" |
| 160 | + |
| 161 | + _data_pb: PBAggregatedStreamResponse |
| 162 | + """The underlying protobuf message.""" |
| 163 | + |
| 164 | + def sample(self) -> MetricSample: |
| 165 | + """Return the aggregated metric sample.""" |
| 166 | + return MetricSample( |
| 167 | + timestamp=self._data_pb.sample.sampled_at.ToDatetime(), |
| 168 | + microgrid_id=self._data_pb.aggregation_config.microgrid_id, |
| 169 | + component_id=self._data_pb.aggregation_config.aggregation_formula, |
| 170 | + metric=self._data_pb.aggregation_config.metric, |
| 171 | + value=self._data_pb.sample.sample.value, |
| 172 | + ) |
| 173 | + |
| 174 | + |
148 | 175 | class ReportingApiClient(BaseApiClient[ReportingStub]): |
149 | 176 | """A client for the Reporting service.""" |
150 | 177 |  |
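The `AggregatedMetric` wrapper added above flattens one stream response into the existing `MetricSample` shape; since an aggregated value has no single source component, the aggregation formula string is reused as the `component_id`. Below is a minimal sketch of that mapping, using `SimpleNamespace` stand-ins for the protobuf messages (the nested field names mirror the attribute accesses above; the concrete values are illustrative only):

```python
# Sketch only: SimpleNamespace objects stand in for the protobuf messages,
# so this runs without the frequenz.api.reporting package installed.
from datetime import datetime, timezone
from types import SimpleNamespace

from google.protobuf.timestamp_pb2 import Timestamp

sampled_at = Timestamp()
sampled_at.FromDatetime(datetime(2024, 1, 1, tzinfo=timezone.utc))

fake_response = SimpleNamespace(
    aggregation_config=SimpleNamespace(
        microgrid_id=123,
        metric=1,  # numeric enum value of the requested metric
        aggregation_formula="#8+#9",  # reused as component_id in the sample
    ),
    sample=SimpleNamespace(
        sampled_at=sampled_at,
        sample=SimpleNamespace(value=42.0),
    ),
)

# AggregatedMetric(fake_response).sample() would flatten the nested response into:
# MetricSample(timestamp=datetime(2024, 1, 1, 0, 0), microgrid_id=123,
#              component_id="#8+#9", metric=1, value=42.0)
```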
@@ -353,3 +380,78 @@ def dt2ts(dt: datetime) -> PBTimestamp: |
353 | 380 | except grpcaio.AioRpcError as e: |
354 | 381 | print(f"RPC failed: {e}") |
355 | 382 | return |
| 383 | + |
| 384 | + async def receive_aggregated_data( |
| 385 | + self, |
| 386 | + *, |
| 387 | + microgrid_id: int, |
| 388 | + metric: Metric, |
| 389 | + aggregation_formula: str, |
| 390 | + start: datetime | None, |
| 391 | + end: datetime | None, |
| 392 | + resampling_period: timedelta, |
| 393 | + ) -> AsyncIterator[MetricSample]: |
| 394 | + """Iterate over aggregated data for a single metric. |
| 395 | + |
| 396 | + For now this only supports a single metric and aggregation formula. |
| 397 | + |
| 398 | + Args: |
| 399 | + microgrid_id: The microgrid ID. |
| 400 | + metric: The metric name. |
| 401 | + aggregation_formula: The aggregation formula. |
| 402 | + start: The start datetime. If None, the earliest available data is used. |
| 403 | + end: The end datetime. If None, the stream continues indefinitely from start. |
| 404 | + resampling_period: The period for resampling the data. |
| 405 | + |
| 406 | + Yields: |
| 407 | + The aggregated metric samples. |
| 408 | + |
| 409 | + Raises: |
| 410 | + ValueError: If the resampling_period is not provided. |
| 411 | + """ |
| 412 | + if not resampling_period: |
| 413 | + raise ValueError("resampling_period must be provided") |
| 414 | + |
| 415 | + aggregation_config = PBAggregationConfig( |
| 416 | + microgrid_id=microgrid_id, |
| 417 | + metric=metric.to_proto(), |
| 418 | + aggregation_formula=aggregation_formula, |
| 419 | + ) |
| 420 | + |
| 421 | + def dt2ts(dt: datetime) -> PBTimestamp: |
| 422 | + ts = PBTimestamp() |
| 423 | + ts.FromDatetime(dt) |
| 424 | + return ts |
| 425 | + |
| 426 | + time_filter = PBTimeFilter( |
| 427 | + start=dt2ts(start) if start else None, |
| 428 | + end=dt2ts(end) if end else None, |
| 429 | + ) |
| 430 | + |
| 431 | + stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter( |
| 432 | + time_filter=time_filter, |
| 433 | + resampling_options=PBResamplingOptions( |
| 434 | + resolution=round(resampling_period.total_seconds()) |
| 435 | + ), |
| 436 | + ) |
| 437 | + |
| 438 | + request = PBAggregatedStreamRequest( |
| 439 | + aggregation_configs=[aggregation_config], |
| 440 | + filter=stream_filter, |
| 441 | + ) |
| 442 | + |
| 443 | + try: |
| 444 | + stream = cast( |
| 445 | + AsyncIterator[PBAggregatedStreamResponse], |
| 446 | + self.stub.ReceiveAggregatedMicrogridComponentsDataStream( |
| 447 | + request, metadata=self._metadata |
| 448 | + ), |
| 449 | + ) |
| 450 | + async for response in stream: |
| 451 | + if not response: |
| 452 | + break |
| 453 | + yield AggregatedMetric(response).sample() |
| 454 | + |
| 455 | + except grpcaio.AioRpcError as e: |
| 456 | + print(f"RPC failed: {e}") |
| 457 | + return |
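A usage sketch of the new `receive_aggregated_data` method follows. The constructor arguments (`server_url`, `key`), the `Metric` import path, and the concrete microgrid ID and formula are illustrative assumptions, not part of this diff; only the keyword arguments of the call mirror the signature added above:

```python
import asyncio
from datetime import datetime, timedelta, timezone

from frequenz.client.common.metric import Metric  # assumed import path
from frequenz.client.reporting import ReportingApiClient


async def main() -> None:
    # Hypothetical connection details; the constructor is unchanged by this diff.
    client = ReportingApiClient(
        server_url="grpc://reporting.example.com",
        key="MY_API_KEY",
    )

    # Stream the sum of components 8 and 9, resampled to 15-minute values.
    async for sample in client.receive_aggregated_data(
        microgrid_id=123,
        metric=Metric.AC_ACTIVE_POWER,
        aggregation_formula="#8+#9",
        start=datetime(2024, 1, 1, tzinfo=timezone.utc),
        end=datetime(2024, 1, 2, tzinfo=timezone.utc),
        resampling_period=timedelta(minutes=15),
    ):
        print(sample)


asyncio.run(main())
```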