Skip to content

Commit f266023

Browse files
committed
Implement receive_component_data_samples_stream method
This method allows to stream data samples from a component. It uses a `GrpcStreamBroadcaster` to handle the streaming of the data samples, even when it doesn't make a lot of sense given how the API works now (it is much less likely that we will have the same request twice, justifiying the reuse of the broadcaster/data channel). To minimize changes, we go with this approach but it will probably be changed in the future. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 9ec02a7 commit f266023

File tree

7 files changed

+636
-2
lines changed

7 files changed

+636
-2
lines changed

src/frequenz/client/microgrid/_client.py

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77
from collections.abc import Iterable
88
from dataclasses import replace
99
from datetime import datetime, timedelta
10-
from typing import Any, assert_never
10+
from typing import assert_never
1111

12+
from frequenz.api.common.v1.metrics import metric_sample_pb2
1213
from frequenz.api.common.v1.microgrid.components import components_pb2
1314
from frequenz.api.microgrid.v1 import microgrid_pb2, microgrid_pb2_grpc
15+
from frequenz.channels import Receiver
1416
from frequenz.client.base import channel, client, conversion, retry, streaming
1517
from google.protobuf.empty_pb2 import Empty
1618

@@ -23,6 +25,9 @@
2325
from .component._component_proto import component_from_proto
2426
from .component._connection import ComponentConnection
2527
from .component._connection_proto import component_connection_from_proto
28+
from .component._data_samples import ComponentDataSamples
29+
from .component._data_samples_proto import component_data_sample_from_proto
30+
from .metrics._metric import Metric
2631

2732
DEFAULT_GRPC_CALL_TIMEOUT = 60.0
2833
"""The default timeout for gRPC calls made by this client (in seconds)."""
@@ -74,7 +79,12 @@ def __init__(
7479
channel_defaults=channel_defaults,
7580
)
7681
self._async_stub: microgrid_pb2_grpc.MicrogridAsyncStub = self.stub # type: ignore
77-
self._broadcasters: dict[int, streaming.GrpcStreamBroadcaster[Any, Any]] = {}
82+
self._component_data_samples_broadcasters: dict[
83+
str,
84+
streaming.GrpcStreamBroadcaster[
85+
microgrid_pb2.ReceiveComponentDataStreamResponse, ComponentDataSamples
86+
],
87+
] = {}
7888
self._retry_strategy = retry_strategy
7989

8090
async def get_microgrid_info( # noqa: DOC502 (raises ApiClientError indirectly)
@@ -375,6 +385,68 @@ async def set_component_power_reactive( # noqa: DOC502 (raises ApiClientError i
375385

376386
return None
377387

388+
# noqa: DOC502 (Raises ApiClientError indirectly)
389+
async def receive_component_data_samples_stream(
390+
self,
391+
component: ComponentId | Component,
392+
metrics: Iterable[Metric | int],
393+
*,
394+
buffer_size: int = 50,
395+
) -> Receiver[ComponentDataSamples]:
396+
"""Stream data samples from a component.
397+
398+
At least one metric must be specified. If no metric is specified, then the
399+
stream will raise an error.
400+
401+
Warning:
402+
Components may not support all metrics. If a component does not support
403+
a given metric, then the returned data stream will not contain that metric.
404+
405+
There is no way to tell if a metric is not being received because the
406+
component does not support it or because there is a transient issue when
407+
retrieving the metric from the component.
408+
409+
The supported metrics by a component can even change with time, for example,
410+
if a component is updated with new firmware.
411+
412+
Args:
413+
component: The component to stream data from.
414+
metrics: List of metrics to return. Only the specified metrics will be
415+
returned.
416+
buffer_size: The maximum number of messages to buffer in the returned
417+
receiver. After this limit is reached, the oldest messages will be
418+
dropped.
419+
420+
Returns:
421+
The data stream from the component.
422+
"""
423+
component_id = _get_component_id(component)
424+
metrics_set = frozenset([_get_metric_value(m) for m in metrics])
425+
key = f"{component_id}-{hash(metrics_set)}"
426+
broadcaster = self._component_data_samples_broadcasters.get(key)
427+
if broadcaster is None:
428+
client_id = hex(id(self))[2:]
429+
stream_name = f"microgrid-client-{client_id}-component-data-{key}"
430+
create_filter = (
431+
microgrid_pb2.ReceiveComponentDataStreamRequest.ComponentDataStreamFilter
432+
)
433+
broadcaster = streaming.GrpcStreamBroadcaster(
434+
stream_name,
435+
lambda: aiter(
436+
self._async_stub.ReceiveComponentDataStream(
437+
microgrid_pb2.ReceiveComponentDataStreamRequest(
438+
component_id=_get_component_id(component),
439+
filter=create_filter(metrics=metrics_set),
440+
),
441+
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
442+
)
443+
),
444+
lambda msg: component_data_sample_from_proto(msg.data),
445+
retry_strategy=self._retry_strategy,
446+
)
447+
self._component_data_samples_broadcasters[key] = broadcaster
448+
return broadcaster.new_receiver(maxsize=buffer_size)
449+
378450

379451
def _get_component_id(component: ComponentId | Component) -> int:
380452
"""Get the component ID from a component or component ID."""
@@ -387,6 +459,17 @@ def _get_component_id(component: ComponentId | Component) -> int:
387459
assert_never(unexpected)
388460

389461

462+
def _get_metric_value(metric: Metric | int) -> metric_sample_pb2.Metric.ValueType:
463+
"""Get the metric ID from a metric or metric ID."""
464+
match metric:
465+
case Metric():
466+
return metric_sample_pb2.Metric.ValueType(metric.value)
467+
case int():
468+
return metric_sample_pb2.Metric.ValueType(metric)
469+
case unexpected:
470+
assert_never(unexpected)
471+
472+
390473
def _get_category_value(
391474
category: ComponentCategory | int,
392475
) -> components_pb2.ComponentCategory.ValueType:
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Definition of a component data aggregate."""
5+
6+
from dataclasses import dataclass
7+
8+
from .._id import ComponentId
9+
from ..metrics._sample import MetricSample
10+
from ._state_sample import ComponentStateSample
11+
12+
13+
@dataclass(frozen=True, kw_only=True)
14+
class ComponentDataSamples:
15+
"""An aggregate of multiple metrics, states, and errors of a component."""
16+
17+
component_id: ComponentId
18+
"""The unique identifier of the component."""
19+
20+
metrics: list[MetricSample]
21+
"""The metrics sampled from the component."""
22+
23+
states: list[ComponentStateSample]
24+
"""The states sampled from the component."""
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Loading of ComponentDataSamples objects from protobuf messages."""
5+
6+
7+
from frequenz.api.common.v1.microgrid.components import components_pb2
8+
9+
from .._id import ComponentId
10+
from ..metrics._sample_proto import metric_sample_from_proto
11+
from ._data_samples import ComponentDataSamples
12+
from ._state_sample_proto import component_state_sample_from_proto
13+
14+
15+
def component_data_sample_from_proto(
16+
message: components_pb2.ComponentData,
17+
) -> ComponentDataSamples:
18+
"""Convert a protobuf component data message to a component data object.
19+
20+
Args:
21+
message: The protobuf message to convert.
22+
23+
Returns:
24+
The resulting `ComponentDataSamples` object.
25+
"""
26+
# At some point it might make sense to also log issues found in the samples, but
27+
# using a naive approach like in `component_from_proto` might spam the logs too
28+
# much, as we can receive several samples per second, and if a component is in
29+
# a unrecognized state for long, it will mean we will emit the same log message
30+
# again and again.
31+
return ComponentDataSamples(
32+
component_id=ComponentId(message.component_id),
33+
metrics=[metric_sample_from_proto(sample) for sample in message.metric_samples],
34+
states=[component_state_sample_from_proto(state) for state in message.states],
35+
)

0 commit comments

Comments
 (0)