Skip to content

Commit dd24055

Browse files
authored
Reuse GrpcStreamBroadcasters for aggregated data streams (frequenz-floss#190)
Also remove unnecessary `async` declarations.
2 parents ad031dc + 8c2c93b commit dd24055

File tree

2 files changed

+83
-61
lines changed

2 files changed

+83
-61
lines changed

RELEASE_NOTES.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
9+
- The `receive_aggregated_data` method now returns a `Receiver`, which provides more flexibility than the previous `AsyncIterator`.
10+
- The `receive_microgrid_sensors_data`, `receive_single_sensor_data` and `receive_aggregated_data` methods are no longer `async`.
1011

1112
## New Features
1213

1314
<!-- Here goes the main new features and examples or instructions on how to use them -->
1415

1516
## Bug Fixes
1617

17-
* Take out 'async' from 'receive_microgrid_sensors_data' and 'receive_single_sensor_data'.
18+
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->

src/frequenz/client/reporting/_client.py

Lines changed: 80 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
"""Client for requests to the Reporting API."""
55

6-
from collections.abc import AsyncIterable, AsyncIterator
6+
from collections.abc import AsyncIterable
77
from datetime import datetime, timedelta
88
from typing import cast
99

@@ -117,6 +117,17 @@ def __init__(
117117
PBReceiveMicrogridSensorsDataStreamResponse, SensorsDataBatch
118118
],
119119
] = {}
120+
self._aggregated_data_streams: dict[
121+
tuple[
122+
int, # microgrid_id
123+
str, # metric name
124+
str, # aggregation_formula
125+
float | None, # start_time timestamp
126+
float | None, # end_time timestamp
127+
int | None, # resampling period in seconds
128+
],
129+
GrpcStreamBroadcaster[PBAggregatedStreamResponse, MetricSample],
130+
] = {}
120131

121132
self._metadata = (("key", key),) if key else ()
122133

@@ -219,7 +230,7 @@ def _receive_microgrid_components_data_batch(
219230
include_states: bool = False,
220231
include_bounds: bool = False,
221232
) -> Receiver[ComponentsDataBatch]:
222-
"""Return a GrpcStreamBroadcaster for microgrid component data."""
233+
"""Return a Receiver for the microgrid component data stream."""
223234
stream_key = (
224235
tuple((mid, tuple(cids)) for mid, cids in microgrid_components),
225236
tuple(metric.name for metric in metrics),
@@ -294,17 +305,16 @@ def transform_response(
294305
) -> ComponentsDataBatch:
295306
return ComponentsDataBatch(response)
296307

297-
async def stream_method() -> (
308+
def stream_method() -> (
298309
AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse]
299310
):
300311
call_iterator = self.stub.ReceiveMicrogridComponentsDataStream(
301312
request, metadata=self._metadata
302313
)
303-
async for response in cast(
314+
return cast(
304315
AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse],
305316
call_iterator,
306-
):
307-
yield response
317+
)
308318

309319
self._components_data_streams[stream_key] = GrpcStreamBroadcaster(
310320
stream_name="microgrid-components-data-stream",
@@ -339,7 +349,7 @@ def receive_single_sensor_data(
339349
include_states: Whether to include the state data.
340350
341351
Returns:
342-
A receiver of `MetricSample`s.
352+
A receiver of `MetricSample`s.
343353
"""
344354
receiver = self._receive_microgrid_sensors_data_batch(
345355
microgrid_sensors=[(microgrid_id, [sensor_id])],
@@ -409,7 +419,7 @@ def _receive_microgrid_sensors_data_batch(
409419
include_states: Whether to include the state data.
410420
411421
Returns:
412-
A GrpcStreamBroadcaster that can be used to receive sensor data batches.
422+
A receiver of `SensorsDataBatch`s.
413423
"""
414424
stream_key = (
415425
tuple((mid, tuple(sids)) for mid, sids in microgrid_sensors),
@@ -480,17 +490,16 @@ def transform_response(
480490
) -> SensorsDataBatch:
481491
return SensorsDataBatch(response)
482492

483-
async def stream_method() -> (
493+
def stream_method() -> (
484494
AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse]
485495
):
486496
call_iterator = self.stub.ReceiveMicrogridSensorsDataStream(
487497
request, metadata=self._metadata
488498
)
489-
async for response in cast(
499+
return cast(
490500
AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse],
491501
call_iterator,
492-
):
493-
yield response
502+
)
494503

495504
self._sensors_data_streams[stream_key] = GrpcStreamBroadcaster(
496505
stream_name="microgrid-sensors-data-stream",
@@ -500,7 +509,7 @@ async def stream_method() -> (
500509

501510
return self._sensors_data_streams[stream_key].new_receiver()
502511

503-
async def receive_aggregated_data(
512+
def receive_aggregated_data(
504513
self,
505514
*,
506515
microgrid_id: int,
@@ -509,7 +518,7 @@ async def receive_aggregated_data(
509518
start_time: datetime | None,
510519
end_time: datetime | None,
511520
resampling_period: timedelta,
512-
) -> AsyncIterator[MetricSample]:
521+
) -> Receiver[MetricSample]:
513522
"""Iterate over aggregated data for a single metric using GrpcStreamBroadcaster.
514523
515524
For now this only supports a single metric and aggregation formula.
@@ -521,63 +530,75 @@ async def receive_aggregated_data(
521530
end_time: end datetime, if None starts streaming indefinitely from start_time
522531
resampling_period: The period for resampling the data.
523532
524-
Yields:
525-
An iterator over the aggregated metric samples.
533+
Returns:
534+
A receiver of `MetricSample`s.
526535
527536
Raises:
528537
ValueError: If the resampling_period is not provided.
529538
"""
530-
if not resampling_period:
531-
raise ValueError("resampling_period must be provided")
532-
533-
aggregation_config = PBAggregationConfig(
534-
microgrid_id=microgrid_id,
535-
metric=metric.to_proto(),
536-
aggregation_formula=aggregation_formula,
539+
stream_key = (
540+
microgrid_id,
541+
metric.name,
542+
aggregation_formula,
543+
start_time.timestamp() if start_time else None,
544+
end_time.timestamp() if end_time else None,
545+
round(resampling_period.total_seconds()) if resampling_period else None,
537546
)
547+
if (
548+
stream_key not in self._aggregated_data_streams
549+
or not self._aggregated_data_streams[stream_key].is_running
550+
):
538551

539-
def dt2ts(dt: datetime) -> PBTimestamp:
540-
ts = PBTimestamp()
541-
ts.FromDatetime(dt)
542-
return ts
552+
if not resampling_period:
553+
raise ValueError("resampling_period must be provided")
543554

544-
time_filter = PBTimeFilter(
545-
start=dt2ts(start_time) if start_time else None,
546-
end=dt2ts(end_time) if end_time else None,
547-
)
555+
aggregation_config = PBAggregationConfig(
556+
microgrid_id=microgrid_id,
557+
metric=metric.to_proto(),
558+
aggregation_formula=aggregation_formula,
559+
)
548560

549-
stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter(
550-
time_filter=time_filter,
551-
resampling_options=PBResamplingOptions(
552-
resolution=round(resampling_period.total_seconds())
553-
),
554-
)
561+
def dt2ts(dt: datetime) -> PBTimestamp:
562+
ts = PBTimestamp()
563+
ts.FromDatetime(dt)
564+
return ts
555565

556-
request = PBAggregatedStreamRequest(
557-
aggregation_configs=[aggregation_config],
558-
filter=stream_filter,
559-
)
566+
time_filter = PBTimeFilter(
567+
start=dt2ts(start_time) if start_time else None,
568+
end=dt2ts(end_time) if end_time else None,
569+
)
560570

561-
def transform_response(response: PBAggregatedStreamResponse) -> MetricSample:
562-
return AggregatedMetric(response).sample()
571+
stream_filter = PBAggregatedStreamRequest.AggregationStreamFilter(
572+
time_filter=time_filter,
573+
resampling_options=PBResamplingOptions(
574+
resolution=round(resampling_period.total_seconds())
575+
),
576+
)
563577

564-
async def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]:
565-
call_iterator = self.stub.ReceiveAggregatedMicrogridComponentsDataStream(
566-
request, metadata=self._metadata
578+
request = PBAggregatedStreamRequest(
579+
aggregation_configs=[aggregation_config],
580+
filter=stream_filter,
567581
)
568582

569-
async for response in cast(
570-
AsyncIterable[PBAggregatedStreamResponse], call_iterator
571-
):
572-
yield response
583+
def transform_response(
584+
response: PBAggregatedStreamResponse,
585+
) -> MetricSample:
586+
return AggregatedMetric(response).sample()
587+
588+
def stream_method() -> AsyncIterable[PBAggregatedStreamResponse]:
589+
call_iterator = (
590+
self.stub.ReceiveAggregatedMicrogridComponentsDataStream(
591+
request, metadata=self._metadata
592+
)
593+
)
573594

574-
broadcaster = GrpcStreamBroadcaster(
575-
stream_name="aggregated-microgrid-data-stream",
576-
stream_method=stream_method,
577-
transform=transform_response,
578-
retry_strategy=None,
579-
)
595+
return cast(AsyncIterable[PBAggregatedStreamResponse], call_iterator)
596+
597+
self._aggregated_data_streams[stream_key] = GrpcStreamBroadcaster(
598+
stream_name="aggregated-microgrid-data-stream",
599+
stream_method=stream_method,
600+
transform=transform_response,
601+
retry_strategy=None,
602+
)
580603

581-
receiver = broadcaster.new_receiver()
582-
async for data in receiver:
583-
yield data
604+
return self._aggregated_data_streams[stream_key].new_receiver()

0 commit comments

Comments
 (0)