diff --git a/src/frequenz/client/microgrid/_client.py b/src/frequenz/client/microgrid/_client.py index fed598ba..a9a51b26 100644 --- a/src/frequenz/client/microgrid/_client.py +++ b/src/frequenz/client/microgrid/_client.py @@ -17,6 +17,7 @@ from frequenz.api.common.v1.metrics import bounds_pb2, metric_sample_pb2 from frequenz.api.common.v1.microgrid.components import components_pb2 from frequenz.api.microgrid.v1 import microgrid_pb2, microgrid_pb2_grpc +from frequenz.channels import Receiver from frequenz.client.base import channel, client, conversion, retry, streaming from frequenz.client.common.microgrid.components import ComponentId from google.protobuf.empty_pb2 import Empty @@ -30,6 +31,8 @@ from .component._component_proto import component_from_proto from .component._connection import ComponentConnection from .component._connection_proto import component_connection_from_proto +from .component._data_samples import ComponentDataSamples +from .component._data_samples_proto import component_data_samples_from_proto from .component._types import ComponentTypes from .metrics._bounds import Bounds from .metrics._metric import Metric @@ -84,8 +87,11 @@ def __init__( connect=connect, channel_defaults=channel_defaults, ) - self._broadcasters: dict[ - ComponentId, streaming.GrpcStreamBroadcaster[Any, Any] + self._component_data_broadcasters: dict[ + str, + streaming.GrpcStreamBroadcaster[ + microgrid_pb2.ReceiveComponentDataStreamResponse, ComponentDataSamples + ], ] = {} self._sensor_data_broadcasters: dict[ str, @@ -118,7 +124,7 @@ async def __aexit__( *( broadcaster.stop() for broadcaster in itertools.chain( - self._broadcasters.values(), + self._component_data_broadcasters.values(), self._sensor_data_broadcasters.values(), ) ), @@ -126,7 +132,7 @@ async def __aexit__( ) if isinstance(exc, BaseException) ) - self._broadcasters.clear() + self._component_data_broadcasters.clear() self._sensor_data_broadcasters.clear() result = None @@ -530,6 +536,70 @@ async def add_component_bounds( # noqa: DOC502 (Raises ApiClientError indirectl return None + # noqa: DOC502 (Raises ApiClientError indirectly) + def receive_component_data_samples_stream( + self, + component: ComponentId | Component, + metrics: Iterable[Metric | int], + *, + buffer_size: int = 50, + ) -> Receiver[ComponentDataSamples]: + """Stream data samples from a component. + + At least one metric must be specified. If no metric is specified, then the + stream will raise an error. + + Warning: + Components may not support all metrics. If a component does not support + a given metric, then the returned data stream will not contain that metric. + + There is no way to tell if a metric is not being received because the + component does not support it or because there is a transient issue when + retrieving the metric from the component. + + The supported metrics by a component can even change with time, for example, + if a component is updated with new firmware. + + Args: + component: The component to stream data from. + metrics: List of metrics to return. Only the specified metrics will be + returned. + buffer_size: The maximum number of messages to buffer in the returned + receiver. After this limit is reached, the oldest messages will be + dropped. + + Returns: + The data stream from the component. + """ + component_id = _get_component_id(component) + metrics_set = frozenset([_get_metric_value(m) for m in metrics]) + key = f"{component_id}-{hash(metrics_set)}" + broadcaster = self._component_data_broadcasters.get(key) + if broadcaster is None: + client_id = hex(id(self))[2:] + stream_name = f"microgrid-client-{client_id}-component-data-{key}" + # Alias to avoid too long lines linter errors + # pylint: disable-next=invalid-name + Request = microgrid_pb2.ReceiveComponentDataStreamRequest + broadcaster = streaming.GrpcStreamBroadcaster( + stream_name, + lambda: aiter( + self.stub.ReceiveComponentDataStream( + Request( + component_id=_get_component_id(component), + filter=Request.ComponentDataStreamFilter( + metrics=metrics_set + ), + ), + timeout=DEFAULT_GRPC_CALL_TIMEOUT, + ) + ), + lambda msg: component_data_samples_from_proto(msg.data), + retry_strategy=self._retry_strategy, + ) + self._component_data_broadcasters[key] = broadcaster + return broadcaster.new_receiver(maxsize=buffer_size) + class Validity(enum.Enum): """The duration for which a given list of bounds will stay in effect.""" diff --git a/src/frequenz/client/microgrid/component/__init__.py b/src/frequenz/client/microgrid/component/__init__.py index ee5c09ab..3341da06 100644 --- a/src/frequenz/client/microgrid/component/__init__.py +++ b/src/frequenz/client/microgrid/component/__init__.py @@ -18,6 +18,7 @@ from ._connection import ComponentConnection from ._converter import Converter from ._crypto_miner import CryptoMiner +from ._data_samples import ComponentDataSamples from ._electrolyzer import Electrolyzer from ._ev_charger import ( AcEvCharger, @@ -50,6 +51,7 @@ UnspecifiedComponent, ) from ._relay import Relay +from ._state_sample import ComponentErrorCode, ComponentStateCode, ComponentStateSample from ._status import ComponentStatus from ._types import ( ComponentTypes, @@ -69,6 +71,10 @@ "Component", "ComponentCategory", "ComponentConnection", + "ComponentDataSamples", + "ComponentErrorCode", + "ComponentStateCode", + "ComponentStateSample", "ComponentStatus", "ComponentTypes", "Converter", diff --git a/src/frequenz/client/microgrid/component/_data_samples.py b/src/frequenz/client/microgrid/component/_data_samples.py new file mode 100644 index 00000000..c7088a7d --- /dev/null +++ b/src/frequenz/client/microgrid/component/_data_samples.py @@ -0,0 +1,25 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Definition of a component data aggregate.""" + +from dataclasses import dataclass + +from frequenz.client.common.microgrid.components import ComponentId + +from ..metrics._sample import MetricSample +from ._state_sample import ComponentStateSample + + +@dataclass(frozen=True, kw_only=True) +class ComponentDataSamples: + """An aggregate of multiple metrics, states, and errors of a component.""" + + component_id: ComponentId + """The unique identifier of the component.""" + + metric_samples: list[MetricSample] + """The metrics sampled from the component.""" + + states: list[ComponentStateSample] + """The states sampled from the component.""" diff --git a/src/frequenz/client/microgrid/component/_data_samples_proto.py b/src/frequenz/client/microgrid/component/_data_samples_proto.py new file mode 100644 index 00000000..326e98f7 --- /dev/null +++ b/src/frequenz/client/microgrid/component/_data_samples_proto.py @@ -0,0 +1,88 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Loading of ComponentDataSamples objects from protobuf messages.""" + + +import logging +from functools import partial + +from frequenz.api.common.v1.microgrid.components import components_pb2 +from frequenz.client.common.microgrid.components import ComponentId + +from ..metrics._sample_proto import metric_sample_from_proto_with_issues +from ._data_samples import ComponentDataSamples +from ._state_sample_proto import component_state_sample_from_proto + +_logger = logging.getLogger(__name__) + + +def component_data_samples_from_proto( + message: components_pb2.ComponentData, +) -> ComponentDataSamples: + """Convert a protobuf component data message to a component data object. + + Args: + message: The protobuf message to convert. + + Returns: + The resulting `ComponentDataSamples` object. + """ + major_issues: list[str] = [] + minor_issues: list[str] = [] + + samples = component_data_samples_from_proto_with_issues( + message, major_issues=major_issues, minor_issues=minor_issues + ) + + # This approach to logging issues might be too noisy. Samples are received + # very often, and sometimes can remain unchanged for a long time, leading to + # repeated log messages. We might need to adjust the logging strategy + # in the future. + if major_issues: + _logger.warning( + "Found issues in component data samples: %s | Protobuf message:\n%s", + ", ".join(major_issues), + message, + ) + + if minor_issues: + _logger.debug( + "Found minor issues in component data samples: %s | Protobuf message:\n%s", + ", ".join(minor_issues), + message, + ) + + return samples + + +def component_data_samples_from_proto_with_issues( + message: components_pb2.ComponentData, + *, + major_issues: list[str], + minor_issues: list[str], +) -> ComponentDataSamples: + """Convert a protobuf component data message to a component data object collecting issues. + + Args: + message: The protobuf message to convert. + major_issues: A list to append major issues to. + minor_issues: A list to append minor issues to. + + Returns: + The resulting `ComponentDataSamples` object. + """ + return ComponentDataSamples( + component_id=ComponentId(message.component_id), + metric_samples=list( + map( + partial( + metric_sample_from_proto_with_issues, + major_issues=major_issues, + minor_issues=minor_issues, + ), + message.metric_samples, + ) + ), + states=list(map(component_state_sample_from_proto, message.states)), + ) diff --git a/src/frequenz/client/microgrid/component/_state_sample.py b/src/frequenz/client/microgrid/component/_state_sample.py new file mode 100644 index 00000000..c69381c1 --- /dev/null +++ b/src/frequenz/client/microgrid/component/_state_sample.py @@ -0,0 +1,255 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Definition of component states.""" + +import enum +from dataclasses import dataclass +from datetime import datetime + +from frequenz.api.common.v1.microgrid.components import components_pb2 + + +@enum.unique +class ComponentStateCode(enum.Enum): + """The various states that a component can be in.""" + + UNSPECIFIED = components_pb2.COMPONENT_STATE_CODE_UNSPECIFIED + """The state is unspecified (this should not be normally used).""" + + UNKNOWN = components_pb2.COMPONENT_STATE_CODE_UNKNOWN + """The component is in an unknown or undefined condition. + + This is used when the state can be retrieved from the component but it doesn't match + any known state. + """ + + UNAVAILABLE = components_pb2.COMPONENT_STATE_CODE_UNAVAILABLE + """The component is temporarily unavailable for operation.""" + + SWITCHING_OFF = components_pb2.COMPONENT_STATE_CODE_SWITCHING_OFF + """The component is in the process of switching off.""" + + OFF = components_pb2.COMPONENT_STATE_CODE_OFF + """The component has successfully switched off.""" + + SWITCHING_ON = components_pb2.COMPONENT_STATE_CODE_SWITCHING_ON + """The component is in the process of switching on.""" + + STANDBY = components_pb2.COMPONENT_STATE_CODE_STANDBY + """The component is in standby mode and not immediately ready for operation.""" + + READY = components_pb2.COMPONENT_STATE_CODE_READY + """The component is fully operational and ready for use.""" + + CHARGING = components_pb2.COMPONENT_STATE_CODE_CHARGING + """The component is actively consuming energy.""" + + DISCHARGING = components_pb2.COMPONENT_STATE_CODE_DISCHARGING + """The component is actively producing or releasing energy.""" + + ERROR = components_pb2.COMPONENT_STATE_CODE_ERROR + """The component is in an error state and may need attention.""" + + EV_CHARGING_CABLE_UNPLUGGED = ( + components_pb2.COMPONENT_STATE_CODE_EV_CHARGING_CABLE_UNPLUGGED + ) + """The EV charging cable is unplugged from the charging station.""" + + EV_CHARGING_CABLE_PLUGGED_AT_STATION = ( + components_pb2.COMPONENT_STATE_CODE_EV_CHARGING_CABLE_PLUGGED_AT_STATION + ) + """The EV charging cable is plugged into the charging station.""" + + EV_CHARGING_CABLE_PLUGGED_AT_EV = ( + components_pb2.COMPONENT_STATE_CODE_EV_CHARGING_CABLE_PLUGGED_AT_EV + ) + """The EV charging cable is plugged into the vehicle.""" + + EV_CHARGING_CABLE_LOCKED_AT_STATION = ( + components_pb2.COMPONENT_STATE_CODE_EV_CHARGING_CABLE_LOCKED_AT_STATION + ) + """The EV charging cable is locked at the charging station end.""" + + EV_CHARGING_CABLE_LOCKED_AT_EV = ( + components_pb2.COMPONENT_STATE_CODE_EV_CHARGING_CABLE_LOCKED_AT_EV + ) + """The EV charging cable is locked at the vehicle end.""" + + RELAY_OPEN = components_pb2.COMPONENT_STATE_CODE_RELAY_OPEN + """The relay is in an open state, meaning no current can flow through.""" + + RELAY_CLOSED = components_pb2.COMPONENT_STATE_CODE_RELAY_CLOSED + """The relay is in a closed state, allowing current to flow.""" + + PRECHARGER_OPEN = components_pb2.COMPONENT_STATE_CODE_PRECHARGER_OPEN + """The precharger circuit is open, meaning it's not currently active.""" + + PRECHARGER_PRECHARGING = components_pb2.COMPONENT_STATE_CODE_PRECHARGER_PRECHARGING + """The precharger is in a precharging state, preparing the main circuit for activation.""" + + PRECHARGER_CLOSED = components_pb2.COMPONENT_STATE_CODE_PRECHARGER_CLOSED + """The precharger circuit is closed, allowing full current to flow to the main circuit.""" + + +@enum.unique +class ComponentErrorCode(enum.Enum): + """The various errors that a component can report.""" + + UNSPECIFIED = components_pb2.COMPONENT_ERROR_CODE_UNSPECIFIED + """The error is unspecified (this should not be normally used).""" + + UNKNOWN = components_pb2.COMPONENT_ERROR_CODE_UNKNOWN + """The component is reporting an unknown or undefined error. + + This is used when the error can be retrieved from the component but it doesn't match + any known error. + """ + + SWITCH_ON_FAULT = components_pb2.COMPONENT_ERROR_CODE_SWITCH_ON_FAULT + """The component could not be switched on.""" + + UNDERVOLTAGE = components_pb2.COMPONENT_ERROR_CODE_UNDERVOLTAGE + """The component is operating under the minimum rated voltage.""" + + OVERVOLTAGE = components_pb2.COMPONENT_ERROR_CODE_OVERVOLTAGE + """The component is operating over the maximum rated voltage.""" + + OVERCURRENT = components_pb2.COMPONENT_ERROR_CODE_OVERCURRENT + """The component is drawing more current than the maximum rated value.""" + + OVERCURRENT_CHARGING = components_pb2.COMPONENT_ERROR_CODE_OVERCURRENT_CHARGING + """The component's consumption current is over the maximum rated value during charging.""" + + OVERCURRENT_DISCHARGING = ( + components_pb2.COMPONENT_ERROR_CODE_OVERCURRENT_DISCHARGING + ) + """The component's production current is over the maximum rated value during discharging.""" + + OVERTEMPERATURE = components_pb2.COMPONENT_ERROR_CODE_OVERTEMPERATURE + """The component is operating over the maximum rated temperature.""" + + UNDERTEMPERATURE = components_pb2.COMPONENT_ERROR_CODE_UNDERTEMPERATURE + """The component is operating under the minimum rated temperature.""" + + HIGH_HUMIDITY = components_pb2.COMPONENT_ERROR_CODE_HIGH_HUMIDITY + """The component is exposed to high humidity levels over the maximum rated value.""" + + FUSE_ERROR = components_pb2.COMPONENT_ERROR_CODE_FUSE_ERROR + """The component's fuse has blown.""" + + PRECHARGE_ERROR = components_pb2.COMPONENT_ERROR_CODE_PRECHARGE_ERROR + """The component's precharge unit has failed.""" + + PLAUSIBILITY_ERROR = components_pb2.COMPONENT_ERROR_CODE_PLAUSIBILITY_ERROR + """Plausibility issues within the system involving this component.""" + + UNDERVOLTAGE_SHUTDOWN = components_pb2.COMPONENT_ERROR_CODE_UNDERVOLTAGE_SHUTDOWN + """System shutdown due to undervoltage involving this component.""" + + EV_UNEXPECTED_PILOT_FAILURE = ( + components_pb2.COMPONENT_ERROR_CODE_EV_UNEXPECTED_PILOT_FAILURE + ) + """Unexpected pilot failure in an electric vehicle component.""" + + FAULT_CURRENT = components_pb2.COMPONENT_ERROR_CODE_FAULT_CURRENT + """Fault current detected in the component.""" + + SHORT_CIRCUIT = components_pb2.COMPONENT_ERROR_CODE_SHORT_CIRCUIT + """Short circuit detected in the component.""" + + CONFIG_ERROR = components_pb2.COMPONENT_ERROR_CODE_CONFIG_ERROR + """Configuration error related to the component.""" + + ILLEGAL_COMPONENT_STATE_CODE_REQUESTED = ( + components_pb2.COMPONENT_ERROR_CODE_ILLEGAL_COMPONENT_STATE_CODE_REQUESTED + ) + """Illegal state requested for the component.""" + + HARDWARE_INACCESSIBLE = components_pb2.COMPONENT_ERROR_CODE_HARDWARE_INACCESSIBLE + """Hardware of the component is inaccessible.""" + + INTERNAL = components_pb2.COMPONENT_ERROR_CODE_INTERNAL + """Internal error within the component.""" + + UNAUTHORIZED = components_pb2.COMPONENT_ERROR_CODE_UNAUTHORIZED + """The component is unauthorized to perform the last requested action.""" + + EV_CHARGING_CABLE_UNPLUGGED_FROM_STATION = ( + components_pb2.COMPONENT_ERROR_CODE_EV_CHARGING_CABLE_UNPLUGGED_FROM_STATION + ) + """EV cable was abruptly unplugged from the charging station.""" + + EV_CHARGING_CABLE_UNPLUGGED_FROM_EV = ( + components_pb2.COMPONENT_ERROR_CODE_EV_CHARGING_CABLE_UNPLUGGED_FROM_EV + ) + """EV cable was abruptly unplugged from the vehicle.""" + + EV_CHARGING_CABLE_LOCK_FAILED = ( + components_pb2.COMPONENT_ERROR_CODE_EV_CHARGING_CABLE_LOCK_FAILED + ) + """EV cable lock failure.""" + + EV_CHARGING_CABLE_INVALID = ( + components_pb2.COMPONENT_ERROR_CODE_EV_CHARGING_CABLE_INVALID + ) + """Invalid EV cable.""" + + EV_CONSUMER_INCOMPATIBLE = ( + components_pb2.COMPONENT_ERROR_CODE_EV_CONSUMER_INCOMPATIBLE + ) + """Incompatible EV plug.""" + + BATTERY_IMBALANCE = components_pb2.COMPONENT_ERROR_CODE_BATTERY_IMBALANCE + """Battery system imbalance.""" + + BATTERY_LOW_SOH = components_pb2.COMPONENT_ERROR_CODE_BATTERY_LOW_SOH + """Low state of health (SOH) detected in the battery.""" + + BATTERY_BLOCK_ERROR = components_pb2.COMPONENT_ERROR_CODE_BATTERY_BLOCK_ERROR + """Battery block error.""" + + BATTERY_CONTROLLER_ERROR = ( + components_pb2.COMPONENT_ERROR_CODE_BATTERY_CONTROLLER_ERROR + ) + """Battery controller error.""" + + BATTERY_RELAY_ERROR = components_pb2.COMPONENT_ERROR_CODE_BATTERY_RELAY_ERROR + """Battery relay error.""" + + BATTERY_CALIBRATION_NEEDED = ( + components_pb2.COMPONENT_ERROR_CODE_BATTERY_CALIBRATION_NEEDED + ) + """Battery calibration is needed.""" + + RELAY_CYCLE_LIMIT_REACHED = ( + components_pb2.COMPONENT_ERROR_CODE_RELAY_CYCLE_LIMIT_REACHED + ) + """Relays have been cycled for the maximum number of times.""" + + +@dataclass(frozen=True, kw_only=True) +class ComponentStateSample: + """A collection of the state, warnings, and errors for a component at a specific time.""" + + sampled_at: datetime + """The time at which this state was sampled.""" + + states: frozenset[ComponentStateCode | int] + """The set of states of the component. + + If the reported state is not known by the client (it could happen when using an + older version of the client with a newer version of the server), it will be + represented as an `int` and **not** the + [`ComponentStateCode.UNKNOWN`][frequenz.client.microgrid.component.ComponentStateCode.UNKNOWN] + value (this value is used only when the state is not known by the server). + """ + + warnings: frozenset[ComponentErrorCode | int] + """The set of warnings for the component.""" + + errors: frozenset[ComponentErrorCode | int] + """The set of errors for the component. + + This set will only contain errors if the component is in an error state. + """ diff --git a/src/frequenz/client/microgrid/component/_state_sample_proto.py b/src/frequenz/client/microgrid/component/_state_sample_proto.py new file mode 100644 index 00000000..bfdf122d --- /dev/null +++ b/src/frequenz/client/microgrid/component/_state_sample_proto.py @@ -0,0 +1,34 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Loading of MetricSample and AggregatedMetricValue objects from protobuf messages.""" + +from functools import partial + +from frequenz.api.common.v1.microgrid.components import components_pb2 +from frequenz.client.base import conversion + +from .._util import enum_from_proto +from ._state_sample import ComponentErrorCode, ComponentStateCode, ComponentStateSample + +_state_from_proto = partial(enum_from_proto, enum_type=ComponentStateCode) +_error_from_proto = partial(enum_from_proto, enum_type=ComponentErrorCode) + + +def component_state_sample_from_proto( + message: components_pb2.ComponentState, +) -> ComponentStateSample: + """Convert a protobuf message to a `ComponentStateSample` object. + + Args: + message: The protobuf message to convert. + + Returns: + The resulting `ComponentStateSample` object. + """ + return ComponentStateSample( + sampled_at=conversion.to_datetime(message.sampled_at), + states=frozenset(map(_state_from_proto, message.states)), + warnings=frozenset(map(_error_from_proto, message.warnings)), + errors=frozenset(map(_error_from_proto, message.errors)), + ) diff --git a/src/frequenz/client/microgrid/metrics/__init__.py b/src/frequenz/client/microgrid/metrics/__init__.py index 85c80fff..e099f4d1 100644 --- a/src/frequenz/client/microgrid/metrics/__init__.py +++ b/src/frequenz/client/microgrid/metrics/__init__.py @@ -5,11 +5,12 @@ from ._bounds import Bounds from ._metric import Metric -from ._sample import AggregatedMetricValue, AggregationMethod +from ._sample import AggregatedMetricValue, AggregationMethod, MetricSample __all__ = [ "AggregatedMetricValue", "AggregationMethod", "Bounds", "Metric", + "MetricSample", ] diff --git a/src/frequenz/client/microgrid/metrics/_sample.py b/src/frequenz/client/microgrid/metrics/_sample.py index 02dc7185..d4ac9d60 100644 --- a/src/frequenz/client/microgrid/metrics/_sample.py +++ b/src/frequenz/client/microgrid/metrics/_sample.py @@ -6,6 +6,11 @@ import enum from collections.abc import Sequence from dataclasses import dataclass +from datetime import datetime +from typing import assert_never + +from ._bounds import Bounds +from ._metric import Metric @enum.unique @@ -59,3 +64,111 @@ def __str__(self) -> str: extra.append(f"num_raw:{len(self.raw_values)}") extra_str = f"<{' '.join(extra)}>" if extra else "" return f"avg:{self.avg}{extra_str}" + + +@dataclass(frozen=True, kw_only=True) +class MetricSample: + """A sampled metric. + + This represents a single sample of a specific metric, the value of which is either + measured at a particular time. The real-time system-defined bounds are optional and + may not always be present or set. + + Note: Relationship Between Bounds and Metric Samples + Suppose a metric sample for active power has a lower-bound of -10,000 W, and an + upper-bound of 10,000 W. For the system to accept a charge command, clients need + to request current values within the bounds. + """ + + sampled_at: datetime + """The moment when the metric was sampled.""" + + metric: Metric | int + """The metric that was sampled.""" + + # In the protocol this is float | AggregatedMetricValue, but for live data we can't + # receive the AggregatedMetricValue, so we limit this to float for now. + value: float | AggregatedMetricValue | None + """The value of the sampled metric.""" + + bounds: list[Bounds] + """The bounds that apply to the metric sample. + + These bounds adapt in real-time to reflect the operating conditions at the time of + aggregation or derivation. + + In the case of certain components like batteries, multiple bounds might exist. These + multiple bounds collectively extend the range of allowable values, effectively + forming a union of all given bounds. In such cases, the value of the metric must be + within at least one of the bounds. + + In accordance with the passive sign convention, bounds that limit discharge would + have negative numbers, while those limiting charge, such as for the State of Power + (SoP) metric, would be positive. Hence bounds can have positive and negative values + depending on the metric they represent. + + Example: + The diagram below illustrates the relationship between the bounds. + + ``` + bound[0].lower bound[1].upper + <-------|============|------------------|============|---------> + bound[0].upper bound[1].lower + + ---- values here are disallowed and will be rejected + ==== values here are allowed and will be accepted + ``` + """ + + connection: str | None = None + """The electrical connection within the component from which the metric was sampled. + + This will be present when the same `Metric` can be obtained from multiple + electrical connections within the component. Knowing the connection can help in + certain control and monitoring applications. + + In cases where the component has just one connection for a metric, then the + connection is `None`. + + Example: + A hybrid inverter can have a DC string for a battery and another DC string for a + PV array. The connection names could resemble, say, `dc_battery_0` and + ``dc_pv_0`. A metric like DC voltage can be obtained from both connections. For + an application to determine the SoC of the battery using the battery voltage, + which connection the voltage metric was sampled from is important. + """ + + def as_single_value( + self, *, aggregation_method: AggregationMethod = AggregationMethod.AVG + ) -> float | None: + """Return the value of this sample as a single value. + + if [`value`][frequenz.client.microgrid.metrics.MetricSample.value] is a `float`, + it is returned as is. If `value` is an + [`AggregatedMetricValue`][frequenz.client.microgrid.metrics.AggregatedMetricValue], + the value is aggregated using the provided `aggregation_method`. + + Args: + aggregation_method: The method to use to aggregate the value when `value` is + a `AggregatedMetricValue`. + + Returns: + The value of the sample as a single value, or `None` if the value is `None`. + """ + match self.value: + case float() | int(): + return self.value + case AggregatedMetricValue(): + match aggregation_method: + case AggregationMethod.AVG: + return self.value.avg + case AggregationMethod.MIN: + return self.value.min + case AggregationMethod.MAX: + return self.value.max + case unexpected: + assert_never(unexpected) + case None: + return None + case unexpected: + assert_never(unexpected) diff --git a/src/frequenz/client/microgrid/metrics/_sample_proto.py b/src/frequenz/client/microgrid/metrics/_sample_proto.py index cfd2a038..314555c5 100644 --- a/src/frequenz/client/microgrid/metrics/_sample_proto.py +++ b/src/frequenz/client/microgrid/metrics/_sample_proto.py @@ -3,9 +3,16 @@ """Loading of MetricSample and AggregatedMetricValue objects from protobuf messages.""" -from frequenz.api.common.v1.metrics import metric_sample_pb2 +from collections.abc import Sequence -from ._sample import AggregatedMetricValue +from frequenz.api.common.v1.metrics import bounds_pb2, metric_sample_pb2 +from frequenz.client.base import conversion + +from .._util import enum_from_proto +from ._bounds import Bounds +from ._bounds_proto import bounds_from_proto +from ._metric import Metric +from ._sample import AggregatedMetricValue, MetricSample def aggregated_metric_sample_from_proto( @@ -25,3 +32,78 @@ def aggregated_metric_sample_from_proto( max=message.max_value if message.HasField("max_value") else None, raw_values=message.raw_values, ) + + +def metric_sample_from_proto_with_issues( + message: metric_sample_pb2.MetricSample, + *, + major_issues: list[str], + minor_issues: list[str], +) -> MetricSample: + """Convert a protobuf message to a `MetricSample` object. + + Args: + message: The protobuf message to convert. + major_issues: A list to append major issues to. + minor_issues: A list to append minor issues to. + + Returns: + The resulting `MetricSample` object. + """ + value: float | AggregatedMetricValue | None = None + if message.HasField("value"): + match message.value.WhichOneof("metric_value_variant"): + case "simple_metric": + value = message.value.simple_metric.value + case "aggregated_metric": + value = aggregated_metric_sample_from_proto( + message.value.aggregated_metric + ) + + metric = enum_from_proto(message.metric, Metric) + + return MetricSample( + sampled_at=conversion.to_datetime(message.sampled_at), + metric=metric, + value=value, + bounds=_metric_bounds_from_proto( + metric, + message.bounds, + major_issues=major_issues, + minor_issues=minor_issues, + ), + connection=message.source or None, + ) + + +def _metric_bounds_from_proto( + metric: Metric | int, + messages: Sequence[bounds_pb2.Bounds], + *, + major_issues: list[str], + minor_issues: list[str], # pylint:disable=unused-argument +) -> list[Bounds]: + """Convert a sequence of bounds messages to a list of `Bounds`. + + Args: + metric: The metric for which the bounds are defined, used for logging issues. + messages: The sequence of bounds messages. + major_issues: A list to append major issues to. + minor_issues: A list to append minor issues to. + + Returns: + The resulting list of `Bounds`. + """ + bounds: list[Bounds] = [] + for pb_bound in messages: + try: + bound = bounds_from_proto(pb_bound) + except ValueError as exc: + metric_name = metric if isinstance(metric, int) else metric.name + major_issues.append( + f"bounds for {metric_name} is invalid ({exc}), ignoring these bounds" + ) + continue + bounds.append(bound) + + return bounds diff --git a/tests/client_test_cases/receive_component_data_samples_stream/empty_case.py b/tests/client_test_cases/receive_component_data_samples_stream/empty_case.py new file mode 100644 index 00000000..e5341ca8 --- /dev/null +++ b/tests/client_test_cases/receive_component_data_samples_stream/empty_case.py @@ -0,0 +1,48 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Test data for empty component data stream.""" + +from typing import Any + +import pytest +from frequenz.api.common.v1.metrics import metric_sample_pb2 +from frequenz.api.common.v1.microgrid.components import components_pb2 +from frequenz.api.microgrid.v1 import microgrid_pb2 +from frequenz.channels import Receiver, ReceiverStoppedError +from frequenz.client.common.microgrid.components import ComponentId + +from frequenz.client.microgrid.component import ComponentDataSamples + +client_args = (ComponentId(1), [metric_sample_pb2.Metric.METRIC_AC_CURRENT]) + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + microgrid_pb2.ReceiveComponentDataStreamRequest( + component_id=1, + filter=microgrid_pb2.ReceiveComponentDataStreamRequest.ComponentDataStreamFilter( + metrics=[metric_sample_pb2.Metric.METRIC_AC_CURRENT] + ), + ), + timeout=60.0, + ) + + +# The mock response from the server +grpc_response = microgrid_pb2.ReceiveComponentDataStreamResponse( + data=components_pb2.ComponentData(component_id=1, metric_samples=[], states=[]), +) + + +# The expected result from the client method +async def assert_client_result(receiver: Receiver[Any]) -> None: + """Assert that the client result matches the expected empty data.""" + result = await receiver.receive() + assert result == ComponentDataSamples( + component_id=ComponentId(1), metric_samples=[], states=[] + ) + + with pytest.raises(ReceiverStoppedError): + await receiver.receive() diff --git a/tests/client_test_cases/receive_component_data_samples_stream/error_case.py b/tests/client_test_cases/receive_component_data_samples_stream/error_case.py new file mode 100644 index 00000000..d0a7e44c --- /dev/null +++ b/tests/client_test_cases/receive_component_data_samples_stream/error_case.py @@ -0,0 +1,85 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Test data for component data stream with error.""" + +import enum +from collections.abc import AsyncIterator +from typing import Any + +import pytest +from frequenz.api.common.v1.metrics import metric_sample_pb2 +from frequenz.api.common.v1.microgrid.components import components_pb2 +from frequenz.api.microgrid.v1 import microgrid_pb2 +from frequenz.channels import Receiver, ReceiverStoppedError +from frequenz.client.common.microgrid.components import ComponentId +from grpc import StatusCode + +from frequenz.client.microgrid.component import ComponentDataSamples +from tests.util import make_grpc_error + +client_args = (ComponentId(1), [metric_sample_pb2.Metric.METRIC_DC_VOLTAGE]) + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + microgrid_pb2.ReceiveComponentDataStreamRequest( + component_id=1, + filter=microgrid_pb2.ReceiveComponentDataStreamRequest.ComponentDataStreamFilter( + metrics=[metric_sample_pb2.Metric.METRIC_DC_VOLTAGE] + ), + ), + timeout=60.0, + ) + + +@enum.unique +class _State(enum.Enum): + """State of the gRPC response simulation.""" + + INITIAL = "initial" + ERROR = "error" + RECEIVING = "receiving" + + +_iterations = 0 +_state: _State = _State.INITIAL + + +async def grpc_response() -> AsyncIterator[Any]: + """Simulate a gRPC response with an error on the first iteration.""" + global _iterations, _state # pylint: disable=global-statement + + _iterations += 1 + if _iterations == 1: + _state = _State.ERROR + raise make_grpc_error(StatusCode.UNAVAILABLE) + + _state = _State.RECEIVING + for _ in range(3): + yield microgrid_pb2.ReceiveComponentDataStreamResponse( + data=components_pb2.ComponentData( + component_id=1, metric_samples=[], states=[] + ), + ) + + +# The expected result from the client method (exception in this case) +async def assert_client_result(receiver: Receiver[Any]) -> None: + """Assert that the client can keep receiving data after an error.""" + assert _state is _State.ERROR + + async for result in receiver: + assert result == ComponentDataSamples( + component_id=ComponentId(1), metric_samples=[], states=[] + ) + # We need the type ignore here because mypy doesn't realize _state is + # global and updated from outside this function, so it wrongly narrows + # its type to `Literal[_State.ERROR]`, and complaining about the + # impossibility of overlapping with _STATE.RECEIVING. + # https://github.com/python/mypy/issues/19283 + assert _state is _State.RECEIVING # type: ignore[comparison-overlap] + + with pytest.raises(ReceiverStoppedError): + await receiver.receive() diff --git a/tests/client_test_cases/receive_component_data_samples_stream/many_case.py b/tests/client_test_cases/receive_component_data_samples_stream/many_case.py new file mode 100644 index 00000000..f5eae1b7 --- /dev/null +++ b/tests/client_test_cases/receive_component_data_samples_stream/many_case.py @@ -0,0 +1,163 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Test data for successful component data stream.""" + +from datetime import datetime, timezone +from typing import Any + +import pytest +from frequenz.api.common.v1.metrics import metric_sample_pb2 +from frequenz.api.common.v1.microgrid.components import components_pb2 +from frequenz.api.microgrid.v1 import microgrid_pb2 +from frequenz.channels import Receiver, ReceiverStoppedError +from frequenz.client.base.conversion import to_timestamp +from frequenz.client.common.microgrid.components import ComponentId + +from frequenz.client.microgrid.component import ComponentDataSamples +from frequenz.client.microgrid.metrics import Metric, MetricSample +from frequenz.client.microgrid.metrics._sample import AggregatedMetricValue + +client_args = ( + ComponentId(1), + [ + metric_sample_pb2.Metric.METRIC_DC_VOLTAGE, + metric_sample_pb2.Metric.METRIC_DC_CURRENT, + ], +) + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + microgrid_pb2.ReceiveComponentDataStreamRequest( + component_id=1, + filter=microgrid_pb2.ReceiveComponentDataStreamRequest.ComponentDataStreamFilter( + metrics=[ + metric_sample_pb2.Metric.METRIC_DC_VOLTAGE, + metric_sample_pb2.Metric.METRIC_DC_CURRENT, + ] + ), + ), + timeout=60.0, + ) + + +timestamp = datetime(2023, 10, 1, 12, 0, 0, tzinfo=timezone.utc) +timestamp_proto = to_timestamp(timestamp) +grpc_response = [ + microgrid_pb2.ReceiveComponentDataStreamResponse( + data=components_pb2.ComponentData( + component_id=1, + metric_samples=[ + metric_sample_pb2.MetricSample( + metric=metric_sample_pb2.Metric.METRIC_DC_VOLTAGE, + sampled_at=timestamp_proto, + value=metric_sample_pb2.MetricValueVariant( + simple_metric=metric_sample_pb2.SimpleMetricValue(value=230.5), + ), + ), + metric_sample_pb2.MetricSample( + metric=metric_sample_pb2.Metric.METRIC_DC_CURRENT, + sampled_at=timestamp_proto, + value=metric_sample_pb2.MetricValueVariant( + aggregated_metric=metric_sample_pb2.AggregatedMetricValue( + min_value=10.0, + max_value=10.5, + avg_value=10.2, + raw_values=[10.0, 10.1, 10.2, 10.3, 10.4, 10.5], + ), + ), + ), + ], + states=[], + ), + ), + microgrid_pb2.ReceiveComponentDataStreamResponse( + data=components_pb2.ComponentData( + component_id=1, + metric_samples=[ + metric_sample_pb2.MetricSample( + metric=metric_sample_pb2.Metric.METRIC_DC_VOLTAGE, + sampled_at=timestamp_proto, + value=metric_sample_pb2.MetricValueVariant( + simple_metric=metric_sample_pb2.SimpleMetricValue(value=231.5), + ), + ), + metric_sample_pb2.MetricSample( + metric=metric_sample_pb2.Metric.METRIC_DC_CURRENT, + sampled_at=timestamp_proto, + value=metric_sample_pb2.MetricValueVariant( + aggregated_metric=metric_sample_pb2.AggregatedMetricValue( + min_value=12.0, + max_value=12.5, + avg_value=12.2, + raw_values=[12.0, 12.1, 12.2, 12.3, 12.4, 12.5], + ), + ), + ), + ], + states=[], + ), + ), +] + + +async def assert_client_result(receiver: Receiver[Any]) -> None: + """Assert that the client result matches the expected ComponentDataSamples.""" + result = await receiver.receive() + assert result == ComponentDataSamples( + component_id=ComponentId(1), + metric_samples=[ + MetricSample( + metric=Metric.DC_VOLTAGE, + sampled_at=timestamp, + value=pytest.approx(230.5), # type: ignore[arg-type] + bounds=[], + ), + MetricSample( + metric=Metric.DC_CURRENT, + sampled_at=timestamp, + value=AggregatedMetricValue( + min=pytest.approx(10.0), # type: ignore[arg-type] + max=pytest.approx(10.5), # type: ignore[arg-type] + avg=pytest.approx(10.2), # type: ignore[arg-type] + raw_values=pytest.approx( # type: ignore[arg-type] + [10.0, 10.1, 10.2, 10.3, 10.4, 10.5] + ), + ), + bounds=[], + ), + ], + states=[], + ) + + result = await receiver.receive() + assert result == ComponentDataSamples( + component_id=ComponentId(1), + metric_samples=[ + MetricSample( + metric=Metric.DC_VOLTAGE, + sampled_at=timestamp, + value=pytest.approx(231.5), # type: ignore[arg-type] + bounds=[], + ), + MetricSample( + metric=Metric.DC_CURRENT, + sampled_at=timestamp, + value=AggregatedMetricValue( + min=pytest.approx(12.0), # type: ignore[arg-type] + max=pytest.approx(12.5), # type: ignore[arg-type] + avg=pytest.approx(12.2), # type: ignore[arg-type] + raw_values=pytest.approx( # type: ignore[arg-type] + [12.0, 12.1, 12.2, 12.3, 12.4, 12.5] + ), + ), + bounds=[], + ), + ], + states=[], + ) + + with pytest.raises(ReceiverStoppedError): + await receiver.receive() diff --git a/tests/component/test_data_samples.py b/tests/component/test_data_samples.py new file mode 100644 index 00000000..052b89c9 --- /dev/null +++ b/tests/component/test_data_samples.py @@ -0,0 +1,383 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Tests for the ComponentDataSamples class and proto conversion.""" + +from dataclasses import dataclass, field +from datetime import datetime, timezone + +import pytest +from frequenz.api.common.v1.metrics import bounds_pb2, metric_sample_pb2 +from frequenz.api.common.v1.microgrid.components import components_pb2 +from frequenz.client.common.microgrid.components import ComponentId +from google.protobuf.timestamp_pb2 import Timestamp + +from frequenz.client.microgrid.component import ( + ComponentDataSamples, + ComponentErrorCode, + ComponentStateCode, + ComponentStateSample, +) +from frequenz.client.microgrid.component._data_samples_proto import ( + component_data_samples_from_proto_with_issues, +) +from frequenz.client.microgrid.metrics import ( + AggregatedMetricValue, + Bounds, + Metric, + MetricSample, +) + +DATETIME = datetime(2025, 3, 1, 12, 0, 0, tzinfo=timezone.utc) +TIMESTAMP = Timestamp(seconds=int(DATETIME.timestamp())) + + +@pytest.fixture +def component_id() -> ComponentId: + """Provide a test component ID.""" + return ComponentId(42) + + +@pytest.fixture +def timestamp() -> datetime: + """Provide a fixed timestamp for testing.""" + return DATETIME + + +@pytest.fixture +def metric_sample(timestamp: datetime) -> MetricSample: + """Provide a test metric sample.""" + return MetricSample( + metric=Metric.AC_ACTIVE_POWER, + value=100.0, + bounds=[], + sampled_at=timestamp, + ) + + +@pytest.fixture +def state_sample(timestamp: datetime) -> ComponentStateSample: + """Provide a test component state sample.""" + return ComponentStateSample( + sampled_at=timestamp, + states=frozenset([ComponentStateCode.READY]), + warnings=frozenset(), + errors=frozenset(), + ) + + +def test_init( + component_id: ComponentId, + metric_sample: MetricSample, + state_sample: ComponentStateSample, +) -> None: + """Test initialization of ComponentDataSamples.""" + data_samples = ComponentDataSamples( + component_id=component_id, + metric_samples=[metric_sample], + states=[state_sample], + ) + + assert data_samples.component_id == component_id + assert len(data_samples.metric_samples) == 1 + assert data_samples.metric_samples[0] == metric_sample + assert len(data_samples.states) == 1 + assert data_samples.states[0] == state_sample + + +def test_equality( + component_id: ComponentId, + metric_sample: MetricSample, + state_sample: ComponentStateSample, +) -> None: + """Test equality of ComponentDataSamples instances.""" + data_samples1 = ComponentDataSamples( + component_id=component_id, + metric_samples=[metric_sample], + states=[state_sample], + ) + + data_samples2 = ComponentDataSamples( + component_id=component_id, + metric_samples=[metric_sample], + states=[state_sample], + ) + + different_id = ComponentDataSamples( + component_id=ComponentId(99), + metric_samples=[metric_sample], + states=[state_sample], + ) + + different_metrics = ComponentDataSamples( + component_id=component_id, + metric_samples=[], + states=[state_sample], + ) + + different_states = ComponentDataSamples( + component_id=component_id, + metric_samples=[metric_sample], + states=[], + ) + + assert data_samples1 == data_samples2 + assert data_samples1 != different_id + assert data_samples1 != different_metrics + assert data_samples1 != different_states + + +@dataclass(frozen=True, kw_only=True) +class _ComponentDataSamplesConversionTestCase: + """Test case for ComponentDataSamples protobuf conversion.""" + + name: str + """The description of the test case.""" + + message: components_pb2.ComponentData + """The input protobuf message.""" + + expected_samples: ComponentDataSamples + """The expected ComponentDataSamples object.""" + + expected_major_issues: list[str] = field(default_factory=list) + """Expected major issues during conversion.""" + + expected_minor_issues: list[str] = field(default_factory=list) + """Expected minor issues during conversion.""" + + +@pytest.mark.parametrize( + "case", + [ + _ComponentDataSamplesConversionTestCase( + name="empty", + message=components_pb2.ComponentData(component_id=1), + expected_samples=ComponentDataSamples( + component_id=ComponentId(1), metric_samples=[], states=[] + ), + ), + _ComponentDataSamplesConversionTestCase( + name="metrics_only_valid", + message=components_pb2.ComponentData( + component_id=2, + metric_samples=[ + metric_sample_pb2.MetricSample( + sampled_at=TIMESTAMP, + metric=Metric.AC_ACTIVE_POWER.value, + value=metric_sample_pb2.MetricValueVariant( + simple_metric=metric_sample_pb2.SimpleMetricValue( + value=100.0 + ) + ), + ) + ], + ), + expected_samples=ComponentDataSamples( + component_id=ComponentId(2), + metric_samples=[ + MetricSample( + sampled_at=DATETIME, + metric=Metric.AC_ACTIVE_POWER, + value=100.0, + bounds=[], + ) + ], + states=[], + ), + ), + _ComponentDataSamplesConversionTestCase( + name="states_only_valid", + message=components_pb2.ComponentData( + component_id=3, + states=[ + components_pb2.ComponentState( + sampled_at=TIMESTAMP, + states=[components_pb2.COMPONENT_STATE_CODE_READY], + ) + ], + ), + expected_samples=ComponentDataSamples( + component_id=ComponentId(3), + metric_samples=[], + states=[ + ComponentStateSample( + sampled_at=DATETIME, + states=frozenset([ComponentStateCode.READY]), + warnings=frozenset(), + errors=frozenset(), + ) + ], + ), + ), + _ComponentDataSamplesConversionTestCase( + name="metric_with_invalid_bounds", + message=components_pb2.ComponentData( + component_id=4, + metric_samples=[ + metric_sample_pb2.MetricSample( + sampled_at=TIMESTAMP, + metric=Metric.DC_CURRENT.value, + value=metric_sample_pb2.MetricValueVariant( + simple_metric=metric_sample_pb2.SimpleMetricValue( + value=50.0 + ) + ), + bounds=[bounds_pb2.Bounds(lower=10.0, upper=5.0)], # Invalid + ) + ], + ), + expected_samples=ComponentDataSamples( + component_id=ComponentId(4), + metric_samples=[ + MetricSample( + sampled_at=DATETIME, + metric=Metric.DC_CURRENT, + value=50.0, + bounds=[], # Invalid bounds are ignored + ) + ], + states=[], + ), + expected_major_issues=[ + "bounds for DC_CURRENT is invalid (Lower bound (10.0) must be " + "less than or equal to upper bound (5.0)), ignoring these bounds" + ], + ), + _ComponentDataSamplesConversionTestCase( + name="metric_with_valid_bounds_and_source", + message=components_pb2.ComponentData( + component_id=5, + metric_samples=[ + metric_sample_pb2.MetricSample( + sampled_at=TIMESTAMP, + metric=Metric.AC_FREQUENCY.value, + value=metric_sample_pb2.MetricValueVariant( + simple_metric=metric_sample_pb2.SimpleMetricValue( + value=50.0 + ) + ), + bounds=[bounds_pb2.Bounds(lower=49.0, upper=51.0)], + source="sensor_A", + ) + ], + ), + expected_samples=ComponentDataSamples( + component_id=ComponentId(5), + metric_samples=[ + MetricSample( + sampled_at=DATETIME, + metric=Metric.AC_FREQUENCY, + value=50.0, + bounds=[Bounds(lower=49.0, upper=51.0)], + connection="sensor_A", + ) + ], + states=[], + ), + ), + _ComponentDataSamplesConversionTestCase( + name="full_example_with_issues", + message=components_pb2.ComponentData( + component_id=6, + metric_samples=[ + metric_sample_pb2.MetricSample( # Simple metric + sampled_at=TIMESTAMP, + metric=Metric.AC_ACTIVE_POWER.value, + value=metric_sample_pb2.MetricValueVariant( + simple_metric=metric_sample_pb2.SimpleMetricValue( + value=150.0 + ) + ), + ), + metric_sample_pb2.MetricSample( # Aggregated metric + sampled_at=TIMESTAMP, + metric=Metric.AC_REACTIVE_POWER.value, + value=metric_sample_pb2.MetricValueVariant( + aggregated_metric=metric_sample_pb2.AggregatedMetricValue( + avg_value=75.0, + min_value=70.0, + max_value=80.0, + raw_values=[70.0, 75.0, 80.0], + ) + ), + ), + metric_sample_pb2.MetricSample( # Metric with invalid bounds + sampled_at=TIMESTAMP, + metric=Metric.AC_VOLTAGE.value, + value=metric_sample_pb2.MetricValueVariant( + simple_metric=metric_sample_pb2.SimpleMetricValue( + value=230.0 + ) + ), + bounds=[bounds_pb2.Bounds(lower=250.0, upper=220.0)], # Invalid + ), + ], + states=[ + components_pb2.ComponentState( + sampled_at=TIMESTAMP, + states=[components_pb2.COMPONENT_STATE_CODE_READY], + warnings=[ + components_pb2.COMPONENT_ERROR_CODE_HARDWARE_INACCESSIBLE + ], + errors=[components_pb2.COMPONENT_ERROR_CODE_OVERCURRENT], + ) + ], + ), + expected_samples=ComponentDataSamples( + component_id=ComponentId(6), + metric_samples=[ + MetricSample( + sampled_at=DATETIME, + metric=Metric.AC_ACTIVE_POWER, + value=150.0, + bounds=[], + ), + MetricSample( + sampled_at=DATETIME, + metric=Metric.AC_REACTIVE_POWER, + value=AggregatedMetricValue( + avg=75.0, min=70.0, max=80.0, raw_values=[70.0, 75.0, 80.0] + ), + bounds=[], + ), + MetricSample( # Metric with invalid bounds is parsed, bounds ignored + sampled_at=DATETIME, + metric=Metric.AC_VOLTAGE, + value=230.0, + bounds=[], + ), + ], + states=[ + ComponentStateSample( + sampled_at=DATETIME, + states=frozenset([ComponentStateCode.READY]), + warnings=frozenset([ComponentErrorCode.HARDWARE_INACCESSIBLE]), + errors=frozenset([ComponentErrorCode.OVERCURRENT]), + ) + ], + ), + expected_major_issues=[ + "bounds for AC_VOLTAGE is invalid (Lower bound (250.0) must be less " + "than or equal to upper bound (220.0)), ignoring these bounds" + ], + ), + ], + ids=lambda c: c.name, +) +def test_from_proto( + case: _ComponentDataSamplesConversionTestCase, +) -> None: + """Test conversion from proto message to ComponentDataSamples, checking issues.""" + major_issues: list[str] = [] + minor_issues: list[str] = [] + + result = component_data_samples_from_proto_with_issues( + case.message, + major_issues=major_issues, + minor_issues=minor_issues, + ) + + assert result == case.expected_samples + assert major_issues == case.expected_major_issues + assert minor_issues == case.expected_minor_issues diff --git a/tests/component/test_state_sample.py b/tests/component/test_state_sample.py new file mode 100644 index 00000000..4152f00e --- /dev/null +++ b/tests/component/test_state_sample.py @@ -0,0 +1,187 @@ +# License: MIT +# Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +"""Tests for the ComponentStateSample class and proto conversion.""" + +from dataclasses import dataclass +from datetime import datetime, timezone +from unittest.mock import patch + +import pytest +from frequenz.api.common.v1.microgrid.components import components_pb2 +from frequenz.client.base.conversion import to_timestamp + +from frequenz.client.microgrid.component import ( + ComponentErrorCode, + ComponentStateCode, + ComponentStateSample, +) +from frequenz.client.microgrid.component._state_sample_proto import ( + component_state_sample_from_proto, +) + + +@pytest.fixture +def timestamp() -> datetime: + """Provide a fixed timestamp for testing.""" + return datetime(2025, 3, 1, 12, 0, 0, tzinfo=timezone.utc) + + +def test_init(timestamp: datetime) -> None: + """Test initialization of ComponentStateSample.""" + states = frozenset([ComponentStateCode.READY]) + warnings = frozenset([ComponentErrorCode.HARDWARE_INACCESSIBLE]) + errors: frozenset[ComponentErrorCode] = frozenset() + + state_sample = ComponentStateSample( + sampled_at=timestamp, + states=states, + warnings=warnings, + errors=errors, + ) + + assert state_sample.sampled_at == timestamp + assert state_sample.states == states + assert state_sample.warnings == warnings + assert state_sample.errors == errors + + +def test_equality(timestamp: datetime) -> None: + """Test equality of ComponentStateSample instances.""" + states1 = frozenset([ComponentStateCode.READY]) + warnings1 = frozenset([ComponentErrorCode.HARDWARE_INACCESSIBLE]) + errors1: frozenset[ComponentErrorCode] = frozenset() + + state_sample1 = ComponentStateSample( + sampled_at=timestamp, + states=states1, + warnings=warnings1, + errors=errors1, + ) + + state_sample2 = ComponentStateSample( + sampled_at=timestamp, + states=states1, + warnings=warnings1, + errors=errors1, + ) + + different_timestamp = ComponentStateSample( + sampled_at=datetime(2025, 3, 1, 13, 0, 0, tzinfo=timezone.utc), + states=states1, + warnings=warnings1, + errors=errors1, + ) + + different_states = ComponentStateSample( + sampled_at=timestamp, + states=frozenset([ComponentStateCode.ERROR]), + warnings=warnings1, + errors=errors1, + ) + + different_warnings = ComponentStateSample( + sampled_at=timestamp, + states=states1, + warnings=frozenset(), + errors=errors1, + ) + + different_errors = ComponentStateSample( + sampled_at=timestamp, + states=frozenset([ComponentStateCode.ERROR]), + warnings=warnings1, + errors=frozenset([ComponentErrorCode.OVERCURRENT]), + ) + + assert state_sample1 == state_sample2 + assert state_sample1 != different_timestamp + assert state_sample1 != different_states + assert state_sample1 != different_warnings + assert state_sample1 != different_errors + + +@dataclass(frozen=True, kw_only=True) +class ProtoConversionCase: + """Test case for proto conversion tests.""" + + name: str + states: list[ComponentStateCode | int] + warnings: list[ComponentErrorCode | int] + errors: list[ComponentErrorCode | int] + + +@pytest.mark.parametrize( + "case", + [ + ProtoConversionCase( + name="full", + states=[ComponentStateCode.ERROR], + warnings=[ComponentErrorCode.HARDWARE_INACCESSIBLE], + errors=[ComponentErrorCode.OVERCURRENT], + ), + ProtoConversionCase( + name="empty", + states=[], + warnings=[], + errors=[], + ), + ProtoConversionCase( + name="only_states", + states=[ComponentStateCode.READY, ComponentStateCode.STANDBY], + warnings=[], + errors=[], + ), + ProtoConversionCase( + name="only_warnings", + states=[], + warnings=[ComponentErrorCode.HARDWARE_INACCESSIBLE], + errors=[], + ), + ProtoConversionCase( + name="only_errors", + states=[], + warnings=[], + errors=[ComponentErrorCode.OVERCURRENT], + ), + ProtoConversionCase( + name="unknown_codes", + states=[9999], + warnings=[8888], + errors=[7777], + ), + ], + ids=lambda case: case.name, +) +def test_from_proto( + case: ProtoConversionCase, + timestamp: datetime, +) -> None: + """Test conversion from proto message to ComponentStateSample.""" + proto = components_pb2.ComponentState( + sampled_at=to_timestamp(timestamp), + states=( + state.value if isinstance(state, ComponentStateCode) else state # type: ignore[misc] + for state in case.states + ), + warnings=( + ( + warning.value # type: ignore[misc] + if isinstance(warning, ComponentErrorCode) + else warning + ) + for warning in case.warnings + ), + errors=( + error.value if isinstance(error, ComponentErrorCode) else error # type: ignore[misc] + for error in case.errors + ), + ) + + with patch("frequenz.client.base.conversion.to_datetime", return_value=timestamp): + result = component_state_sample_from_proto(proto) + + assert result.sampled_at == timestamp + assert result.states == frozenset(case.states) + assert result.warnings == frozenset(case.warnings) + assert result.errors == frozenset(case.errors) diff --git a/tests/metrics/test_sample.py b/tests/metrics/test_sample.py index e4fe59c4..2c427d4c 100644 --- a/tests/metrics/test_sample.py +++ b/tests/metrics/test_sample.py @@ -4,13 +4,23 @@ """Tests for the Sample class and related classes.""" from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Final import pytest -from frequenz.api.common.v1.metrics import metric_sample_pb2 +from frequenz.api.common.v1.metrics import bounds_pb2, metric_sample_pb2 +from google.protobuf.timestamp_pb2 import Timestamp -from frequenz.client.microgrid.metrics import AggregatedMetricValue, AggregationMethod +from frequenz.client.microgrid.metrics import ( + AggregatedMetricValue, + AggregationMethod, + Bounds, + Metric, + MetricSample, +) from frequenz.client.microgrid.metrics._sample_proto import ( aggregated_metric_sample_from_proto, + metric_sample_from_proto_with_issues, ) @@ -40,6 +50,36 @@ class _AggregatedValueTestCase: """The raw values to include.""" +DATETIME: Final[datetime] = datetime(2023, 3, 15, 12, 0, 0, tzinfo=timezone.utc) +TIMESTAMP: Final[Timestamp] = Timestamp(seconds=int(DATETIME.timestamp())) + + +@dataclass(frozen=True, kw_only=True) +class _MetricSampleConversionTestCase: + """Test case for MetricSample protobuf conversion.""" + + name: str + """The description of the test case.""" + + proto_message: metric_sample_pb2.MetricSample + """The input protobuf message.""" + + expected_sample: MetricSample + """The expected MetricSample object.""" + + expected_major_issues: list[str] = field(default_factory=list) + """Expected major issues during conversion.""" + + expected_minor_issues: list[str] = field(default_factory=list) + """Expected minor issues during conversion.""" + + +@pytest.fixture +def now() -> datetime: + """Get the current time.""" + return datetime.now(timezone.utc) + + def test_aggregation_method_values() -> None: """Test that AggregationMethod enum has the expected values.""" assert AggregationMethod.AVG.value == "avg" @@ -47,33 +87,163 @@ def test_aggregation_method_values() -> None: assert AggregationMethod.MAX.value == "max" -def test_aggregated_metric_value() -> None: +@pytest.mark.parametrize( + "avg, min_val, max_val, raw_values, expected_str", + [ + pytest.param( + 5.0, + 1.0, + 10.0, + [1.0, 5.0, 10.0], + "avg:5.0", + id="full_data", + ), + pytest.param( + 5.0, + None, + None, + [], + "avg:5.0", + id="minimal_data", + ), + ], +) +def test_aggregated_metric_value( + avg: float, + min_val: float | None, + max_val: float | None, + raw_values: list[float], + expected_str: str, +) -> None: """Test AggregatedMetricValue creation and string representation.""" - # Test with full data value = AggregatedMetricValue( - avg=5.0, - min=1.0, - max=10.0, - raw_values=[1.0, 5.0, 10.0], + avg=avg, + min=min_val, + max=max_val, + raw_values=raw_values, ) - assert value.avg == 5.0 - assert value.min == 1.0 - assert value.max == 10.0 - assert list(value.raw_values) == [1.0, 5.0, 10.0] - assert str(value) == "avg:5.0" + assert value.avg == avg + assert value.min == min_val + assert value.max == max_val + assert list(value.raw_values) == raw_values + assert str(value) == expected_str - # Test with minimal data (only avg required) - value = AggregatedMetricValue( - avg=5.0, - min=None, - max=None, - raw_values=[], + +@pytest.mark.parametrize( + "value,connection", + [ + pytest.param( + 5.0, + None, + id="simple_value", + ), + pytest.param( + AggregatedMetricValue( + avg=5.0, + min=1.0, + max=10.0, + raw_values=[1.0, 5.0, 10.0], + ), + "dc_battery_0", + id="aggregated_value", + ), + pytest.param( + None, + None, + id="none_value", + ), + ], +) +def test_metric_sample_creation( + now: datetime, + value: float | AggregatedMetricValue | None, + connection: str | None, +) -> None: + """Test MetricSample creation with different value types.""" + bounds = [Bounds(lower=-10.0, upper=10.0)] + sample = MetricSample( + sampled_at=now, + metric=Metric.AC_ACTIVE_POWER, + value=value, + bounds=bounds, + connection=connection, + ) + assert sample.sampled_at == now + assert sample.metric == Metric.AC_ACTIVE_POWER + assert sample.value == value + assert sample.bounds == bounds + assert sample.connection == connection + + +@pytest.mark.parametrize( + "value, method_results", + [ + pytest.param( + 5.0, + { + AggregationMethod.AVG: 5.0, + AggregationMethod.MIN: 5.0, + AggregationMethod.MAX: 5.0, + }, + id="simple_value", + ), + pytest.param( + AggregatedMetricValue( + avg=5.0, + min=1.0, + max=10.0, + raw_values=[1.0, 5.0, 10.0], + ), + { + AggregationMethod.AVG: 5.0, + AggregationMethod.MIN: 1.0, + AggregationMethod.MAX: 10.0, + }, + id="aggregated_value", + ), + pytest.param( + None, + { + AggregationMethod.AVG: None, + AggregationMethod.MIN: None, + AggregationMethod.MAX: None, + }, + id="none_value", + ), + ], +) +def test_metric_sample_as_single_value( + now: datetime, + value: float | AggregatedMetricValue | None, + method_results: dict[AggregationMethod, float | None], +) -> None: + """Test MetricSample.as_single_value with different value types and methods.""" + bounds = [Bounds(lower=-10.0, upper=10.0)] + + sample = MetricSample( + sampled_at=now, + metric=Metric.AC_ACTIVE_POWER, + value=value, + bounds=bounds, + ) + + for method, expected in method_results.items(): + assert sample.as_single_value(aggregation_method=method) == expected + + +def test_metric_sample_multiple_bounds(now: datetime) -> None: + """Test MetricSample creation with multiple bounds.""" + bounds = [ + Bounds(lower=-10.0, upper=-5.0), + Bounds(lower=5.0, upper=10.0), + ] + sample = MetricSample( + sampled_at=now, + metric=Metric.AC_ACTIVE_POWER, + value=7.0, + bounds=bounds, ) - assert value.avg == 5.0 - assert value.min is None - assert value.max is None - assert not value.raw_values - assert str(value) == "avg:5.0" + assert sample.bounds == bounds @pytest.mark.parametrize( @@ -125,3 +295,157 @@ def test_aggregated_metric_value_from_proto(case: _AggregatedValueTestCase) -> N assert value.min == (case.min_value if case.has_min else None) assert value.max == (case.max_value if case.has_max else None) assert list(value.raw_values) == case.raw_values + + +@pytest.mark.parametrize( + "case", + [ + _MetricSampleConversionTestCase( + name="simple_value", + proto_message=metric_sample_pb2.MetricSample( + sampled_at=TIMESTAMP, + metric=Metric.AC_ACTIVE_POWER.value, + value=metric_sample_pb2.MetricValueVariant( + simple_metric=metric_sample_pb2.SimpleMetricValue(value=5.0) + ), + ), + expected_sample=MetricSample( + sampled_at=DATETIME, + metric=Metric.AC_ACTIVE_POWER, + value=5.0, + bounds=[], + connection=None, + ), + ), + _MetricSampleConversionTestCase( + name="aggregated_value", + proto_message=metric_sample_pb2.MetricSample( + sampled_at=TIMESTAMP, + metric=Metric.AC_ACTIVE_POWER.value, + value=metric_sample_pb2.MetricValueVariant( + aggregated_metric=metric_sample_pb2.AggregatedMetricValue( + avg_value=5.0, min_value=1.0, max_value=10.0 + ) + ), + ), + expected_sample=MetricSample( + sampled_at=DATETIME, + metric=Metric.AC_ACTIVE_POWER, + value=AggregatedMetricValue(avg=5.0, min=1.0, max=10.0, raw_values=[]), + bounds=[], + connection=None, + ), + ), + _MetricSampleConversionTestCase( + name="no_value", + proto_message=metric_sample_pb2.MetricSample( + sampled_at=TIMESTAMP, + metric=Metric.AC_ACTIVE_POWER.value, + ), + expected_sample=MetricSample( + sampled_at=DATETIME, + metric=Metric.AC_ACTIVE_POWER, + value=None, + bounds=[], + connection=None, + ), + ), + _MetricSampleConversionTestCase( + name="unrecognized_metric", + proto_message=metric_sample_pb2.MetricSample( + sampled_at=TIMESTAMP, + metric=999, # type: ignore[arg-type] + value=metric_sample_pb2.MetricValueVariant( + simple_metric=metric_sample_pb2.SimpleMetricValue(value=5.0) + ), + ), + expected_sample=MetricSample( + sampled_at=DATETIME, metric=999, value=5.0, bounds=[], connection=None + ), + ), + _MetricSampleConversionTestCase( + name="with_valid_bounds", + proto_message=metric_sample_pb2.MetricSample( + sampled_at=TIMESTAMP, + metric=Metric.AC_ACTIVE_POWER.value, + value=metric_sample_pb2.MetricValueVariant( + simple_metric=metric_sample_pb2.SimpleMetricValue(value=5.0) + ), + bounds=[bounds_pb2.Bounds(lower=-10.0, upper=10.0)], + ), + expected_sample=MetricSample( + sampled_at=DATETIME, + metric=Metric.AC_ACTIVE_POWER, + value=5.0, + bounds=[Bounds(lower=-10.0, upper=10.0)], + connection=None, + ), + ), + _MetricSampleConversionTestCase( + name="with_invalid_bounds", + proto_message=metric_sample_pb2.MetricSample( + sampled_at=TIMESTAMP, + metric=Metric.AC_ACTIVE_POWER.value, + value=metric_sample_pb2.MetricValueVariant( + simple_metric=metric_sample_pb2.SimpleMetricValue(value=5.0) + ), + bounds=[ + bounds_pb2.Bounds(lower=-10.0, upper=10.0), + bounds_pb2.Bounds(lower=10.0, upper=-10.0), # Invalid + ], + ), + expected_sample=MetricSample( + sampled_at=DATETIME, + metric=Metric.AC_ACTIVE_POWER, + value=5.0, + bounds=[Bounds(lower=-10.0, upper=10.0)], # Invalid bounds are ignored + connection=None, + ), + expected_major_issues=[ + ( + "bounds for AC_ACTIVE_POWER is invalid (Lower bound (10.0) must be " + "less than or equal to upper bound (-10.0)), ignoring these bounds" + ) + ], + ), + _MetricSampleConversionTestCase( + name="with_source", + proto_message=metric_sample_pb2.MetricSample( + sampled_at=TIMESTAMP, + metric=Metric.AC_ACTIVE_POWER.value, + value=metric_sample_pb2.MetricValueVariant( + simple_metric=metric_sample_pb2.SimpleMetricValue(value=5.0) + ), + source="dc_battery_0", + ), + expected_sample=MetricSample( + sampled_at=DATETIME, + metric=Metric.AC_ACTIVE_POWER, + value=5.0, + bounds=[], + connection="dc_battery_0", + ), + ), + ], + ids=lambda case: case.name, +) +def test_metric_sample_from_proto_with_issues( + case: _MetricSampleConversionTestCase, +) -> None: + """Test conversion from protobuf message to MetricSample.""" + major_issues: list[str] = [] + minor_issues: list[str] = [] + + # The timestamp in the expected sample needs to match the one from proto conversion + # We use a fixed timestamp in test cases, so this is fine. + # If dynamic timestamps were used, we'd need to adjust here or in the fixture. + + sample = metric_sample_from_proto_with_issues( + case.proto_message, + major_issues=major_issues, + minor_issues=minor_issues, + ) + + assert sample == case.expected_sample + assert major_issues == case.expected_major_issues + assert minor_issues == case.expected_minor_issues diff --git a/tests/test_client.py b/tests/test_client.py index b4574dc4..06054785 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -161,3 +161,19 @@ async def test_add_bounds( ) -> None: """Test add_bounds method.""" await spec.test_unary_unary_call(client, "AddComponentBounds") + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "spec", + get_test_specs( + "receive_component_data_samples_stream", + tests_dir=TESTS_DIR, + ), + ids=str, +) +async def test_receive_component_data_samples_stream( + client: MicrogridApiClient, spec: ApiClientTestCaseSpec +) -> None: + """Test receive_component_data_samples_stream method.""" + await spec.test_unary_stream_call(client, "ReceiveComponentDataStream")