Skip to content

Commit 8b2af8b

Browse files
committed
Reuse GrpcStreamBroadcasters for aggregated data streams
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent ad031dc commit 8b2af8b

File tree

2 files changed

+74
-48
lines changed

2 files changed

+74
-48
lines changed

src/frequenz/client/reporting/_client.py

Lines changed: 73 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
"""Client for requests to the Reporting API."""
55

6-
from collections.abc import AsyncIterable, AsyncIterator
6+
from collections.abc import AsyncIterable
77
from datetime import datetime, timedelta
88
from typing import cast
99

@@ -117,6 +117,17 @@ def __init__(
117117
PBReceiveMicrogridSensorsDataStreamResponse, SensorsDataBatch
118118
],
119119
] = {}
120+
self._aggregated_data_streams: dict[
121+
tuple[
122+
int, # microgrid_id
123+
str, # metric name
124+
str, # aggregation_formula
125+
float | None, # start_time timestamp
126+
float | None, # end_time timestamp
127+
int | None, # resampling period in seconds
128+
],
129+
GrpcStreamBroadcaster[PBAggregatedStreamResponse, MetricSample],
130+
] = {}
120131

121132
self._metadata = (("key", key),) if key else ()
122133

@@ -509,7 +520,7 @@ async def receive_aggregated_data(
509520
start_time: datetime | None,
510521
end_time: datetime | None,
511522
resampling_period: timedelta,
512-
) -> AsyncIterator[MetricSample]:
523+
) -> Receiver[MetricSample]:
513524
"""Iterate over aggregated data for a single metric using GrpcStreamBroadcaster.
514525
515526
For now this only supports a single metric and aggregation formula.
@@ -521,63 +532,78 @@ async def receive_aggregated_data(
521532
end_time: end datetime, if None starts streaming indefinitely from start_time
522533
resampling_period: The period for resampling the data.
523534
524-
Yields:
525-
An iterator over the aggregated metric samples.
535+
Returns:
536+
A receiver of `MetricSample`s.
526537
527538
Raises:
528539
ValueError: If the resampling_period is not provided.
529540
"""
530-
if not resampling_period:
531-
raise ValueError("resampling_period must be provided")
532-
533-
aggregation_config = PBAggregationConfig(
534-
microgrid_id=microgrid_id,
535-
metric=metric.to_proto(),
536-
aggregation_formula=aggregation_formula,
541+
stream_key = (
542+
microgrid_id,
543+
metric.name,
544+
aggregation_formula,
545+
start_time.timestamp() if start_time else None,
546+
end_time.timestamp() if end_time else None,
547+
round(resampling_period.total_seconds()) if resampling_period else None,
537548
)
549+
if (
550+
stream_key not in self._aggregated_data_streams
551+
or not self._aggregated_data_streams[stream_key].is_running
552+
):
538553

539-
def dt2ts(dt: datetime) -> PBTimestamp:
540-
ts = PBTimestamp()
541-
ts.FromDatetime(dt)
542-
return ts
554+
if not resampling_period:
555+
raise ValueError("resampling_period must be provided")
543556

544-
time_filter = PBTimeFilter(
545-
start=dt2ts(start_time) if start_time else None,
546-
end=dt2ts(end_time) if end_time else None,
547-
)
557+
aggregation_config = PBAggregationConfig(
558+
microgrid_id=microgrid_id,
559+
metric=metric.to_proto(),
560+
aggregation_formula=aggregation_formula,
561+
)
548562

549-
stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter(
550-
time_filter=time_filter,
551-
resampling_options=PBResamplingOptions(
552-
resolution=round(resampling_period.total_seconds())
553-
),
554-
)
563+
def dt2ts(dt: datetime) -> PBTimestamp:
564+
ts = PBTimestamp()
565+
ts.FromDatetime(dt)
566+
return ts
555567

556-
request = PBAggregatedStreamRequest(
557-
aggregation_configs=[aggregation_config],
558-
filter=stream_filter,
559-
)
568+
time_filter = PBTimeFilter(
569+
start=dt2ts(start_time) if start_time else None,
570+
end=dt2ts(end_time) if end_time else None,
571+
)
560572

561-
def transform_response(response: PBAggregatedStreamResponse) -> MetricSample:
562-
return AggregatedMetric(response).sample()
573+
stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter(
574+
time_filter=time_filter,
575+
resampling_options=PBResamplingOptions(
576+
resolution=round(resampling_period.total_seconds())
577+
),
578+
)
563579

564-
async def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]:
565-
call_iterator = self.stub.ReceiveAggregatedMicrogridComponentsDataStream(
566-
request, metadata=self._metadata
580+
request = PBAggregatedStreamRequest(
581+
aggregation_configs=[aggregation_config],
582+
filter=stream_filter,
567583
)
568584

569-
async for response in cast(
570-
AsyncIterable[PBAggregatedStreamResponse], call_iterator
571-
):
572-
yield response
585+
def transform_response(
586+
response: PBAggregatedStreamResponse,
587+
) -> MetricSample:
588+
return AggregatedMetric(response).sample()
589+
590+
async def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]:
591+
call_iterator = (
592+
self.stub.ReceiveAggregatedMicrogridComponentsDataStream(
593+
request, metadata=self._metadata
594+
)
595+
)
573596

574-
broadcaster = GrpcStreamBroadcaster(
575-
stream_name="aggregated-microgrid-data-stream",
576-
stream_method=stream_method,
577-
transform=transform_response,
578-
retry_strategy=None,
579-
)
597+
async for response in cast(
598+
AsyncIterable[PBAggregatedStreamResponse], call_iterator
599+
):
600+
yield response
601+
602+
self._aggregated_data_streams[stream_key] = GrpcStreamBroadcaster(
603+
stream_name="aggregated-microgrid-data-stream",
604+
stream_method=stream_method,
605+
transform=transform_response,
606+
retry_strategy=None,
607+
)
580608

581-
receiver = broadcaster.new_receiver()
582-
async for data in receiver:
583-
yield data
609+
return self._aggregated_data_streams[stream_key].new_receiver()

src/frequenz/client/reporting/cli/__main__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ async def data_iter() -> AsyncIterator[MetricSample]:
171171
for formula in formulas:
172172
assert resampling_period is not None
173173
for metric in metrics:
174-
async for sample in client.receive_aggregated_data(
174+
async for sample in await client.receive_aggregated_data(
175175
microgrid_id=microgrid_id,
176176
metric=metric,
177177
aggregation_formula=formula,

0 commit comments

Comments
 (0)