Skip to content
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- The `MicrogridApiClient` can now list sensor retrieving their metadata (`list_sensors()`) and can stream sensor data (`stream_sensor_data()`).

## Bug Fixes

Expand Down
5 changes: 2 additions & 3 deletions src/frequenz/client/microgrid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
UnknownError,
UnrecognizedGrpcStatus,
)
from ._id import ComponentId, MicrogridId
from ._lifetime import Lifetime
from ._metadata import Location, Metadata

__all__ = [
Expand All @@ -76,7 +76,6 @@
"Component",
"ComponentCategory",
"ComponentData",
"ComponentId",
"ComponentMetadata",
"ComponentMetricId",
"ComponentType",
Expand All @@ -98,11 +97,11 @@
"InverterError",
"InverterErrorCode",
"InverterType",
"Lifetime",
"Location",
"Metadata",
"MeterData",
"MicrogridApiClient",
"MicrogridId",
"OperationAborted",
"OperationCancelled",
"OperationNotImplemented",
Expand Down
144 changes: 138 additions & 6 deletions src/frequenz/client/microgrid/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
from __future__ import annotations

import asyncio
import itertools
import logging
from collections.abc import Callable, Iterable, Set
from dataclasses import replace
from typing import Any, TypeVar
from functools import partial
from typing import Any, NotRequired, TypedDict, TypeVar, assert_never

from frequenz.api.common import components_pb2, metrics_pb2
from frequenz.api.microgrid import microgrid_pb2, microgrid_pb2_grpc
from frequenz.api.microgrid import microgrid_pb2, microgrid_pb2_grpc, sensor_pb2
from frequenz.channels import Receiver
from frequenz.client.base import channel, client, retry, streaming
from google.protobuf.empty_pb2 import Empty
Expand All @@ -35,8 +37,10 @@
from ._connection import Connection
from ._constants import RECEIVER_MAX_SIZE
from ._exception import ApiClientError, ClientNotConnected
from ._id import ComponentId, MicrogridId
from ._metadata import Location, Metadata
from ._sensor_proto import sensor_data_samples_from_proto, sensor_from_proto
from .id import ComponentId, MicrogridId, SensorId
from .sensor import Sensor, SensorDataSamples, SensorMetric

DEFAULT_GRPC_CALL_TIMEOUT = 60.0
"""The default timeout for gRPC calls made by this client (in seconds)."""
Expand Down Expand Up @@ -96,6 +100,12 @@ def __init__(
self._broadcasters: dict[
ComponentId, streaming.GrpcStreamBroadcaster[Any, Any]
] = {}
self._sensor_data_broadcasters: dict[
str,
streaming.GrpcStreamBroadcaster[
microgrid_pb2.ComponentData, SensorDataSamples
],
] = {}
self._retry_strategy = retry_strategy

@property
Expand All @@ -117,15 +127,22 @@ async def __aexit__(
exc_tb: Any | None,
) -> bool | None:
"""Close the gRPC channel and stop all broadcasters."""
exceptions = [
exceptions = list(
exc
for exc in await asyncio.gather(
*(broadcaster.stop() for broadcaster in self._broadcasters.values()),
*(
broadcaster.stop()
for broadcaster in itertools.chain(
self._broadcasters.values(),
self._sensor_data_broadcasters.values(),
)
),
return_exceptions=True,
)
if isinstance(exc, BaseException)
]
)
self._broadcasters.clear()
self._sensor_data_broadcasters.clear()

result = None
try:
Expand Down Expand Up @@ -177,6 +194,33 @@ async def components( # noqa: DOC502 (raises ApiClientError indirectly)

return result

async def list_sensors( # noqa: DOC502 (raises ApiClientError indirectly)
self,
) -> Iterable[Sensor]:
"""Fetch all the sensors present in the microgrid.

Returns:
Iterator whose elements are all the sensors in the microgrid.

Raises:
ApiClientError: If the are any errors communicating with the Microgrid API,
most likely a subclass of
[GrpcError][frequenz.client.microgrid.GrpcError].
"""
component_list = await client.call_stub_method(
self,
lambda: self.stub.ListComponents(
microgrid_pb2.ComponentFilter(
categories=[
components_pb2.ComponentCategory.COMPONENT_CATEGORY_SENSOR
]
),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
),
method_name="ListComponents",
)
return map(sensor_from_proto, component_list.components)

async def metadata(self) -> Metadata:
"""Fetch the microgrid metadata.

Expand Down Expand Up @@ -539,3 +583,91 @@ async def set_bounds( # noqa: DOC503 (raises ApiClientError indirectly)
),
method_name="AddInclusionBounds",
)

# noqa: DOC502 (Raises ApiClientError indirectly)
def stream_sensor_data(
self,
sensor: SensorId | Sensor,
metrics: Iterable[SensorMetric | int] | None = None,
*,
buffer_size: int = 50,
) -> Receiver[SensorDataSamples]:
"""Stream data samples from a sensor.

Warning:
Sensors may not support all metrics. If a sensor does not support
a given metric, then the returned data stream will not contain that metric.

There is no way to tell if a metric is not being received because the
sensor does not support it or because there is a transient issue when
retrieving the metric from the sensor.

The supported metrics by a sensor can even change with time, for example,
if a sensor is updated with new firmware.

Args:
sensor: The sensor to stream data from.
metrics: If not `None`, only the specified metrics will be retrieved.
Otherwise all available metrics will be retrieved.
buffer_size: The maximum number of messages to buffer in the returned
receiver. After this limit is reached, the oldest messages will be
dropped.

Returns:
A receiver to retrieve data from the sensor.
"""
sensor_id = _get_sensor_id(sensor)
key = str(sensor_id)

class _ExtraArgs(TypedDict):
metrics: NotRequired[frozenset[sensor_pb2.SensorMetric.ValueType]]

extra_args: _ExtraArgs = {}
if metrics is not None:
extra_args["metrics"] = frozenset(
[_get_sensor_metric_value(m) for m in metrics]
)
# We use the frozenset because iterables are not hashable
key += f"{hash(extra_args['metrics'])}"

broadcaster = self._sensor_data_broadcasters.get(key)
if broadcaster is None:
client_id = hex(id(self))[2:]
stream_name = f"microgrid-client-{client_id}-sensor-data-{key}"
broadcaster = streaming.GrpcStreamBroadcaster(
stream_name,
lambda: aiter(
self.stub.StreamComponentData(
microgrid_pb2.ComponentIdParam(id=sensor_id),
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
)
),
partial(sensor_data_samples_from_proto, **extra_args),
retry_strategy=self._retry_strategy,
)
self._sensor_data_broadcasters[key] = broadcaster
return broadcaster.new_receiver(maxsize=buffer_size)


def _get_sensor_id(sensor: SensorId | Sensor) -> int:
"""Get the sensor ID from a sensor or sensor ID."""
match sensor:
case SensorId():
return int(sensor)
case Sensor():
return int(sensor.id)
case unexpected:
assert_never(unexpected)


def _get_sensor_metric_value(
metric: SensorMetric | int,
) -> sensor_pb2.SensorMetric.ValueType:
"""Get the sensor metric ID from a sensor metric or sensor metric ID."""
match metric:
case SensorMetric():
return sensor_pb2.SensorMetric.ValueType(metric.value)
case int():
return sensor_pb2.SensorMetric.ValueType(metric)
case unexpected:
assert_never(unexpected)
2 changes: 1 addition & 1 deletion src/frequenz/client/microgrid/_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from frequenz.api.common import components_pb2
from frequenz.api.microgrid import grid_pb2, inverter_pb2

from ._id import ComponentId
from .id import ComponentId


class ComponentType(Enum):
Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/client/microgrid/_component_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
EVChargerComponentState,
InverterComponentState,
)
from ._id import ComponentId
from .id import ComponentId


@dataclass(frozen=True)
Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/client/microgrid/_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from dataclasses import dataclass

from ._id import ComponentId
from .id import ComponentId


@dataclass(frozen=True)
Expand Down
107 changes: 0 additions & 107 deletions src/frequenz/client/microgrid/_id.py

This file was deleted.

Loading
Loading