2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
@@ -9,6 +9,8 @@

## New Features

* Adds a new method `receive_aggregated_data` to receive microgrid component data
  aggregated by user-defined formulae.
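
  A minimal usage sketch (the import paths, metric, IDs, and formula below are
  illustrative assumptions, not part of this change):

  ```python
  from datetime import timedelta

  from frequenz.client.common.metric import Metric  # assumed import path
  from frequenz.client.reporting import ReportingApiClient  # assumed import path


  async def print_aggregated_samples(client: ReportingApiClient) -> None:
      """Print aggregated samples for a hypothetical pair of components."""
      async for sample in client.receive_aggregated_data(
          microgrid_id=42,  # hypothetical microgrid ID
          metric=Metric.AC_ACTIVE_POWER,  # assumed enum member
          aggregation_formula="#101 + #102",  # hypothetical formula
          start=None,  # begin at the earliest available data
          end=None,  # stream indefinitely
          resampling_period=timedelta(seconds=15),
      ):
          print(sample)
  ```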

## Bug Fixes

102 changes: 102 additions & 0 deletions src/frequenz/client/reporting/_client.py
@@ -15,10 +15,19 @@
from frequenz.api.common.v1.microgrid.microgrid_pb2 import (
MicrogridComponentIDs as PBMicrogridComponentIDs,
)
from frequenz.api.reporting.v1.reporting_pb2 import (
AggregationConfig as PBAggregationConfig,
)
from frequenz.api.reporting.v1.reporting_pb2 import IncludeOptions as PBIncludeOptions
from frequenz.api.reporting.v1.reporting_pb2 import (
MetricConnections as PBMetricConnections,
)
from frequenz.api.reporting.v1.reporting_pb2 import (
ReceiveAggregatedMicrogridComponentsDataStreamRequest as PBAggregatedStreamRequest,
)
from frequenz.api.reporting.v1.reporting_pb2 import (
ReceiveAggregatedMicrogridComponentsDataStreamResponse as PBAggregatedStreamResponse,
)
from frequenz.api.reporting.v1.reporting_pb2 import (
ReceiveMicrogridComponentsDataStreamRequest as PBReceiveMicrogridComponentsDataStreamRequest,
)
@@ -145,6 +154,24 @@ def __iter__(self) -> Iterator[MetricSample]:
)


@dataclass(frozen=True)
class AggregatedMetric:
"""An aggregated metric sample returned by the Reporting service."""

_data_pb: PBAggregatedStreamResponse
"""The underlying protobuf message."""

def sample(self) -> MetricSample:
"""Return the aggregated metric sample."""
return MetricSample(
timestamp=self._data_pb.sample.sampled_at.ToDatetime(),
microgrid_id=self._data_pb.aggregation_config.microgrid_id,
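            # Aggregated data has no single source component, so the
            # aggregation formula string is carried in the component ID field.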
component_id=self._data_pb.aggregation_config.aggregation_formula,
metric=self._data_pb.aggregation_config.metric,
value=self._data_pb.sample.sample.value,
)


class ReportingApiClient(BaseApiClient[ReportingStub]):
"""A client for the Reporting service."""

@@ -353,3 +380,78 @@ def dt2ts(dt: datetime) -> PBTimestamp:
except grpcaio.AioRpcError as e:
print(f"RPC failed: {e}")
return

async def receive_aggregated_data(
self,
*,
microgrid_id: int,
metric: Metric,
aggregation_formula: str,
start: datetime | None,
end: datetime | None,
resampling_period: timedelta,
) -> AsyncIterator[MetricSample]:
"""Iterate over aggregated data for a single metric.

        For now, this only supports a single metric and aggregation formula.

Args:
microgrid_id: The microgrid ID.
            metric: The metric to fetch.
            aggregation_formula: The aggregation formula.
            start: The start datetime. If None, streaming begins at the
                earliest available data.
            end: The end datetime. If None, the stream continues indefinitely
                from `start`.
resampling_period: The period for resampling the data.

Yields:
            The aggregated metric samples.

Raises:
ValueError: If the resampling_period is not provided.
"""
if not resampling_period:
raise ValueError("resampling_period must be provided")

aggregation_config = PBAggregationConfig(
microgrid_id=microgrid_id,
metric=metric.to_proto(),
aggregation_formula=aggregation_formula,
)

def dt2ts(dt: datetime) -> PBTimestamp:
ts = PBTimestamp()
ts.FromDatetime(dt)
return ts

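        # An unset start or end leaves the time filter open-ended on that side
        # (earliest available data / indefinite streaming, respectively).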
time_filter = PBTimeFilter(
start=dt2ts(start) if start else None,
end=dt2ts(end) if end else None,
)

stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter(
time_filter=time_filter,
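            # The resampling resolution is sent as whole seconds.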
resampling_options=PBResamplingOptions(
resolution=round(resampling_period.total_seconds())
),
)

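        # `aggregation_configs` is a repeated field, but this client currently
        # sends exactly one configuration per request.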
request = PBAggregatedStreamRequest(
aggregation_configs=[aggregation_config],
filter=stream_filter,
)

try:
stream = cast(
AsyncIterator[PBAggregatedStreamResponse],
self.stub.ReceiveAggregatedMicrogridComponentsDataStream(
request, metadata=self._metadata
),
)
async for response in stream:
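                # Stop consuming if the server yields an empty message.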
if not response:
break
yield AggregatedMetric(response).sample()

except grpcaio.AioRpcError as e:
print(f"RPC failed: {e}")
return