Skip to content
Closed
5 changes: 3 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- The client can now list sensor retrieving their metadata (`MicrogridApiClient.list_sensors()`) and can stream sensor data (`MicrogridApiClient.stream_sensor_data()`).

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
- When retrieving the microgrid metadata using `metadata()`, if the location was empty in the protobuf message, a wrong location with long=0, lat=0 was used. Now the location will be properly set to `None` in that case.
- The client now does some missing cleanup (stopping background tasks) when disconnecting (and when used as a context manager).
5 changes: 4 additions & 1 deletion src/frequenz/client/microgrid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
UnknownError,
UnrecognizedGrpcStatus,
)
from ._id import ComponentId, MicrogridId
from ._id import ComponentId, MicrogridId, SensorId
from ._lifetime import Lifetime
from ._metadata import Location, Metadata

__all__ = [
Expand Down Expand Up @@ -98,6 +99,7 @@
"InverterError",
"InverterErrorCode",
"InverterType",
"Lifetime",
"Location",
"Metadata",
"MeterData",
Expand All @@ -112,6 +114,7 @@
"OperationUnauthenticated",
"PermissionDenied",
"ResourceExhausted",
"SensorId",
"ServiceUnavailable",
"UnknownError",
"UnrecognizedGrpcStatus",
Expand Down
164 changes: 161 additions & 3 deletions src/frequenz/client/microgrid/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@
from __future__ import annotations

import asyncio
import itertools
import logging
from collections.abc import Callable, Iterable, Set
from dataclasses import replace
from typing import Any, TypeVar
from typing import Any, TypeVar, assert_never

from frequenz.api.common import components_pb2, metrics_pb2
from frequenz.api.microgrid import microgrid_pb2, microgrid_pb2_grpc
from frequenz.api.microgrid import microgrid_pb2, microgrid_pb2_grpc, sensor_pb2
from frequenz.channels import Receiver
from frequenz.client.base import channel, client, retry, streaming
from google.protobuf.empty_pb2 import Empty
from typing_extensions import override

from frequenz.client.microgrid._id import SensorId
from frequenz.client.microgrid.sensor._base import Sensor
from frequenz.client.microgrid.sensor._data import SensorDataSamples, SensorMetric
from frequenz.client.microgrid.sensor._data_proto import sensor_data_samples_from_proto

from ._component import (
Component,
Expand All @@ -36,6 +43,8 @@
from ._exception import ApiClientError, ClientNotConnected
from ._id import ComponentId, MicrogridId
from ._metadata import Location, Metadata
from .sensor._proto import sensor_from_proto
from .sensor._types import SensorTypes

DEFAULT_GRPC_CALL_TIMEOUT = 60.0
"""The default timeout for gRPC calls made by this client (in seconds)."""
Expand Down Expand Up @@ -95,6 +104,12 @@ def __init__(
self._broadcasters: dict[
ComponentId, streaming.GrpcStreamBroadcaster[Any, Any]
] = {}
self._sensor_data_broadcasters: dict[
str,
streaming.GrpcStreamBroadcaster[
microgrid_pb2.ComponentData, SensorDataSamples
],
] = {}
self._retry_strategy = retry_strategy

@property
Expand All @@ -108,6 +123,42 @@ def stub(self) -> microgrid_pb2_grpc.MicrogridAsyncStub:
# type-checker, so it can only be used for type hints.
return self._stub # type: ignore

@override
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: Any | None,
) -> bool | None:
"""Close the gRPC channel and stop all broadcasters."""
exceptions = list(
exc
for exc in await asyncio.gather(
*(
broadcaster.stop()
for broadcaster in itertools.chain(
self._broadcasters.values(),
self._sensor_data_broadcasters.values(),
)
),
return_exceptions=True,
)
if isinstance(exc, BaseException)
)
self._broadcasters.clear()
self._sensor_data_broadcasters.clear()

result = None
try:
result = await super().__aexit__(exc_type, exc_val, exc_tb)
except Exception as exc: # pylint: disable=broad-except
exceptions.append(exc)
if exceptions:
raise BaseExceptionGroup(
"Error while disconnecting from the microgrid API", exceptions
)
return result

async def components( # noqa: DOC502 (raises ApiClientError indirectly)
self,
) -> Iterable[Component]:
Expand Down Expand Up @@ -147,6 +198,33 @@ async def components( # noqa: DOC502 (raises ApiClientError indirectly)

return result

async def list_sensors( # noqa: DOC502 (raises ApiClientError indirectly)
self,
) -> Iterable[SensorTypes]:
"""Fetch all the sensors present in the microgrid.

Returns:
Iterator whose elements are all the sensors in the microgrid.

Raises:
ApiClientError: If the are any errors communicating with the Microgrid API,
most likely a subclass of
[GrpcError][frequenz.client.microgrid.GrpcError].
"""
component_list = await client.call_stub_method(
self,
lambda: self.stub.ListComponents(
microgrid_pb2.ComponentFilter(
categories=[
components_pb2.ComponentCategory.COMPONENT_CATEGORY_SENSOR
]
),
timeout=int(DEFAULT_GRPC_CALL_TIMEOUT),
),
method_name="ListComponents",
)
return map(sensor_from_proto, component_list.components)

async def metadata(self) -> Metadata:
"""Fetch the microgrid metadata.

Expand All @@ -173,7 +251,7 @@ async def metadata(self) -> Metadata:
return Metadata()

location: Location | None = None
if microgrid_metadata.location:
if microgrid_metadata.HasField("location"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened to all our deployments at https://en.wikipedia.org/wiki/Null_Island? 😱

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are fine, now microgrids in Null Island need to provide a explicit (0, 0) location 😆

location = Location(
latitude=microgrid_metadata.location.latitude,
longitude=microgrid_metadata.location.longitude,
Expand Down Expand Up @@ -509,3 +587,83 @@ async def set_bounds( # noqa: DOC503 (raises ApiClientError indirectly)
),
method_name="AddInclusionBounds",
)

# noqa: DOC502 (Raises ApiClientError indirectly)
def stream_sensor_data(
self,
sensor: SensorId | Sensor,
metrics: Iterable[SensorMetric | int],
*,
buffer_size: int = 50,
) -> Receiver[SensorDataSamples]:
"""Stream data samples from a sensor.

At least one metric must be specified. If no metric is specified, then the
stream will raise an error.

Warning:
Sensors may not support all metrics. If a sensor does not support
a given metric, then the returned data stream will not contain that metric.

There is no way to tell if a metric is not being received because the
sensor does not support it or because there is a transient issue when
retrieving the metric from the sensor.

The supported metrics by a sensor can even change with time, for example,
if a sensor is updated with new firmware.

Args:
sensor: The sensor to stream data from.
metrics: List of metrics to return. Only the specified metrics will be
returned.
buffer_size: The maximum number of messages to buffer in the returned
receiver. After this limit is reached, the oldest messages will be
dropped.

Returns:
The data stream from the sensor.
"""
sensor_id = _get_sensor_id(sensor)
metrics_set = frozenset([_get_sensor_metric_value(m) for m in metrics])
key = f"{sensor_id}-{hash(metrics_set)}"
broadcaster = self._sensor_data_broadcasters.get(key)
if broadcaster is None:
client_id = hex(id(self))[2:]
stream_name = f"microgrid-client-{client_id}-sensor-data-{key}"
broadcaster = streaming.GrpcStreamBroadcaster(
stream_name,
lambda: aiter(
self.stub.StreamComponentData(
microgrid_pb2.ComponentIdParam(id=sensor_id),
timeout=DEFAULT_GRPC_CALL_TIMEOUT,
)
),
lambda msg: sensor_data_samples_from_proto(msg, metrics_set),
retry_strategy=self._retry_strategy,
)
self._sensor_data_broadcasters[key] = broadcaster
return broadcaster.new_receiver(maxsize=buffer_size)


def _get_sensor_id(sensor: SensorId | Sensor) -> int:
"""Get the sensor ID from a sensor or sensor ID."""
match sensor:
case SensorId():
return int(sensor)
case Sensor():
return int(sensor.id)
case unexpected:
assert_never(unexpected)


def _get_sensor_metric_value(
metric: SensorMetric | int,
) -> sensor_pb2.SensorMetric.ValueType:
"""Get the sensor metric ID from a sensor metric or sensor metric ID."""
match metric:
case SensorMetric():
return sensor_pb2.SensorMetric.ValueType(metric.value)
case int():
return sensor_pb2.SensorMetric.ValueType(metric)
case unexpected:
assert_never(unexpected)
52 changes: 51 additions & 1 deletion src/frequenz/client/microgrid/_id.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Strongly typed IDs for microgrids and components."""
"""Strongly typed IDs for microgrids, components and sensors."""


from typing import final
Expand Down Expand Up @@ -105,3 +105,53 @@ def __repr__(self) -> str:
def __str__(self) -> str:
"""Return the short string representation of this instance."""
return f"CID{self._id}"


@final
class SensorId:
"""A unique identifier for a microgrid sensor."""

def __init__(self, id_: int, /) -> None:
"""Initialize this instance.

Args:
id_: The numeric unique identifier of the microgrid sensor.

Raises:
ValueError: If the ID is negative.
"""
if id_ < 0:
raise ValueError("Sensor ID can't be negative.")
self._id = id_

def __int__(self) -> int:
"""Return the numeric ID of this instance."""
return self._id

def __eq__(self, other: object) -> bool:
"""Check if this instance is equal to another object."""
# This is not an unidiomatic typecheck, that's an odd name for the check.
# isinstance() returns True for subclasses, which is not what we want here.
# pylint: disable-next=unidiomatic-typecheck
return type(other) is SensorId and self._id == other._id

def __lt__(self, other: object) -> bool:
"""Check if this instance is less than another object."""
# pylint: disable-next=unidiomatic-typecheck
if type(other) is SensorId:
return self._id < other._id
return NotImplemented

def __hash__(self) -> int:
"""Return the hash of this instance."""
# We include the class because we explicitly want to avoid the same ID to give
# the same hash for different classes of IDs
return hash((SensorId, self._id))

def __repr__(self) -> str:
"""Return the string representation of this instance."""
return f"{type(self).__name__}({self._id!r})"

def __str__(self) -> str:
"""Return the short string representation of this instance."""
return f"SID{self._id}"
51 changes: 51 additions & 0 deletions src/frequenz/client/microgrid/_lifetime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH

"""Lifetime of a microgrid asset."""


from dataclasses import dataclass
from datetime import datetime, timezone
from functools import cached_property


@dataclass(frozen=True, kw_only=True)
class Lifetime:
"""An active operational period of a microgrid asset.

Warning:
The [`end`][frequenz.client.microgrid.Lifetime.end] timestamp indicates that the
asset has been permanently removed from the system.
"""

start: datetime | None = None
"""The moment when the asset became operationally active.

If `None`, the asset is considered to be active in any past moment previous to the
[`end`][frequenz.client.microgrid.Lifetime.end].
"""

end: datetime | None = None
"""The moment when the asset's operational activity ceased.

If `None`, the asset is considered to be active with no plans to be deactivated.
"""

def __post_init__(self) -> None:
"""Validate this lifetime."""
if self.start is not None and self.end is not None and self.start > self.end:
raise ValueError("Start must be before or equal to end.")

def active_at(self, timestamp: datetime) -> bool:
"""Check whether this lifetime is active at a specific timestamp."""
if self.start is not None and self.start > timestamp:
return False
if self.end is not None:
return self.end >= timestamp
# Both are None, so it is always active
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only self.end is known to be None with the above checks. self.start could be None or earlier than the given timestamp.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

return True

@cached_property
def active(self) -> bool:
"""Whether this lifetime is currently active."""
return self.active_at(datetime.now(timezone.utc))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be a cached_property, because then you can do this with less assumptions. This property is not expected to be in any hot path, so this is too much optimization anyway.

Loading
Loading