Skip to content

Commit 4c073f4

Browse files
committed
Implement ReceiveComponentDataStreamRequest
Signed-off-by: Leandro Lucarella <[email protected]>
1 parent b967671 commit 4c073f4

File tree

5 files changed

+386
-4
lines changed

5 files changed

+386
-4
lines changed

src/frequenz/client/microgrid/_client.py

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from frequenz.api.common.v1.metrics import bounds_pb2, metric_sample_pb2
1818
from frequenz.api.common.v1.microgrid.components import components_pb2
1919
from frequenz.api.microgrid.v1 import microgrid_pb2, microgrid_pb2_grpc
20+
from frequenz.channels import Receiver
2021
from frequenz.client.base import channel, client, conversion, retry, streaming
2122
from frequenz.client.common.microgrid.components import ComponentId
2223
from google.protobuf.empty_pb2 import Empty
@@ -30,6 +31,8 @@
3031
from .component._component_proto import component_from_proto
3132
from .component._connection import ComponentConnection
3233
from .component._connection_proto import component_connection_from_proto
34+
from .component._data_samples import ComponentDataSamples
35+
from .component._data_samples_proto import component_data_samples_from_proto
3336
from .component._types import ComponentTypes
3437
from .metrics._bounds import Bounds
3538
from .metrics._metric import Metric
@@ -84,8 +87,11 @@ def __init__(
8487
connect=connect,
8588
channel_defaults=channel_defaults,
8689
)
87-
self._broadcasters: dict[
88-
ComponentId, streaming.GrpcStreamBroadcaster[Any, Any]
90+
self._component_data_broadcasters: dict[
91+
str,
92+
streaming.GrpcStreamBroadcaster[
93+
microgrid_pb2.ReceiveComponentDataStreamResponse, ComponentDataSamples
94+
],
8995
] = {}
9096
self._sensor_data_broadcasters: dict[
9197
str,
@@ -118,15 +124,15 @@ async def __aexit__(
118124
*(
119125
broadcaster.stop()
120126
for broadcaster in itertools.chain(
121-
self._broadcasters.values(),
127+
self._component_data_broadcasters.values(),
122128
self._sensor_data_broadcasters.values(),
123129
)
124130
),
125131
return_exceptions=True,
126132
)
127133
if isinstance(exc, BaseException)
128134
)
129-
self._broadcasters.clear()
135+
self._component_data_broadcasters.clear()
130136
self._sensor_data_broadcasters.clear()
131137

132138
result = None
@@ -530,6 +536,70 @@ async def add_component_bounds( # noqa: DOC502 (Raises ApiClientError indirectl
530536

531537
return None
532538

539+
# noqa: DOC502 (Raises ApiClientError indirectly)
540+
def receive_component_data_samples_stream(
541+
self,
542+
component: ComponentId | Component,
543+
metrics: Iterable[Metric | int],
544+
*,
545+
buffer_size: int = 50,
546+
) -> Receiver[ComponentDataSamples]:
547+
"""Stream data samples from a component.
548+
549+
At least one metric must be specified. If no metric is specified, then the
550+
stream will raise an error.
551+
552+
Warning:
553+
Components may not support all metrics. If a component does not support
554+
a given metric, then the returned data stream will not contain that metric.
555+
556+
There is no way to tell if a metric is not being received because the
557+
component does not support it or because there is a transient issue when
558+
retrieving the metric from the component.
559+
560+
The supported metrics by a component can even change with time, for example,
561+
if a component is updated with new firmware.
562+
563+
Args:
564+
component: The component to stream data from.
565+
metrics: List of metrics to return. Only the specified metrics will be
566+
returned.
567+
buffer_size: The maximum number of messages to buffer in the returned
568+
receiver. After this limit is reached, the oldest messages will be
569+
dropped.
570+
571+
Returns:
572+
The data stream from the component.
573+
"""
574+
component_id = _get_component_id(component)
575+
metrics_set = frozenset([_get_metric_value(m) for m in metrics])
576+
key = f"{component_id}-{hash(metrics_set)}"
577+
broadcaster = self._component_data_broadcasters.get(key)
578+
if broadcaster is None:
579+
client_id = hex(id(self))[2:]
580+
stream_name = f"microgrid-client-{client_id}-component-data-{key}"
581+
# Alias to avoid too long lines linter errors
582+
# pylint: disable-next=invalid-name
583+
Request = microgrid_pb2.ReceiveComponentDataStreamRequest
584+
broadcaster = streaming.GrpcStreamBroadcaster(
585+
stream_name,
586+
lambda: aiter(
587+
self.stub.ReceiveComponentDataStream(
588+
Request(
589+
component_id=_get_component_id(component),
590+
filter=Request.ComponentDataStreamFilter(
591+
metrics=metrics_set
592+
),
593+
),
594+
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
595+
)
596+
),
597+
lambda msg: component_data_samples_from_proto(msg.data),
598+
retry_strategy=self._retry_strategy,
599+
)
600+
self._component_data_broadcasters[key] = broadcaster
601+
return broadcaster.new_receiver(maxsize=buffer_size)
602+
533603

534604
class Validity(enum.Enum):
535605
"""The duration for which a given list of bounds will stay in effect."""
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Test data for empty component data stream."""
5+
6+
from typing import Any
7+
8+
import pytest
9+
from frequenz.api.common.v1.metrics import metric_sample_pb2
10+
from frequenz.api.common.v1.microgrid.components import components_pb2
11+
from frequenz.api.microgrid.v1 import microgrid_pb2
12+
from frequenz.channels import Receiver, ReceiverStoppedError
13+
from frequenz.client.common.microgrid.components import ComponentId
14+
15+
from frequenz.client.microgrid.component import ComponentDataSamples
16+
17+
client_args = (ComponentId(1), [metric_sample_pb2.Metric.METRIC_AC_CURRENT])
18+
19+
20+
def assert_stub_method_call(stub_method: Any) -> None:
21+
"""Assert that the gRPC request matches the expected request."""
22+
stub_method.assert_called_once_with(
23+
microgrid_pb2.ReceiveComponentDataStreamRequest(
24+
component_id=1,
25+
filter=microgrid_pb2.ReceiveComponentDataStreamRequest.ComponentDataStreamFilter(
26+
metrics=[metric_sample_pb2.Metric.METRIC_AC_CURRENT]
27+
),
28+
),
29+
timeout=60.0,
30+
)
31+
32+
33+
# The mock response from the server
34+
grpc_response = microgrid_pb2.ReceiveComponentDataStreamResponse(
35+
data=components_pb2.ComponentData(component_id=1, metric_samples=[], states=[]),
36+
)
37+
38+
39+
# The expected result from the client method
40+
async def assert_client_result(receiver: Receiver[Any]) -> None:
41+
"""Assert that the client result matches the expected empty data."""
42+
result = await receiver.receive()
43+
assert result == ComponentDataSamples(
44+
component_id=ComponentId(1), metric_samples=[], states=[]
45+
)
46+
47+
with pytest.raises(ReceiverStoppedError):
48+
await receiver.receive()
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Test data for component data stream with error."""
5+
6+
import enum
7+
from collections.abc import AsyncIterator
8+
from typing import Any
9+
10+
import pytest
11+
from frequenz.api.common.v1.metrics import metric_sample_pb2
12+
from frequenz.api.common.v1.microgrid.components import components_pb2
13+
from frequenz.api.microgrid.v1 import microgrid_pb2
14+
from frequenz.channels import Receiver, ReceiverStoppedError
15+
from frequenz.client.common.microgrid.components import ComponentId
16+
from grpc import StatusCode
17+
18+
from frequenz.client.microgrid.component import ComponentDataSamples
19+
from tests.util import make_grpc_error
20+
21+
client_args = (ComponentId(1), [metric_sample_pb2.Metric.METRIC_DC_VOLTAGE])
22+
23+
24+
def assert_stub_method_call(stub_method: Any) -> None:
25+
"""Assert that the gRPC request matches the expected request."""
26+
stub_method.assert_called_once_with(
27+
microgrid_pb2.ReceiveComponentDataStreamRequest(
28+
component_id=1,
29+
filter=microgrid_pb2.ReceiveComponentDataStreamRequest.ComponentDataStreamFilter(
30+
metrics=[metric_sample_pb2.Metric.METRIC_DC_VOLTAGE]
31+
),
32+
),
33+
timeout=60.0,
34+
)
35+
36+
37+
@enum.unique
38+
class _State(enum.Enum):
39+
"""State of the gRPC response simulation."""
40+
41+
INITIAL = "initial"
42+
ERROR = "error"
43+
RECEIVING = "receiving"
44+
45+
46+
_iterations = 0
47+
_state: _State = _State.INITIAL
48+
49+
50+
async def grpc_response() -> AsyncIterator[Any]:
51+
"""Simulate a gRPC response with an error on the first iteration."""
52+
global _iterations, _state # pylint: disable=global-statement
53+
54+
_iterations += 1
55+
if _iterations == 1:
56+
_state = _State.ERROR
57+
raise make_grpc_error(StatusCode.UNAVAILABLE)
58+
59+
_state = _State.RECEIVING
60+
for _ in range(3):
61+
yield microgrid_pb2.ReceiveComponentDataStreamResponse(
62+
data=components_pb2.ComponentData(
63+
component_id=1, metric_samples=[], states=[]
64+
),
65+
)
66+
67+
68+
# The expected result from the client method (exception in this case)
69+
async def assert_client_result(receiver: Receiver[Any]) -> None:
70+
"""Assert that the client can keep receiving data after an error."""
71+
assert _state is _State.ERROR
72+
73+
async for result in receiver:
74+
assert result == ComponentDataSamples(
75+
component_id=ComponentId(1), metric_samples=[], states=[]
76+
)
77+
# We need the type ignore here because mypy doesn't realize _state is
78+
# global and updated from outside this function, so it wrongly narrows
79+
# its type to `Literal[_State.ERROR]`, and complaining about the
80+
# impossibility of overlapping with _STATE.RECEIVING.
81+
# https://github.com/python/mypy/issues/19283
82+
assert _state is _State.RECEIVING # type: ignore[comparison-overlap]
83+
84+
with pytest.raises(ReceiverStoppedError):
85+
await receiver.receive()

0 commit comments

Comments
 (0)