src/frequenz/client/reporting/_client.py: 139 changes (80 additions, 59 deletions)
@@ -3,7 +3,7 @@

 """Client for requests to the Reporting API."""

-from collections.abc import AsyncIterable, AsyncIterator
+from collections.abc import AsyncIterable
 from datetime import datetime, timedelta
 from typing import cast

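Note: `AsyncIterator` is no longer imported because `receive_aggregated_data` (below) now returns a `Receiver` instead of yielding samples itself. A minimal consumption sketch, assuming an already constructed client and that `Metric.AC_ACTIVE_POWER` exists in the common metric enum (neither is shown in this diff; the import path is assumed):

```python
from datetime import timedelta

from frequenz.client.common.metric import Metric  # import path assumed

async def consume_aggregate(client) -> None:
    # The method is now synchronous and returns a Receiver; the Receiver
    # itself supports `async for`, so no AsyncIterator annotation is needed.
    receiver = client.receive_aggregated_data(
        microgrid_id=42,  # placeholder ID
        metric=Metric.AC_ACTIVE_POWER,  # assumed enum member
        aggregation_formula="sum",  # assumed formula
        start_time=None,
        end_time=None,
        resampling_period=timedelta(seconds=10),
    )
    async for sample in receiver:
        print(sample)
```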
@@ -117,6 +117,17 @@ def __init__(
                 PBReceiveMicrogridSensorsDataStreamResponse, SensorsDataBatch
             ],
         ] = {}
+        self._aggregated_data_streams: dict[
+            tuple[
+                int,  # microgrid_id
+                str,  # metric name
+                str,  # aggregation_formula
+                float | None,  # start_time timestamp
+                float | None,  # end_time timestamp
+                int | None,  # resampling period in seconds
+            ],
+            GrpcStreamBroadcaster[PBAggregatedStreamResponse, MetricSample],
+        ] = {}

         self._metadata = (("key", key),) if key else ()

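The new `_aggregated_data_streams` mapping mirrors the existing component and sensor stream caches: one broadcaster per unique request signature, shared by all receivers. A self-contained sketch of that pattern, using a stand-in class rather than the real `GrpcStreamBroadcaster`:

```python
from dataclasses import dataclass

@dataclass
class FakeBroadcaster:
    """Stand-in for GrpcStreamBroadcaster, illustration only."""

    name: str
    is_running: bool = True

    def new_receiver(self) -> str:
        return f"receiver-for-{self.name}"

streams: dict[tuple[int, str], FakeBroadcaster] = {}

def get_receiver(microgrid_id: int, metric_name: str) -> str:
    key = (microgrid_id, metric_name)
    # Re-create the broadcaster only if it is missing or no longer running,
    # mirroring the `is_running` check added in receive_aggregated_data.
    if key not in streams or not streams[key].is_running:
        streams[key] = FakeBroadcaster(name=f"{microgrid_id}-{metric_name}")
    return streams[key].new_receiver()

# Two identical requests share one broadcaster instead of opening two streams.
assert get_receiver(1, "AC_ACTIVE_POWER") == get_receiver(1, "AC_ACTIVE_POWER")
```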
@@ -219,7 +230,7 @@ def _receive_microgrid_components_data_batch(
         include_states: bool = False,
         include_bounds: bool = False,
     ) -> Receiver[ComponentsDataBatch]:
-        """Return a GrpcStreamBroadcaster for microgrid component data."""
+        """Return a Receiver for the microgrid component data stream."""
         stream_key = (
             tuple((mid, tuple(cids)) for mid, cids in microgrid_components),
             tuple(metric.name for metric in metrics),
@@ -294,17 +305,16 @@ def transform_response(
         ) -> ComponentsDataBatch:
             return ComponentsDataBatch(response)

-        async def stream_method() -> (
+        def stream_method() -> (
             AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse]
         ):
             call_iterator = self.stub.ReceiveMicrogridComponentsDataStream(
                 request, metadata=self._metadata
             )
-            async for response in cast(
+            return cast(
                 AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse],
                 call_iterator,
-            ):
-                yield response
+            )

         self._components_data_streams[stream_key] = GrpcStreamBroadcaster(
             stream_name="microgrid-components-data-stream",
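The `stream_method` change drops a redundant async generator layer: the gRPC call object is already an `AsyncIterable`, so it can be returned directly instead of being re-yielded item by item. A generic, runnable sketch of the equivalence:

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator

async def _source() -> AsyncIterator[int]:
    """Stand-in for the gRPC call iterator."""
    for i in range(3):
        yield i

async def wrapped() -> AsyncIterator[int]:
    # Old style: an async generator re-yielding every response.
    async for item in _source():
        yield item

def direct() -> AsyncIterable[int]:
    # New style: return the async iterable unchanged.
    return _source()

async def main() -> None:
    assert [i async for i in wrapped()] == [i async for i in direct()]

asyncio.run(main())
```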
@@ -339,7 +349,7 @@ def receive_single_sensor_data(
             include_states: Whether to include the state data.

         Returns:
-            A receiver of `MetricSample`s.
+            A receiver of `MetricSample`s.
         """
         receiver = self._receive_microgrid_sensors_data_batch(
             microgrid_sensors=[(microgrid_id, [sensor_id])],
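For reference, a hedged usage sketch of the receiver returned here. Only `microgrid_id` and `sensor_id` are confirmed by this diff; every other parameter name is assumed, so check the actual signature before copying:

```python
from frequenz.client.common.metric import Metric  # import path assumed

async def log_sensor_samples(client) -> None:
    # Parameters other than microgrid_id/sensor_id are assumptions.
    receiver = client.receive_single_sensor_data(
        microgrid_id=1,
        sensor_id=7,
        metrics=[Metric.TEMPERATURE],  # hypothetical metric selection
        start_time=None,
        end_time=None,
        include_states=False,
    )
    async for sample in receiver:
        print(sample)
```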
@@ -409,7 +419,7 @@ def _receive_microgrid_sensors_data_batch(
             include_states: Whether to include the state data.

         Returns:
-            A GrpcStreamBroadcaster that can be used to receive sensor data batches.
+            A receiver of `SensorsDataBatch`s.
         """
         stream_key = (
             tuple((mid, tuple(sids)) for mid, sids in microgrid_sensors),
@@ -480,17 +490,16 @@ def transform_response(
         ) -> SensorsDataBatch:
             return SensorsDataBatch(response)

-        async def stream_method() -> (
+        def stream_method() -> (
             AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse]
         ):
             call_iterator = self.stub.ReceiveMicrogridSensorsDataStream(
                 request, metadata=self._metadata
             )
-            async for response in cast(
+            return cast(
                 AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse],
                 call_iterator,
-            ):
-                yield response
+            )

         self._sensors_data_streams[stream_key] = GrpcStreamBroadcaster(
             stream_name="microgrid-sensors-data-stream",
@@ -500,7 +509,7 @@ async def stream_method() -> (

         return self._sensors_data_streams[stream_key].new_receiver()

-    async def receive_aggregated_data(
+    def receive_aggregated_data(
         self,
         *,
         microgrid_id: int,
@@ -509,7 +518,7 @@ async def receive_aggregated_data(
         start_time: datetime | None,
         end_time: datetime | None,
         resampling_period: timedelta,
-    ) -> AsyncIterator[MetricSample]:
+    ) -> Receiver[MetricSample]:
        """Iterate over aggregated data for a single metric using GrpcStreamBroadcaster.

        For now this only supports a single metric and aggregation formula.
@@ -521,63 +530,75 @@ async def receive_aggregated_data(
             end_time: end datetime, if None starts streaming indefinitely from start_time
             resampling_period: The period for resampling the data.

-        Yields:
-            An iterator over the aggregated metric samples.
+        Returns:
+            A receiver of `MetricSample`s.

         Raises:
             ValueError: If the resampling_period is not provided.
         """
-        if not resampling_period:
-            raise ValueError("resampling_period must be provided")
-
-        aggregation_config = PBAggregationConfig(
-            microgrid_id=microgrid_id,
-            metric=metric.to_proto(),
-            aggregation_formula=aggregation_formula,
-        )
-
-        def dt2ts(dt: datetime) -> PBTimestamp:
-            ts = PBTimestamp()
-            ts.FromDatetime(dt)
-            return ts
-
-        time_filter = PBTimeFilter(
-            start=dt2ts(start_time) if start_time else None,
-            end=dt2ts(end_time) if end_time else None,
-        )
-
-        stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter(
-            time_filter=time_filter,
-            resampling_options=PBResamplingOptions(
-                resolution=round(resampling_period.total_seconds())
-            ),
-        )
-
-        request = PBAggregatedStreamRequest(
-            aggregation_configs=[aggregation_config],
-            filter=stream_filter,
-        )
-
-        def transform_response(response: PBAggregatedStreamResponse) -> MetricSample:
-            return AggregatedMetric(response).sample()
-
-        async def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]:
-            call_iterator = self.stub.ReceiveAggregatedMicrogridComponentsDataStream(
-                request, metadata=self._metadata
-            )
-
-            async for response in cast(
-                AsyncIterable[PBAggregatedStreamResponse], call_iterator
-            ):
-                yield response
-
-        broadcaster = GrpcStreamBroadcaster(
-            stream_name="aggregated-microgrid-data-stream",
-            stream_method=stream_method,
-            transform=transform_response,
-            retry_strategy=None,
-        )
-
-        receiver = broadcaster.new_receiver()
-        async for data in receiver:
-            yield data
+        stream_key = (
+            microgrid_id,
+            metric.name,
+            aggregation_formula,
+            start_time.timestamp() if start_time else None,
+            end_time.timestamp() if end_time else None,
+            round(resampling_period.total_seconds()) if resampling_period else None,
+        )
+        if (
+            stream_key not in self._aggregated_data_streams
+            or not self._aggregated_data_streams[stream_key].is_running
+        ):
+
+            if not resampling_period:
+                raise ValueError("resampling_period must be provided")
+
+            aggregation_config = PBAggregationConfig(
+                microgrid_id=microgrid_id,
+                metric=metric.to_proto(),
+                aggregation_formula=aggregation_formula,
+            )
+
+            def dt2ts(dt: datetime) -> PBTimestamp:
+                ts = PBTimestamp()
+                ts.FromDatetime(dt)
+                return ts
+
+            time_filter = PBTimeFilter(
+                start=dt2ts(start_time) if start_time else None,
+                end=dt2ts(end_time) if end_time else None,
+            )
+
+            stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter(
+                time_filter=time_filter,
+                resampling_options=PBResamplingOptions(
+                    resolution=round(resampling_period.total_seconds())
+                ),
+            )
+
+            request = PBAggregatedStreamRequest(
+                aggregation_configs=[aggregation_config],
+                filter=stream_filter,
+            )
+
+            def transform_response(
+                response: PBAggregatedStreamResponse,
+            ) -> MetricSample:
+                return AggregatedMetric(response).sample()
+
+            def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]:
+                call_iterator = (
+                    self.stub.ReceiveAggregatedMicrogridComponentsDataStream(
+                        request, metadata=self._metadata
+                    )
+                )
+
+                return cast(AsyncIterable[PBAggregatedStreamResponse], call_iterator)
+
+            self._aggregated_data_streams[stream_key] = GrpcStreamBroadcaster(
+                stream_name="aggregated-microgrid-data-stream",
+                stream_method=stream_method,
+                transform=transform_response,
+                retry_strategy=None,
+            )
+
+        return self._aggregated_data_streams[stream_key].new_receiver()
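Design note on the cache key: `stream_key` converts datetimes to POSIX timestamps and rounds the resampling period to whole seconds, so equivalent requests map to the same cache entry regardless of how the caller expressed them. A tiny sketch of that normalization (values shown are illustrative):

```python
from datetime import datetime, timedelta, timezone

start = datetime(2024, 1, 1, tzinfo=timezone.utc)
period = timedelta(seconds=15)

# Datetimes and timedeltas become plain numeric key components:
key_part = (start.timestamp(), round(period.total_seconds()))
print(key_part)  # (1704067200.0, 15)
```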