Skip to content

Commit 743025b

Browse files
committed
Implement ReceiveComponentDataStream method
This method allows to stream data samples from a component. It uses a `GrpcStreamBroadcaster` to handle the streaming of the data samples, even when it doesn't make a lot of sense given how the API works now (it is much less likely that we will have the same request twice, justifiying the reuse of the broadcaster/data channel). To minimize changes, we go with this approach but it will probably be changed in the future. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent fc51378 commit 743025b

File tree

9 files changed

+647
-3
lines changed

9 files changed

+647
-3
lines changed

src/frequenz/client/microgrid/_client.py

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99
from collections.abc import Iterable
1010
from dataclasses import replace
1111
from datetime import datetime, timedelta
12-
from typing import Any, assert_never
12+
from typing import assert_never
1313

14+
from frequenz.api.common.v1.metrics import metric_sample_pb2
1415
from frequenz.api.common.v1.microgrid.components import components_pb2
1516
from frequenz.api.microgrid.v1 import microgrid_pb2, microgrid_pb2_grpc
17+
from frequenz.channels import Receiver
1618
from frequenz.client.base import channel, client, conversion, retry, streaming
1719
from google.protobuf.empty_pb2 import Empty
1820

@@ -26,6 +28,9 @@
2628
from .component._component_proto import component_from_proto
2729
from .component._connection import ComponentConnection
2830
from .component._connection_proto import component_connection_from_proto
31+
from .component._data_samples import ComponentDataSamples
32+
from .component._data_samples_proto import component_data_sample_from_proto
33+
from .metrics._metric import Metric
2934

3035
DEFAULT_GRPC_CALL_TIMEOUT = 60.0
3136
"""The default timeout for gRPC calls made by this client (in seconds)."""
@@ -76,7 +81,12 @@ def __init__(
7681
connect=connect,
7782
channel_defaults=channel_defaults,
7883
)
79-
self._broadcasters: dict[int, streaming.GrpcStreamBroadcaster[Any, Any]] = {}
84+
self._component_data_samples_broadcasters: dict[
85+
str,
86+
streaming.GrpcStreamBroadcaster[
87+
microgrid_pb2.ReceiveComponentDataStreamResponse, ComponentDataSamples
88+
],
89+
] = {}
8090
self._retry_strategy = retry_strategy
8191

8292
@property
@@ -372,7 +382,7 @@ async def set_component_power_reactive( # noqa: DOC502 (raises ApiClientError i
372382

373383
response = await client.call_stub_method(
374384
self,
375-
lambda: self._async_stub.SetComponentPowerReactive(
385+
lambda: self.stub.SetComponentPowerReactive(
376386
microgrid_pb2.SetComponentPowerReactiveRequest(
377387
component_id=_get_component_id(component),
378388
power=power,
@@ -388,6 +398,68 @@ async def set_component_power_reactive( # noqa: DOC502 (raises ApiClientError i
388398

389399
return None
390400

401+
# noqa: DOC502 (Raises ApiClientError indirectly)
402+
async def receive_component_data_samples_stream(
403+
self,
404+
component: ComponentId | Component,
405+
metrics: Iterable[Metric | int],
406+
*,
407+
buffer_size: int = 50,
408+
) -> Receiver[ComponentDataSamples]:
409+
"""Stream data samples from a component.
410+
411+
At least one metric must be specified. If no metric is specified, then the
412+
stream will raise an error.
413+
414+
Warning:
415+
Components may not support all metrics. If a component does not support
416+
a given metric, then the returned data stream will not contain that metric.
417+
418+
There is no way to tell if a metric is not being received because the
419+
component does not support it or because there is a transient issue when
420+
retrieving the metric from the component.
421+
422+
The supported metrics by a component can even change with time, for example,
423+
if a component is updated with new firmware.
424+
425+
Args:
426+
component: The component to stream data from.
427+
metrics: List of metrics to return. Only the specified metrics will be
428+
returned.
429+
buffer_size: The maximum number of messages to buffer in the returned
430+
receiver. After this limit is reached, the oldest messages will be
431+
dropped.
432+
433+
Returns:
434+
The data stream from the component.
435+
"""
436+
component_id = _get_component_id(component)
437+
metrics_set = frozenset([_get_metric_value(m) for m in metrics])
438+
key = f"{component_id}-{hash(metrics_set)}"
439+
broadcaster = self._component_data_samples_broadcasters.get(key)
440+
if broadcaster is None:
441+
client_id = hex(id(self))[2:]
442+
stream_name = f"microgrid-client-{client_id}-component-data-{key}"
443+
create_filter = (
444+
microgrid_pb2.ReceiveComponentDataStreamRequest.ComponentDataStreamFilter
445+
)
446+
broadcaster = streaming.GrpcStreamBroadcaster(
447+
stream_name,
448+
lambda: aiter(
449+
self.stub.ReceiveComponentDataStream(
450+
microgrid_pb2.ReceiveComponentDataStreamRequest(
451+
component_id=_get_component_id(component),
452+
filter=create_filter(metrics=metrics_set),
453+
),
454+
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
455+
)
456+
),
457+
lambda msg: component_data_sample_from_proto(msg.data),
458+
retry_strategy=self._retry_strategy,
459+
)
460+
self._component_data_samples_broadcasters[key] = broadcaster
461+
return broadcaster.new_receiver(maxsize=buffer_size)
462+
391463

392464
def _get_component_id(component: ComponentId | Component) -> int:
393465
"""Get the component ID from a component or component ID."""
@@ -400,6 +472,17 @@ def _get_component_id(component: ComponentId | Component) -> int:
400472
assert_never(unexpected)
401473

402474

475+
def _get_metric_value(metric: Metric | int) -> metric_sample_pb2.Metric.ValueType:
476+
"""Get the metric ID from a metric or metric ID."""
477+
match metric:
478+
case Metric():
479+
return metric_sample_pb2.Metric.ValueType(metric.value)
480+
case int():
481+
return metric_sample_pb2.Metric.ValueType(metric)
482+
case unexpected:
483+
assert_never(unexpected)
484+
485+
403486
def _get_category_value(
404487
category: ComponentCategory | int,
405488
) -> components_pb2.ComponentCategory.ValueType:

src/frequenz/client/microgrid/component/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from ._connection import ComponentConnection
2020
from ._converter import Converter
2121
from ._crypto_miner import CryptoMiner
22+
from ._data_samples import ComponentDataSamples
2223
from ._electrolyzer import Electrolyzer
2324
from ._ev_charger import (
2425
AcEvCharger,
@@ -52,6 +53,7 @@
5253
UnspecifiedComponent,
5354
)
5455
from ._relay import Relay
56+
from ._state_sample import ComponentErrorCode, ComponentStateCode, ComponentStateSample
5557
from ._status import ComponentStatus
5658
from ._voltage_transformer import VoltageTransformer
5759

@@ -65,6 +67,10 @@
6567
"Component",
6668
"ComponentCategory",
6769
"ComponentConnection",
70+
"ComponentDataSamples",
71+
"ComponentErrorCode",
72+
"ComponentStateCode",
73+
"ComponentStateSample",
6874
"ComponentStatus",
6975
"ComponentTypes",
7076
"Converter",
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Definition of a component data aggregate."""
5+
6+
from dataclasses import dataclass
7+
8+
from .._id import ComponentId
9+
from ..metrics._sample import MetricSample
10+
from ._state_sample import ComponentStateSample
11+
12+
13+
@dataclass(frozen=True, kw_only=True)
14+
class ComponentDataSamples:
15+
"""An aggregate of multiple metrics, states, and errors of a component."""
16+
17+
component_id: ComponentId
18+
"""The unique identifier of the component."""
19+
20+
metrics: list[MetricSample]
21+
"""The metrics sampled from the component."""
22+
23+
states: list[ComponentStateSample]
24+
"""The states sampled from the component."""
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Loading of ComponentDataSamples objects from protobuf messages."""
5+
6+
7+
from frequenz.api.common.v1.microgrid.components import components_pb2
8+
9+
from .._id import ComponentId
10+
from ..metrics._sample_proto import metric_sample_from_proto
11+
from ._data_samples import ComponentDataSamples
12+
from ._state_sample_proto import component_state_sample_from_proto
13+
14+
15+
def component_data_sample_from_proto(
16+
message: components_pb2.ComponentData,
17+
) -> ComponentDataSamples:
18+
"""Convert a protobuf component data message to a component data object.
19+
20+
Args:
21+
message: The protobuf message to convert.
22+
23+
Returns:
24+
The resulting `ComponentDataSamples` object.
25+
"""
26+
# At some point it might make sense to also log issues found in the samples, but
27+
# using a naive approach like in `component_from_proto` might spam the logs too
28+
# much, as we can receive several samples per second, and if a component is in
29+
# a unrecognized state for long, it will mean we will emit the same log message
30+
# again and again.
31+
return ComponentDataSamples(
32+
component_id=ComponentId(message.component_id),
33+
metrics=[metric_sample_from_proto(sample) for sample in message.metric_samples],
34+
states=[component_state_sample_from_proto(state) for state in message.states],
35+
)

0 commit comments

Comments
 (0)