Skip to content

Commit 3dfaf06

Browse files
authored
Implement ReceiveComponentDataStreamRequest (#178)
2 parents 5a701da + 4c073f4 commit 3dfaf06

File tree

16 files changed

+1911
-31
lines changed

16 files changed

+1911
-31
lines changed

src/frequenz/client/microgrid/_client.py

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from frequenz.api.common.v1.metrics import bounds_pb2, metric_sample_pb2
1818
from frequenz.api.common.v1.microgrid.components import components_pb2
1919
from frequenz.api.microgrid.v1 import microgrid_pb2, microgrid_pb2_grpc
20+
from frequenz.channels import Receiver
2021
from frequenz.client.base import channel, client, conversion, retry, streaming
2122
from frequenz.client.common.microgrid.components import ComponentId
2223
from google.protobuf.empty_pb2 import Empty
@@ -30,6 +31,8 @@
3031
from .component._component_proto import component_from_proto
3132
from .component._connection import ComponentConnection
3233
from .component._connection_proto import component_connection_from_proto
34+
from .component._data_samples import ComponentDataSamples
35+
from .component._data_samples_proto import component_data_samples_from_proto
3336
from .component._types import ComponentTypes
3437
from .metrics._bounds import Bounds
3538
from .metrics._metric import Metric
@@ -84,8 +87,11 @@ def __init__(
8487
connect=connect,
8588
channel_defaults=channel_defaults,
8689
)
87-
self._broadcasters: dict[
88-
ComponentId, streaming.GrpcStreamBroadcaster[Any, Any]
90+
self._component_data_broadcasters: dict[
91+
str,
92+
streaming.GrpcStreamBroadcaster[
93+
microgrid_pb2.ReceiveComponentDataStreamResponse, ComponentDataSamples
94+
],
8995
] = {}
9096
self._sensor_data_broadcasters: dict[
9197
str,
@@ -118,15 +124,15 @@ async def __aexit__(
118124
*(
119125
broadcaster.stop()
120126
for broadcaster in itertools.chain(
121-
self._broadcasters.values(),
127+
self._component_data_broadcasters.values(),
122128
self._sensor_data_broadcasters.values(),
123129
)
124130
),
125131
return_exceptions=True,
126132
)
127133
if isinstance(exc, BaseException)
128134
)
129-
self._broadcasters.clear()
135+
self._component_data_broadcasters.clear()
130136
self._sensor_data_broadcasters.clear()
131137

132138
result = None
@@ -528,6 +534,70 @@ async def add_component_bounds( # noqa: DOC502 (Raises ApiClientError indirectl
528534

529535
return None
530536

537+
# noqa: DOC502 (Raises ApiClientError indirectly)
538+
def receive_component_data_samples_stream(
539+
self,
540+
component: ComponentId | Component,
541+
metrics: Iterable[Metric | int],
542+
*,
543+
buffer_size: int = 50,
544+
) -> Receiver[ComponentDataSamples]:
545+
"""Stream data samples from a component.
546+
547+
At least one metric must be specified. If no metric is specified, then the
548+
stream will raise an error.
549+
550+
Warning:
551+
Components may not support all metrics. If a component does not support
552+
a given metric, then the returned data stream will not contain that metric.
553+
554+
There is no way to tell if a metric is not being received because the
555+
component does not support it or because there is a transient issue when
556+
retrieving the metric from the component.
557+
558+
The supported metrics by a component can even change with time, for example,
559+
if a component is updated with new firmware.
560+
561+
Args:
562+
component: The component to stream data from.
563+
metrics: List of metrics to return. Only the specified metrics will be
564+
returned.
565+
buffer_size: The maximum number of messages to buffer in the returned
566+
receiver. After this limit is reached, the oldest messages will be
567+
dropped.
568+
569+
Returns:
570+
The data stream from the component.
571+
"""
572+
component_id = _get_component_id(component)
573+
metrics_set = frozenset([_get_metric_value(m) for m in metrics])
574+
key = f"{component_id}-{hash(metrics_set)}"
575+
broadcaster = self._component_data_broadcasters.get(key)
576+
if broadcaster is None:
577+
client_id = hex(id(self))[2:]
578+
stream_name = f"microgrid-client-{client_id}-component-data-{key}"
579+
# Alias to avoid too long lines linter errors
580+
# pylint: disable-next=invalid-name
581+
Request = microgrid_pb2.ReceiveComponentDataStreamRequest
582+
broadcaster = streaming.GrpcStreamBroadcaster(
583+
stream_name,
584+
lambda: aiter(
585+
self.stub.ReceiveComponentDataStream(
586+
Request(
587+
component_id=_get_component_id(component),
588+
filter=Request.ComponentDataStreamFilter(
589+
metrics=metrics_set
590+
),
591+
),
592+
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
593+
)
594+
),
595+
lambda msg: component_data_samples_from_proto(msg.data),
596+
retry_strategy=self._retry_strategy,
597+
)
598+
self._component_data_broadcasters[key] = broadcaster
599+
return broadcaster.new_receiver(maxsize=buffer_size)
600+
531601

532602
class Validity(enum.Enum):
533603
"""The duration for which a given list of bounds will stay in effect."""

src/frequenz/client/microgrid/component/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from ._connection import ComponentConnection
1919
from ._converter import Converter
2020
from ._crypto_miner import CryptoMiner
21+
from ._data_samples import ComponentDataSamples
2122
from ._electrolyzer import Electrolyzer
2223
from ._ev_charger import (
2324
AcEvCharger,
@@ -50,6 +51,7 @@
5051
UnspecifiedComponent,
5152
)
5253
from ._relay import Relay
54+
from ._state_sample import ComponentErrorCode, ComponentStateCode, ComponentStateSample
5355
from ._status import ComponentStatus
5456
from ._types import (
5557
ComponentTypes,
@@ -69,6 +71,10 @@
6971
"Component",
7072
"ComponentCategory",
7173
"ComponentConnection",
74+
"ComponentDataSamples",
75+
"ComponentErrorCode",
76+
"ComponentStateCode",
77+
"ComponentStateSample",
7278
"ComponentStatus",
7379
"ComponentTypes",
7480
"Converter",
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Definition of a component data aggregate."""
5+
6+
from dataclasses import dataclass
7+
8+
from frequenz.client.common.microgrid.components import ComponentId
9+
10+
from ..metrics._sample import MetricSample
11+
from ._state_sample import ComponentStateSample
12+
13+
14+
@dataclass(frozen=True, kw_only=True)
15+
class ComponentDataSamples:
16+
"""An aggregate of multiple metrics, states, and errors of a component."""
17+
18+
component_id: ComponentId
19+
"""The unique identifier of the component."""
20+
21+
metric_samples: list[MetricSample]
22+
"""The metrics sampled from the component."""
23+
24+
states: list[ComponentStateSample]
25+
"""The states sampled from the component."""
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Loading of ComponentDataSamples objects from protobuf messages."""
5+
6+
7+
import logging
8+
from functools import partial
9+
10+
from frequenz.api.common.v1.microgrid.components import components_pb2
11+
from frequenz.client.common.microgrid.components import ComponentId
12+
13+
from ..metrics._sample_proto import metric_sample_from_proto_with_issues
14+
from ._data_samples import ComponentDataSamples
15+
from ._state_sample_proto import component_state_sample_from_proto
16+
17+
_logger = logging.getLogger(__name__)
18+
19+
20+
def component_data_samples_from_proto(
21+
message: components_pb2.ComponentData,
22+
) -> ComponentDataSamples:
23+
"""Convert a protobuf component data message to a component data object.
24+
25+
Args:
26+
message: The protobuf message to convert.
27+
28+
Returns:
29+
The resulting `ComponentDataSamples` object.
30+
"""
31+
major_issues: list[str] = []
32+
minor_issues: list[str] = []
33+
34+
samples = component_data_samples_from_proto_with_issues(
35+
message, major_issues=major_issues, minor_issues=minor_issues
36+
)
37+
38+
# This approach to logging issues might be too noisy. Samples are received
39+
# very often, and sometimes can remain unchanged for a long time, leading to
40+
# repeated log messages. We might need to adjust the logging strategy
41+
# in the future.
42+
if major_issues:
43+
_logger.warning(
44+
"Found issues in component data samples: %s | Protobuf message:\n%s",
45+
", ".join(major_issues),
46+
message,
47+
)
48+
49+
if minor_issues:
50+
_logger.debug(
51+
"Found minor issues in component data samples: %s | Protobuf message:\n%s",
52+
", ".join(minor_issues),
53+
message,
54+
)
55+
56+
return samples
57+
58+
59+
def component_data_samples_from_proto_with_issues(
60+
message: components_pb2.ComponentData,
61+
*,
62+
major_issues: list[str],
63+
minor_issues: list[str],
64+
) -> ComponentDataSamples:
65+
"""Convert a protobuf component data message to a component data object collecting issues.
66+
67+
Args:
68+
message: The protobuf message to convert.
69+
major_issues: A list to append major issues to.
70+
minor_issues: A list to append minor issues to.
71+
72+
Returns:
73+
The resulting `ComponentDataSamples` object.
74+
"""
75+
return ComponentDataSamples(
76+
component_id=ComponentId(message.component_id),
77+
metric_samples=list(
78+
map(
79+
partial(
80+
metric_sample_from_proto_with_issues,
81+
major_issues=major_issues,
82+
minor_issues=minor_issues,
83+
),
84+
message.metric_samples,
85+
)
86+
),
87+
states=list(map(component_state_sample_from_proto, message.states)),
88+
)

0 commit comments

Comments
 (0)