Skip to content

Commit 4744975

Browse files
committed
Add method to receive aggregated data
This adds support for the API endpoint to query aggregated data based on user-defined formulae. Latter can be passed by the user as strings (e.g. `#CID1 + #CID2`) as documented in the API specs. Signed-off-by: cwasicki <[email protected]>
1 parent 7c45c9b commit 4744975

File tree

1 file changed

+102
-0
lines changed

1 file changed

+102
-0
lines changed

src/frequenz/client/reporting/_client.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,19 @@
1515
from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
1616
MicrogridComponentIDs as PBMicrogridComponentIDs,
1717
)
18+
from frequenz.api.reporting.v1.reporting_pb2 import (
19+
AggregationConfig as PBAggregationConfig,
20+
)
1821
from frequenz.api.reporting.v1.reporting_pb2 import IncludeOptions as PBIncludeOptions
1922
from frequenz.api.reporting.v1.reporting_pb2 import (
2023
MetricConnections as PBMetricConnections,
2124
)
25+
from frequenz.api.reporting.v1.reporting_pb2 import (
26+
ReceiveAggregatedMicrogridComponentsDataStreamRequest as PBAggregatedStreamRequest,
27+
)
28+
from frequenz.api.reporting.v1.reporting_pb2 import (
29+
ReceiveAggregatedMicrogridComponentsDataStreamResponse as PBAggregatedStreamResponse,
30+
)
2231
from frequenz.api.reporting.v1.reporting_pb2 import (
2332
ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest,
2433
)
@@ -145,6 +154,24 @@ def __iter__(self) -> Iterator[MetricSample]:
145154
)
146155

147156

157+
@dataclass(frozen=True)
158+
class AggregatedMetric:
159+
"""An aggregated metric sample returned by the Reporting service."""
160+
161+
_data_pb: PBAggregatedStreamResponse
162+
"""The underlying protobuf message."""
163+
164+
def sample(self) -> MetricSample:
165+
"""Return the aggregated metric sample."""
166+
return MetricSample(
167+
timestamp=self._data_pb.sample.sampled_at.ToDatetime(),
168+
microgrid_id=self._data_pb.aggregation_config.microgrid_id,
169+
component_id=self._data_pb.aggregation_config.aggregation_formula,
170+
metric=self._data_pb.aggregation_config.metric,
171+
value=self._data_pb.sample.sample.value,
172+
)
173+
174+
148175
class ReportingApiClient(BaseApiClient[ReportingStub]):
149176
"""A client for the Reporting service."""
150177

@@ -353,3 +380,78 @@ def dt2ts(dt: datetime) -> PBTimestamp:
353380
except grpcaio.AioRpcError as e:
354381
print(f"RPC failed: {e}")
355382
return
383+
384+
async def receive_aggregated_data(
385+
self,
386+
*,
387+
microgrid_id: int,
388+
metric: Metric,
389+
aggregation_formula: str,
390+
start: datetime | None,
391+
end: datetime | None,
392+
resampling_period: timedelta,
393+
) -> AsyncIterator[MetricSample]:
394+
"""Iterate over aggregated data for a single metric.
395+
396+
For now this only supports a single metric and aggregation formula.
397+
398+
Args:
399+
microgrid_id: The microgrid ID.
400+
metric: The metric name.
401+
aggregation_formula: The aggregation formula.
402+
start: start datetime, if None, the earliest available data will be used
403+
end: end datetime, if None starts streaming indefinitely from start
404+
resampling_period: The period for resampling the data.
405+
406+
Yields:
407+
An iterator over the aggregated metric samples.
408+
409+
Raises:
410+
ValueError: If the resampling_period is not provided.
411+
"""
412+
if not resampling_period:
413+
raise ValueError("resampling_period must be provided")
414+
415+
aggregation_config = PBAggregationConfig(
416+
microgrid_id=microgrid_id,
417+
metric=metric.to_proto(),
418+
aggregation_formula=aggregation_formula,
419+
)
420+
421+
def dt2ts(dt: datetime) -> PBTimestamp:
422+
ts = PBTimestamp()
423+
ts.FromDatetime(dt)
424+
return ts
425+
426+
time_filter = PBTimeFilter(
427+
start=dt2ts(start) if start else None,
428+
end=dt2ts(end) if end else None,
429+
)
430+
431+
stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter(
432+
time_filter=time_filter,
433+
resampling_options=PBResamplingOptions(
434+
resolution=round(resampling_period.total_seconds())
435+
),
436+
)
437+
438+
request = PBAggregatedStreamRequest(
439+
aggregation_configs=[aggregation_config],
440+
filter=stream_filter,
441+
)
442+
443+
try:
444+
stream = cast(
445+
AsyncIterator[PBAggregatedStreamResponse],
446+
self.stub.ReceiveAggregatedMicrogridComponentsDataStream(
447+
request, metadata=self._metadata
448+
),
449+
)
450+
async for response in stream:
451+
if not response:
452+
break
453+
yield AggregatedMetric(response).sample()
454+
455+
except grpcaio.AioRpcError as e:
456+
print(f"RPC failed: {e}")
457+
return

0 commit comments

Comments
 (0)