Skip to content

Commit dbe823f

Browse files
flora-hofmann-frequenzshsms
authored andcommitted
Reuse GrpcStreamBroadcaster instances if they exist
Signed-off-by: Flora <[email protected]>
1 parent 028487f commit dbe823f

File tree

1 file changed

+111
-88
lines changed

1 file changed

+111
-88
lines changed

src/frequenz/client/reporting/_client.py

Lines changed: 111 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,22 @@ def __init__(
222222
channel_defaults=channel_defaults,
223223
)
224224

225-
self._broadcasters: dict[int, GrpcStreamBroadcaster[Any, Any]] = {}
225+
self._components_data_streams: dict[
226+
tuple[
227+
tuple[
228+
tuple[int, tuple[int, ...]], ...
229+
], # microgrid_components as a tuple of tuples
230+
tuple[str, ...], # metric names
231+
float | None, # start_time timestamp
232+
float | None, # end_time timestamp
233+
int | None, # resampling period in seconds
234+
bool, # include_states
235+
bool, # include_bounds
236+
],
237+
GrpcStreamBroadcaster[
238+
PBReceiveMicrogridComponentsDataStreamResponse, ComponentsDataBatch
239+
],
240+
] = {}
226241

227242
self._metadata = (("key", key),) if key else ()
228243

@@ -263,7 +278,7 @@ async def receive_single_component_data(
263278
* timestamp: The timestamp of the metric sample.
264279
* value: The metric value.
265280
"""
266-
receiver = await self._receive_microgrid_components_data_batch(
281+
broadcaster = await self._receive_microgrid_components_data_batch(
267282
microgrid_components=[(microgrid_id, [component_id])],
268283
metrics=[metrics] if isinstance(metrics, Metric) else metrics,
269284
start_time=start_time,
@@ -272,6 +287,9 @@ async def receive_single_component_data(
272287
include_states=include_states,
273288
include_bounds=include_bounds,
274289
)
290+
291+
receiver = broadcaster.new_receiver()
292+
275293
async for batch in receiver:
276294
for entry in batch:
277295
yield entry
@@ -308,7 +326,7 @@ async def receive_microgrid_components_data(
308326
* timestamp: The timestamp of the metric sample.
309327
* value: The metric value.
310328
"""
311-
receiver = await self._receive_microgrid_components_data_batch(
329+
broadcaster = await self._receive_microgrid_components_data_batch(
312330
microgrid_components=microgrid_components,
313331
metrics=[metrics] if isinstance(metrics, Metric) else metrics,
314332
start_time=start_time,
@@ -317,6 +335,9 @@ async def receive_microgrid_components_data(
317335
include_states=include_states,
318336
include_bounds=include_bounds,
319337
)
338+
339+
receiver = broadcaster.new_receiver()
340+
320341
async for batch in receiver:
321342
for entry in batch:
322343
yield entry
@@ -333,102 +354,104 @@ async def _receive_microgrid_components_data_batch(
333354
resampling_period: timedelta | None,
334355
include_states: bool = False,
335356
include_bounds: bool = False,
336-
) -> AsyncIterator[ComponentsDataBatch]:
337-
"""Iterate over the component data batches in the stream using GrpcStreamBroadcaster.
338-
339-
Args:
340-
microgrid_components: A list of tuples of microgrid IDs and component IDs.
341-
metrics: A list of metrics.
342-
start_time: start datetime, if None, the earliest available data will be used
343-
end_time: end datetime, if None starts streaming indefinitely from start_time
344-
resampling_period: The period for resampling the data.
345-
include_states: Whether to include the state data.
346-
include_bounds: Whether to include the bound data.
347-
348-
Returns:
349-
A ComponentsDataBatch object of microgrid components data.
350-
"""
351-
microgrid_components_pb = [
352-
PBMicrogridComponentIDs(microgrid_id=mid, component_ids=cids)
353-
for mid, cids in microgrid_components
354-
]
355-
356-
def dt2ts(dt: datetime) -> PBTimestamp:
357-
ts = PBTimestamp()
358-
ts.FromDatetime(dt)
359-
return ts
360-
361-
time_filter = PBTimeFilter(
362-
start=dt2ts(start_time) if start_time else None,
363-
end=dt2ts(end_time) if end_time else None,
357+
) -> GrpcStreamBroadcaster[
358+
PBReceiveMicrogridComponentsDataStreamResponse, ComponentsDataBatch
359+
]:
360+
"""Return a GrpcStreamBroadcaster for microgrid component data."""
361+
stream_key = (
362+
tuple((mid, tuple(cids)) for mid, cids in microgrid_components),
363+
tuple(metric.name for metric in metrics),
364+
start_time.timestamp() if start_time else None,
365+
end_time.timestamp() if end_time else None,
366+
round(resampling_period.total_seconds()) if resampling_period else None,
367+
include_states,
368+
include_bounds,
364369
)
365370

366-
incl_states = (
367-
PBFilterOption.FILTER_OPTION_INCLUDE
368-
if include_states
369-
else PBFilterOption.FILTER_OPTION_EXCLUDE
370-
)
371-
incl_bounds = (
372-
PBFilterOption.FILTER_OPTION_INCLUDE
373-
if include_bounds
374-
else PBFilterOption.FILTER_OPTION_EXCLUDE
375-
)
376-
include_options = PBReceiveMicrogridComponentsDataStreamRequest.IncludeOptions(
377-
bounds=incl_bounds,
378-
states=incl_states,
379-
)
371+
if (
372+
stream_key not in self._components_data_streams
373+
or not self._components_data_streams[stream_key].is_running
374+
):
375+
microgrid_components_pb = [
376+
PBMicrogridComponentIDs(microgrid_id=mid, component_ids=cids)
377+
for mid, cids in microgrid_components
378+
]
379+
380+
def dt2ts(dt: datetime) -> PBTimestamp:
381+
ts = PBTimestamp()
382+
ts.FromDatetime(dt)
383+
return ts
384+
385+
time_filter = PBTimeFilter(
386+
start=dt2ts(start_time) if start_time else None,
387+
end=dt2ts(end_time) if end_time else None,
388+
)
380389

381-
stream_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter(
382-
time_filter=time_filter,
383-
resampling_options=PBResamplingOptions(
384-
resolution=(
385-
round(resampling_period.total_seconds())
386-
if resampling_period is not None
387-
else None
390+
incl_states = (
391+
PBFilterOption.FILTER_OPTION_INCLUDE
392+
if include_states
393+
else PBFilterOption.FILTER_OPTION_EXCLUDE
394+
)
395+
incl_bounds = (
396+
PBFilterOption.FILTER_OPTION_INCLUDE
397+
if include_bounds
398+
else PBFilterOption.FILTER_OPTION_EXCLUDE
399+
)
400+
include_options = (
401+
PBReceiveMicrogridComponentsDataStreamRequest.IncludeOptions(
402+
bounds=incl_bounds,
403+
states=incl_states,
388404
)
389-
),
390-
include_options=include_options,
391-
)
392-
393-
metric_conns_pb = [
394-
PBMetricConnections(
395-
metric=metric.to_proto(),
396-
connections=[],
397405
)
398-
for metric in metrics
399-
]
400406

401-
request = PBReceiveMicrogridComponentsDataStreamRequest(
402-
microgrid_components=microgrid_components_pb,
403-
metrics=metric_conns_pb,
404-
filter=stream_filter,
405-
)
407+
stream_filter = PBReceiveMicrogridComponentsDataStreamRequest.StreamFilter(
408+
time_filter=time_filter,
409+
resampling_options=PBResamplingOptions(
410+
resolution=(
411+
round(resampling_period.total_seconds())
412+
if resampling_period
413+
else None
414+
)
415+
),
416+
include_options=include_options,
417+
)
406418

407-
def transform_response(
408-
response: PBReceiveMicrogridComponentsDataStreamResponse,
409-
) -> ComponentsDataBatch:
410-
return ComponentsDataBatch(response)
419+
metric_conns_pb = [
420+
PBMetricConnections(metric=metric.to_proto(), connections=[])
421+
for metric in metrics
422+
]
411423

412-
async def stream_method() -> (
413-
AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse]
414-
):
415-
call_iterator = self.stub.ReceiveMicrogridComponentsDataStream(
416-
request, metadata=self._metadata
424+
request = PBReceiveMicrogridComponentsDataStreamRequest(
425+
microgrid_components=microgrid_components_pb,
426+
metrics=metric_conns_pb,
427+
filter=stream_filter,
417428
)
418-
async for response in cast(
419-
AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse],
420-
call_iterator,
421-
):
422-
yield response
423429

424-
broadcaster = GrpcStreamBroadcaster(
425-
stream_name="microgrid-components-data-stream",
426-
stream_method=stream_method,
427-
transform=transform_response,
428-
retry_strategy=None,
429-
)
430+
def transform_response(
431+
response: PBReceiveMicrogridComponentsDataStreamResponse,
432+
) -> ComponentsDataBatch:
433+
return ComponentsDataBatch(response)
430434

431-
return broadcaster.new_receiver()
435+
async def stream_method() -> (
436+
AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse]
437+
):
438+
call_iterator = self.stub.ReceiveMicrogridComponentsDataStream(
439+
request, metadata=self._metadata
440+
)
441+
async for response in cast(
442+
AsyncIterable[PBReceiveMicrogridComponentsDataStreamResponse],
443+
call_iterator,
444+
):
445+
yield response
446+
447+
self._components_data_streams[stream_key] = GrpcStreamBroadcaster(
448+
stream_name="microgrid-components-data-stream",
449+
stream_method=stream_method,
450+
transform=transform_response,
451+
retry_strategy=None,
452+
)
453+
454+
return self._components_data_streams[stream_key]
432455

433456
# pylint: disable=too-many-arguments
434457
async def receive_single_sensor_data(

0 commit comments

Comments
 (0)