Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/frequenz/client/microgrid/_sensor_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,14 @@ def sensor_from_proto_with_issues(

def sensor_data_samples_from_proto(
message: microgrid_pb2.ComponentData,
metrics: Set[sensor_pb2.SensorMetric.ValueType],
metrics: Set[sensor_pb2.SensorMetric.ValueType] | None = None,
) -> SensorDataSamples:
"""Convert a protobuf component data message to a sensor data object.

Args:
message: The protobuf message to convert.
metrics: A set of metrics to filter the samples.
metrics: If not `None`, only the specified metrics will be retrieved.
Otherwise all available metrics will be retrieved.

Returns:
The resulting `SensorDataSamples` object.
Expand All @@ -128,7 +129,7 @@ def sensor_data_samples_from_proto(
metrics=[
sensor_metric_sample_from_proto(ts, sample)
for sample in message.sensor.data.sensor_data
if sample.sensor_metric in metrics
if metrics is None or sample.sensor_metric in metrics
],
states=[sensor_state_sample_from_proto(ts, message.sensor)],
)
Expand Down
59 changes: 57 additions & 2 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ async def test_set_bounds_grpc_error(client: _TestClient) -> None:
await client.set_bounds(ComponentId(99), 0.0, 100.0)


async def test_stream_sensor_data_success(
async def test_stream_sensor_data_one_metric(
sensor201: microgrid_pb2.Component, client: _TestClient
) -> None:
"""Test successful streaming of sensor data."""
Expand All @@ -933,7 +933,11 @@ async def stream_data_impl(
sensor_pb2.SensorData(
value=1.0,
sensor_metric=sensor_pb2.SensorMetric.SENSOR_METRIC_TEMPERATURE,
)
),
sensor_pb2.SensorData(
value=2.0,
sensor_metric=sensor_pb2.SensorMetric.SENSOR_METRIC_PRESSURE,
),
],
),
),
Expand All @@ -960,6 +964,57 @@ async def stream_data_impl(
]


async def test_stream_sensor_data_all_metrics(
sensor201: microgrid_pb2.Component, client: _TestClient
) -> None:
"""Test successful streaming of sensor data."""
now = datetime.now(timezone.utc)

async def stream_data_impl(
*_: Any, **__: Any
) -> AsyncIterator[microgrid_pb2.ComponentData]:
yield microgrid_pb2.ComponentData(
id=int(sensor201.id),
ts=conversion.to_timestamp(now),
sensor=sensor_pb2.Sensor(
state=sensor_pb2.State(
component_state=sensor_pb2.ComponentState.COMPONENT_STATE_OK
),
data=sensor_pb2.Data(
sensor_data=[
sensor_pb2.SensorData(
value=1.0,
sensor_metric=sensor_pb2.SensorMetric.SENSOR_METRIC_TEMPERATURE,
),
sensor_pb2.SensorData(
value=2.0,
sensor_metric=sensor_pb2.SensorMetric.SENSOR_METRIC_PRESSURE,
),
],
),
),
)

client.mock_stub.StreamComponentData.side_effect = stream_data_impl
receiver = client.stream_sensor_data(SensorId(sensor201.id))
sample = await receiver.receive()

assert isinstance(sample, SensorDataSamples)
assert int(sample.sensor_id) == sensor201.id
assert sample.states == [
SensorStateSample(
sampled_at=now,
states=frozenset({SensorStateCode.ON}),
warnings=frozenset(),
errors=frozenset(),
)
]
assert sample.metrics == [
SensorMetricSample(sampled_at=now, metric=SensorMetric.TEMPERATURE, value=1.0),
SensorMetricSample(sampled_at=now, metric=SensorMetric.PRESSURE, value=2.0),
]


async def test_stream_sensor_data_grpc_error(
sensor201: microgrid_pb2.Component, caplog: pytest.LogCaptureFixture
) -> None:
Expand Down
Loading