Skip to content

Commit cc0f3b5

Browse files
committed
Reuse GrpcStreamBroadcasters for sensor data streams as well
Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 6017e47 commit cc0f3b5

File tree

1 file changed

+90
-62
lines changed

1 file changed

+90
-62
lines changed

src/frequenz/client/reporting/_client.py

Lines changed: 90 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,21 @@ def __init__(
102102
PBReceiveMicrogridComponentsDataStreamResponse, ComponentsDataBatch
103103
],
104104
] = {}
105+
self._sensors_data_streams: dict[
106+
tuple[
107+
tuple[
108+
tuple[int, tuple[int, ...]], ...
109+
], # microgrid_sensors as a tuple of tuples
110+
tuple[str, ...], # metric names
111+
float | None, # start_time timestamp
112+
float | None, # end_time timestamp
113+
int | None, # resampling period in seconds
114+
bool, # include_states
115+
],
116+
GrpcStreamBroadcaster[
117+
PBReceiveMicrogridSensorsDataStreamResponse, SensorsDataBatch
118+
],
119+
] = {}
105120

106121
self._metadata = (("key", key),) if key else ()
107122

@@ -407,81 +422,94 @@ async def _receive_microgrid_sensors_data_batch(
407422
Returns:
408423
A SensorDataBatch object of microgrid sensors data.
409424
"""
410-
microgrid_sensors_pb = [
411-
PBMicrogridSensorIDs(microgrid_id=mid, sensor_ids=sids)
412-
for mid, sids in microgrid_sensors
413-
]
425+
stream_key = (
426+
tuple((mid, tuple(sids)) for mid, sids in microgrid_sensors),
427+
tuple(metric.name for metric in metrics),
428+
start_time.timestamp() if start_time else None,
429+
end_time.timestamp() if end_time else None,
430+
round(resampling_period.total_seconds()) if resampling_period else None,
431+
include_states,
432+
)
414433

415-
def dt2ts(dt: datetime) -> PBTimestamp:
416-
ts = PBTimestamp()
417-
ts.FromDatetime(dt)
418-
return ts
434+
if (
435+
stream_key not in self._sensors_data_streams
436+
or not self._sensors_data_streams[stream_key].is_running
437+
):
419438

420-
time_filter = PBTimeFilter(
421-
start=dt2ts(start_time) if start_time else None,
422-
end=dt2ts(end_time) if end_time else None,
423-
)
439+
microgrid_sensors_pb = [
440+
PBMicrogridSensorIDs(microgrid_id=mid, sensor_ids=sids)
441+
for mid, sids in microgrid_sensors
442+
]
424443

425-
incl_states = (
426-
PBFilterOption.FILTER_OPTION_INCLUDE
427-
if include_states
428-
else PBFilterOption.FILTER_OPTION_EXCLUDE
429-
)
430-
include_options = PBReceiveMicrogridSensorsDataStreamRequest.IncludeOptions(
431-
states=incl_states,
432-
)
444+
def dt2ts(dt: datetime) -> PBTimestamp:
445+
ts = PBTimestamp()
446+
ts.FromDatetime(dt)
447+
return ts
433448

434-
stream_filter = PBReceiveMicrogridSensorsDataStreamRequest.StreamFilter(
435-
time_filter=time_filter,
436-
resampling_options=PBResamplingOptions(
437-
resolution=(
438-
round(resampling_period.total_seconds())
439-
if resampling_period is not None
440-
else None
441-
)
442-
),
443-
include_options=include_options,
444-
)
449+
time_filter = PBTimeFilter(
450+
start=dt2ts(start_time) if start_time else None,
451+
end=dt2ts(end_time) if end_time else None,
452+
)
445453

446-
metric_conns_pb = [
447-
PBMetricConnections(
448-
metric=metric.to_proto(),
449-
connections=[],
454+
incl_states = (
455+
PBFilterOption.FILTER_OPTION_INCLUDE
456+
if include_states
457+
else PBFilterOption.FILTER_OPTION_EXCLUDE
458+
)
459+
include_options = PBReceiveMicrogridSensorsDataStreamRequest.IncludeOptions(
460+
states=incl_states,
450461
)
451-
for metric in metrics
452-
]
453462

454-
request = PBReceiveMicrogridSensorsDataStreamRequest(
455-
microgrid_sensors=microgrid_sensors_pb,
456-
metrics=metric_conns_pb,
457-
filter=stream_filter,
458-
)
463+
stream_filter = PBReceiveMicrogridSensorsDataStreamRequest.StreamFilter(
464+
time_filter=time_filter,
465+
resampling_options=PBResamplingOptions(
466+
resolution=(
467+
round(resampling_period.total_seconds())
468+
if resampling_period is not None
469+
else None
470+
)
471+
),
472+
include_options=include_options,
473+
)
459474

460-
def transform_response(
461-
response: PBReceiveMicrogridSensorsDataStreamResponse,
462-
) -> SensorsDataBatch:
463-
return SensorsDataBatch(response)
475+
metric_conns_pb = [
476+
PBMetricConnections(
477+
metric=metric.to_proto(),
478+
connections=[],
479+
)
480+
for metric in metrics
481+
]
464482

465-
async def stream_method() -> (
466-
AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse]
467-
):
468-
call_iterator = self.stub.ReceiveMicrogridSensorsDataStream(
469-
request, metadata=self._metadata
483+
request = PBReceiveMicrogridSensorsDataStreamRequest(
484+
microgrid_sensors=microgrid_sensors_pb,
485+
metrics=metric_conns_pb,
486+
filter=stream_filter,
470487
)
471-
async for response in cast(
472-
AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse],
473-
call_iterator,
488+
489+
def transform_response(
490+
response: PBReceiveMicrogridSensorsDataStreamResponse,
491+
) -> SensorsDataBatch:
492+
return SensorsDataBatch(response)
493+
494+
async def stream_method() -> (
495+
AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse]
474496
):
475-
yield response
497+
call_iterator = self.stub.ReceiveMicrogridSensorsDataStream(
498+
request, metadata=self._metadata
499+
)
500+
async for response in cast(
501+
AsyncIterable[PBReceiveMicrogridSensorsDataStreamResponse],
502+
call_iterator,
503+
):
504+
yield response
476505

477-
broadcaster = GrpcStreamBroadcaster(
478-
stream_name="microgrid-sensors-data-stream",
479-
stream_method=stream_method,
480-
transform=transform_response,
481-
retry_strategy=None,
482-
)
506+
self._sensors_data_streams[stream_key] = GrpcStreamBroadcaster(
507+
stream_name="microgrid-sensors-data-stream",
508+
stream_method=stream_method,
509+
transform=transform_response,
510+
)
483511

484-
return broadcaster.new_receiver()
512+
return self._sensors_data_streams[stream_key].new_receiver()
485513

486514
async def receive_aggregated_data(
487515
self,

0 commit comments

Comments
 (0)