diff --git a/src/frequenz/sdk/actor/__init__.py b/src/frequenz/sdk/actor/__init__.py index f75fcc6ec..06141b008 100644 --- a/src/frequenz/sdk/actor/__init__.py +++ b/src/frequenz/sdk/actor/__init__.py @@ -592,13 +592,12 @@ async def main() -> None: # (6)! [_run]: #the-_run-method """ -from ..timeseries._resampling import ResamplerConfig from ._actor import Actor from ._background_service import BackgroundService from ._channel_registry import ChannelRegistry from ._config_managing import ConfigManagingActor from ._data_sourcing import ComponentMetricRequest, DataSourcingActor -from ._resampling import ComponentMetricsResamplingActor +from ._resampling import ComponentMetricsResamplingActor, ResamplingActorConfig from ._run_utils import run __all__ = [ @@ -609,6 +608,6 @@ async def main() -> None: # (6)! "ComponentMetricsResamplingActor", "ConfigManagingActor", "DataSourcingActor", - "ResamplerConfig", + "ResamplingActorConfig", "run", ] diff --git a/src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py index 2c235336e..209cb95f9 100644 --- a/src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py @@ -20,7 +20,6 @@ MeterData, ) from ...timeseries import Sample -from ...timeseries._quantities import Quantity from .._channel_registry import ChannelRegistry from ._component_metric_request import ComponentMetricRequest @@ -318,7 +317,7 @@ def _get_metric_senders( self, category: ComponentCategory, requests: dict[ComponentMetricId, list[ComponentMetricRequest]], - ) -> list[tuple[Callable[[Any], float], list[Sender[Sample[Quantity]]]]]: + ) -> list[tuple[Callable[[Any], float], list[Sender[Sample[float]]]]]: """Get channel senders from the channel registry for each requested metric. Args: @@ -335,7 +334,7 @@ def _get_metric_senders( self._get_data_extraction_method(category, metric), [ self._registry.get_or_create( - Sample[Quantity], request.get_channel_name() + Sample[float], request.get_channel_name() ).new_sender() for request in req_list ], @@ -353,48 +352,55 @@ async def _handle_data_stream( Args: comp_id: Id of the requested component. category: The category of the component. + + Raises: + Exception: if an error occurs while streaming data. """ - stream_senders = [] - if comp_id in self._req_streaming_metrics: - await self._check_requested_component_and_metrics( - comp_id, category, self._req_streaming_metrics[comp_id] + try: + stream_senders = [] + if comp_id in self._req_streaming_metrics: + await self._check_requested_component_and_metrics( + comp_id, category, self._req_streaming_metrics[comp_id] + ) + stream_senders = self._get_metric_senders( + category, self._req_streaming_metrics[comp_id] + ) + api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id] + + senders_done: asyncio.Event = asyncio.Event() + pending_messages = 0 + + def process_msg(data: Any) -> None: + tasks = [] + for extractor, senders in stream_senders: + for sender in senders: + tasks.append( + sender.send(Sample(data.timestamp, extractor(data))) + ) + asyncio.gather(*tasks) + nonlocal pending_messages + pending_messages -= 1 + if pending_messages == 0: + senders_done.set() + + async for data in api_data_receiver: + pending_messages += 1 + senders_done.clear() + process_msg(data) + + while pending_messages > 0: + await senders_done.wait() + + await asyncio.gather( + *[ + self._registry.close_and_remove(r.get_channel_name()) + for requests in self._req_streaming_metrics[comp_id].values() + for r in requests + ] ) - stream_senders = self._get_metric_senders( - category, self._req_streaming_metrics[comp_id] - ) - api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id] - - senders_done: asyncio.Event = asyncio.Event() - pending_messages = 0 - - def process_msg(data: Any) -> None: - tasks = [] - for extractor, senders in stream_senders: - for sender in senders: - tasks.append( - sender.send(Sample(data.timestamp, Quantity(extractor(data)))) - ) - asyncio.gather(*tasks) - nonlocal pending_messages - pending_messages -= 1 - if pending_messages == 0: - senders_done.set() - - async for data in api_data_receiver: - pending_messages += 1 - senders_done.clear() - process_msg(data) - - while pending_messages > 0: - await senders_done.wait() - - await asyncio.gather( - *[ - self._registry.close_and_remove(r.get_channel_name()) - for requests in self._req_streaming_metrics[comp_id].values() - for r in requests - ] - ) + except Exception as exc: + _logger.exception("Error while streaming data for component %d", comp_id) + raise exc async def _update_streams( self, diff --git a/src/frequenz/sdk/actor/_resampling.py b/src/frequenz/sdk/actor/_resampling.py index 788c0bbc7..6d232404e 100644 --- a/src/frequenz/sdk/actor/_resampling.py +++ b/src/frequenz/sdk/actor/_resampling.py @@ -7,12 +7,12 @@ import asyncio import dataclasses import logging +from typing import Callable from frequenz.channels import Receiver, Sender from .._internal._asyncio import cancel_and_await -from ..timeseries import Sample -from ..timeseries._quantities import Quantity +from ..timeseries._base_types import Sample from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError from ._actor import Actor from ._channel_registry import ChannelRegistry @@ -21,6 +21,17 @@ _logger = logging.getLogger(__name__) +# We need to use the dataclass decorator again because we are making a required +# attribute optional, so we need the dataclass to re-generate the constructor with the +# new signature. +@dataclasses.dataclass(frozen=True) +class ResamplingActorConfig(ResamplerConfig[float]): + """Configuration for the resampling actor.""" + + value_constructor: Callable[[float], float] = float + """The constructor to use to create new sample values.""" + + class ComponentMetricsResamplingActor(Actor): """An actor to resample microgrid component metrics.""" @@ -30,7 +41,7 @@ def __init__( # pylint: disable=too-many-arguments channel_registry: ChannelRegistry, data_sourcing_request_sender: Sender[ComponentMetricRequest], resampling_request_receiver: Receiver[ComponentMetricRequest], - config: ResamplerConfig, + config: ResamplingActorConfig, name: str | None = None, ) -> None: """Initialize an instance. @@ -49,13 +60,15 @@ def __init__( # pylint: disable=too-many-arguments """ super().__init__(name=name) self._channel_registry: ChannelRegistry = channel_registry + self._data_sourcing_request_sender: Sender[ComponentMetricRequest] = ( data_sourcing_request_sender ) self._resampling_request_receiver: Receiver[ComponentMetricRequest] = ( resampling_request_receiver ) - self._resampler: Resampler = Resampler(config) + self._resampler: Resampler[float] = Resampler(config) + self._active_req_channels: set[str] = set() async def _subscribe(self, request: ComponentMetricRequest) -> None: @@ -78,13 +91,13 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None: data_source_channel_name = data_source_request.get_channel_name() await self._data_sourcing_request_sender.send(data_source_request) receiver = self._channel_registry.get_or_create( - Sample[Quantity], data_source_channel_name + Sample[float], data_source_channel_name ).new_receiver() # This is a temporary hack until the Sender implementation uses # exceptions to report errors. sender = self._channel_registry.get_or_create( - Sample[Quantity], request_channel_name + Sample[float], request_channel_name ).new_sender() self._resampler.add_timeseries(request_channel_name, receiver, sender.send) diff --git a/src/frequenz/sdk/microgrid/__init__.py b/src/frequenz/sdk/microgrid/__init__.py index c853d3920..b284aaeb2 100644 --- a/src/frequenz/sdk/microgrid/__init__.py +++ b/src/frequenz/sdk/microgrid/__init__.py @@ -121,7 +121,8 @@ to limit the charge power of individual EV Chargers. """ # noqa: D205, D400 -from ..actor import ResamplerConfig +import typing + from . import _data_pipeline, client, component, connection_manager, metadata from ._data_pipeline import ( battery_pool, @@ -134,8 +135,13 @@ voltage, ) +if typing.TYPE_CHECKING: + from ..actor._resampling import ResamplingActorConfig + -async def initialize(host: str, port: int, resampler_config: ResamplerConfig) -> None: +async def initialize( + host: str, port: int, resampler_config: "ResamplingActorConfig" +) -> None: """Initialize the microgrid connection manager and the data pipeline. Args: diff --git a/src/frequenz/sdk/microgrid/_data_pipeline.py b/src/frequenz/sdk/microgrid/_data_pipeline.py index f80dd8bf3..09c7f4303 100644 --- a/src/frequenz/sdk/microgrid/_data_pipeline.py +++ b/src/frequenz/sdk/microgrid/_data_pipeline.py @@ -35,7 +35,7 @@ # # pylint: disable=import-outside-toplevel if typing.TYPE_CHECKING: - from ..actor import ComponentMetricRequest, ResamplerConfig, _power_managing + from ..actor import ComponentMetricRequest, ResamplingActorConfig, _power_managing from ..actor.power_distributing import ( # noqa: F401 (imports used by string type hints) ComponentPoolStatus, PowerDistributingActor, @@ -81,7 +81,7 @@ class _DataPipeline: # pylint: disable=too-many-instance-attributes def __init__( self, - resampler_config: ResamplerConfig, + resampler_config: ResamplingActorConfig, ) -> None: """Create a `DataPipeline` instance. @@ -90,7 +90,7 @@ def __init__( """ from ..actor import ChannelRegistry - self._resampler_config: ResamplerConfig = resampler_config + self._resampler_config: ResamplingActorConfig = resampler_config self._channel_registry: ChannelRegistry = ChannelRegistry( name="Data Pipeline Registry" @@ -408,7 +408,7 @@ async def _stop(self) -> None: _DATA_PIPELINE: _DataPipeline | None = None -async def initialize(resampler_config: ResamplerConfig) -> None: +async def initialize(resampler_config: ResamplingActorConfig) -> None: """Initialize a `DataPipeline` instance. Args: diff --git a/src/frequenz/sdk/timeseries/_base_types.py b/src/frequenz/sdk/timeseries/_base_types.py index ef376e154..76ea2ce98 100644 --- a/src/frequenz/sdk/timeseries/_base_types.py +++ b/src/frequenz/sdk/timeseries/_base_types.py @@ -9,16 +9,19 @@ from collections.abc import Callable, Iterator from dataclasses import dataclass from datetime import datetime, timezone -from typing import Generic, Self, TypeVar, overload +from typing import Generic, Self, SupportsFloat, TypeVar, overload -from ._quantities import Power, QuantityT +from ._quantities import Power + +SupportsFloatT = TypeVar("SupportsFloatT", bound=SupportsFloat) +"""Type variable for types that support conversion to float.""" UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc) """The UNIX epoch (in UTC).""" @dataclass(frozen=True, order=True) -class Sample(Generic[QuantityT]): +class Sample(Generic[SupportsFloatT]): """A measurement taken at a particular point in time. The `value` could be `None` if a component is malfunctioning or data is @@ -29,12 +32,12 @@ class Sample(Generic[QuantityT]): timestamp: datetime """The time when this sample was generated.""" - value: QuantityT | None = None + value: SupportsFloatT | None = None """The value of this sample.""" @dataclass(frozen=True) -class Sample3Phase(Generic[QuantityT]): +class Sample3Phase(Generic[SupportsFloatT]): """A 3-phase measurement made at a particular point in time. Each of the `value` fields could be `None` if a component is malfunctioning @@ -45,16 +48,16 @@ class Sample3Phase(Generic[QuantityT]): timestamp: datetime """The time when this sample was generated.""" - value_p1: QuantityT | None + value_p1: SupportsFloatT | None """The value of the 1st phase in this sample.""" - value_p2: QuantityT | None + value_p2: SupportsFloatT | None """The value of the 2nd phase in this sample.""" - value_p3: QuantityT | None + value_p3: SupportsFloatT | None """The value of the 3rd phase in this sample.""" - def __iter__(self) -> Iterator[QuantityT | None]: + def __iter__(self) -> Iterator[SupportsFloatT | None]: """Return an iterator that yields values from each of the phases. Yields: @@ -65,12 +68,12 @@ def __iter__(self) -> Iterator[QuantityT | None]: yield self.value_p3 @overload - def max(self, default: QuantityT) -> QuantityT: ... + def max(self, default: SupportsFloatT) -> SupportsFloatT: ... @overload - def max(self, default: None = None) -> QuantityT | None: ... + def max(self, default: None = None) -> SupportsFloatT | None: ... - def max(self, default: QuantityT | None = None) -> QuantityT | None: + def max(self, default: SupportsFloatT | None = None) -> SupportsFloatT | None: """Return the max value among all phases, or default if they are all `None`. Args: @@ -81,19 +84,19 @@ def max(self, default: QuantityT | None = None) -> QuantityT | None: """ if not any(self): return default - value: QuantityT = functools.reduce( - lambda x, y: x if x > y else y, + value: SupportsFloatT = functools.reduce( + lambda x, y: x if float(x) > float(y) else y, filter(None, self), ) return value @overload - def min(self, default: QuantityT) -> QuantityT: ... + def min(self, default: SupportsFloatT) -> SupportsFloatT: ... @overload - def min(self, default: None = None) -> QuantityT | None: ... + def min(self, default: None = None) -> SupportsFloatT | None: ... - def min(self, default: QuantityT | None = None) -> QuantityT | None: + def min(self, default: SupportsFloatT | None = None) -> SupportsFloatT | None: """Return the min value among all phases, or default if they are all `None`. Args: @@ -104,16 +107,16 @@ def min(self, default: QuantityT | None = None) -> QuantityT | None: """ if not any(self): return default - value: QuantityT = functools.reduce( - lambda x, y: x if x < y else y, + value: SupportsFloatT = functools.reduce( + lambda x, y: x if float(x) < float(y) else y, filter(None, self), ) return value def map( self, - function: Callable[[QuantityT], QuantityT], - default: QuantityT | None = None, + function: Callable[[SupportsFloatT], SupportsFloatT], + default: SupportsFloatT | None = None, ) -> Self: """Apply the given function on each of the phase values and return the result. diff --git a/src/frequenz/sdk/timeseries/_grid_frequency.py b/src/frequenz/sdk/timeseries/_grid_frequency.py index 624287d2d..729cdd17c 100644 --- a/src/frequenz/sdk/timeseries/_grid_frequency.py +++ b/src/frequenz/sdk/timeseries/_grid_frequency.py @@ -7,6 +7,7 @@ import asyncio import logging +import math from typing import TYPE_CHECKING from frequenz.channels import Receiver, Sender @@ -15,7 +16,7 @@ from ..microgrid import connection_manager from ..microgrid.component import Component, ComponentCategory, ComponentMetricId from ..timeseries._base_types import Sample -from ..timeseries._quantities import Frequency, Quantity +from ..timeseries._quantities import Frequency if TYPE_CHECKING: # Imported here to avoid a circular import. @@ -96,7 +97,7 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]: A receiver that will receive grid frequency samples. """ receiver = self._channel_registry.get_or_create( - Sample[Quantity], self._component_metric_request.get_channel_name() + Sample[float], self._component_metric_request.get_channel_name() ).new_receiver() if not self._task: @@ -109,10 +110,8 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]: return receiver.map( lambda sample: ( Sample[Frequency](sample.timestamp, None) - if sample.value is None or sample.value.isnan() - else Sample( - sample.timestamp, Frequency.from_hertz(sample.value.base_value) - ) + if sample.value is None or math.isnan(sample.value) + else Sample(sample.timestamp, Frequency.from_hertz(float(sample.value))) ) ) diff --git a/src/frequenz/sdk/timeseries/_moving_window.py b/src/frequenz/sdk/timeseries/_moving_window.py index 62376f21b..ec5882fa5 100644 --- a/src/frequenz/sdk/timeseries/_moving_window.py +++ b/src/frequenz/sdk/timeseries/_moving_window.py @@ -9,22 +9,21 @@ import math from collections.abc import Sequence from datetime import datetime, timedelta -from typing import SupportsIndex, overload +from typing import Generic, SupportsIndex, overload import numpy as np from frequenz.channels import Broadcast, Receiver, Sender from numpy.typing import ArrayLike from ..actor._background_service import BackgroundService -from ._base_types import UNIX_EPOCH, Sample -from ._quantities import Quantity +from ._base_types import UNIX_EPOCH, Sample, SupportsFloatT from ._resampling import Resampler, ResamplerConfig from ._ringbuffer import OrderedRingBuffer _logger = logging.getLogger(__name__) -class MovingWindow(BackgroundService): +class MovingWindow(BackgroundService, Generic[SupportsFloatT]): """ A data window that moves with the latest datapoints of a data stream. @@ -130,9 +129,9 @@ async def run() -> None: def __init__( # pylint: disable=too-many-arguments self, size: timedelta, - resampled_data_recv: Receiver[Sample[Quantity]], + resampled_data_recv: Receiver[Sample[SupportsFloatT]], input_sampling_period: timedelta, - resampler_config: ResamplerConfig | None = None, + resampler_config: ResamplerConfig[SupportsFloatT] | None = None, align_to: datetime = UNIX_EPOCH, *, name: str | None = None, @@ -166,8 +165,8 @@ def __init__( # pylint: disable=too-many-arguments self._sampling_period = input_sampling_period - self._resampler: Resampler | None = None - self._resampler_sender: Sender[Sample[Quantity]] | None = None + self._resampler: Resampler[SupportsFloatT] | None = None + self._resampler_sender: Sender[Sample[SupportsFloatT]] | None = None if resampler_config: assert ( @@ -182,7 +181,9 @@ def __init__( # pylint: disable=too-many-arguments size.total_seconds() / self._sampling_period.total_seconds() ) - self._resampled_data_recv = resampled_data_recv + self._resampled_data_recv: Receiver[Sample[SupportsFloatT]] = ( + resampled_data_recv + ) self._buffer = OrderedRingBuffer( np.empty(shape=num_samples, dtype=float), sampling_period=self._sampling_period, @@ -341,11 +342,11 @@ def _configure_resampler(self) -> None: """Configure the components needed to run the resampler.""" assert self._resampler is not None - async def sink_buffer(sample: Sample[Quantity]) -> None: + async def sink_buffer(sample: Sample[SupportsFloatT]) -> None: if sample.value is not None: self._buffer.update(sample) - resampler_channel = Broadcast[Sample[Quantity]]("average") + resampler_channel = Broadcast[Sample[SupportsFloatT]]("average") self._resampler_sender = resampler_channel.new_sender() self._resampler.add_timeseries( "avg", resampler_channel.new_receiver(), sink_buffer diff --git a/src/frequenz/sdk/timeseries/_periodic_feature_extractor.py b/src/frequenz/sdk/timeseries/_periodic_feature_extractor.py index ab646e154..a2073b7c7 100644 --- a/src/frequenz/sdk/timeseries/_periodic_feature_extractor.py +++ b/src/frequenz/sdk/timeseries/_periodic_feature_extractor.py @@ -16,12 +16,14 @@ import logging from dataclasses import dataclass from datetime import datetime, timedelta +from typing import Generic import numpy as np from numpy.typing import NDArray from .._internal._math import is_close_to_zero from ._moving_window import MovingWindow +from ._quantities import SupportsFloatT from ._ringbuffer import OrderedRingBuffer _logger = logging.getLogger(__name__) @@ -48,7 +50,7 @@ class RelativePositions: """The relative position of the next incoming sample.""" -class PeriodicFeatureExtractor: +class PeriodicFeatureExtractor(Generic[SupportsFloatT]): """ A feature extractor for historical timeseries data. @@ -106,7 +108,7 @@ class PeriodicFeatureExtractor: def __init__( self, - moving_window: MovingWindow, + moving_window: MovingWindow[SupportsFloatT], period: timedelta, ) -> None: """ @@ -119,7 +121,7 @@ def __init__( Raises: ValueError: If the MovingWindow size is not a integer multiple of the period. """ - self._moving_window = moving_window + self._moving_window: MovingWindow[SupportsFloatT] = moving_window self._sampling_period = self._moving_window.sampling_period """The sampling_period as float to use it for indexing of samples.""" diff --git a/src/frequenz/sdk/timeseries/_quantities.py b/src/frequenz/sdk/timeseries/_quantities.py index 48ebeb819..3e679492b 100644 --- a/src/frequenz/sdk/timeseries/_quantities.py +++ b/src/frequenz/sdk/timeseries/_quantities.py @@ -7,33 +7,18 @@ from __future__ import annotations +import abc import math from datetime import timedelta -from typing import Any, NoReturn, Self, TypeVar, overload - -QuantityT = TypeVar( - "QuantityT", - "Quantity", - "Power", - "Current", - "Voltage", - "Energy", - "Frequency", - "Percentage", - "Temperature", -) -"""Type variable for representing various quantity types.""" - - -class Quantity: +from typing import Self, overload + + +class Quantity(abc.ABC): """A quantity with a unit. Quantities try to behave like float and are also immutable. """ - _base_value: float - """The value of this quantity in the base unit.""" - _exponent_unit_map: dict[int, str] | None = None """A mapping from the exponent of the base unit to the unit symbol. @@ -41,14 +26,27 @@ class Quantity: class. Sub-classes must define this. """ - def __init__(self, value: float, exponent: int = 0) -> None: - """Initialize a new quantity. + def __init__(self) -> None: + """Initialize this instance. - Args: - value: The value of this quantity in a given exponent of the base unit. - exponent: The exponent of the base unit the given value is in. + This constructor is not meant to be used directly. Use one of the `from_*()` + methods instead. + + Raises: + TypeError: Every time this constructor is called. """ - self._base_value = value * 10.0**exponent + # This is for documentation purposes and to hint to mypy that this + # attribute exists. + self._base_value: float = 0.0 + """The value of this quantity in the base unit.""" + + cls = type(self) + raise TypeError( + "Use of default constructor is not allowed for " + f"{cls.__module__}.{cls.__qualname__}, " + f"use {cls.__name__}.zero() or one of the " + f"`{cls.__name__}.from_*()` constructors instead." + ) @classmethod def _new(cls, value: float, *, exponent: int = 0) -> Self: @@ -97,7 +95,14 @@ def zero(cls) -> Self: Returns: A quantity with value 0.0. + + Raises: + TypeError: If called on the base + [`Quantity`][frequenz.sdk.timeseries.Quantity] class. """ + if cls is Quantity: + raise TypeError("Cannot instantiate a base Quantity class.") + _zero = cls._zero_cache.get(cls, None) if _zero is None: _zero = cls.__new__(cls) @@ -118,8 +123,12 @@ def from_string(cls, string: str) -> Self: Raises: ValueError: If the string does not match the expected format. - + TypeError: If called on the base + [`Quantity`][frequenz.sdk.timeseries.Quantity] class. """ + if cls is Quantity: + raise TypeError("Cannot instantiate a base Quantity class.") + split_string = string.split(" ") if len(split_string) != 2: @@ -142,43 +151,65 @@ def from_string(cls, string: str) -> Self: raise ValueError(f"Unknown unit {split_string[1]}") - @property - def base_value(self) -> float: + def __float__(self) -> float: """Return the value of this quantity in the base unit. + Warning: + Normally you should convert to float by using the `as_*()` methods, which + makes it clear what unit you are converting to. + + This method is provided only for generics, where using + a [`Quantity`][frequenz.sdk.timeseries.Quantity] as + a [`SupportsFloat`][typing.SupportsFloat] is required. + Returns: - The value of this quantity in the base unit. + The value of this quantity in the [base + unit][frequenz.sdk.timeseries.Quantity.base_unit]. """ return self._base_value - @property - def base_unit(self) -> str | None: - """Return the base unit of this quantity. + def __round__(self, ndigits: int | None = None) -> Self: + """Round this quantity to the given number of digits. - None if this quantity has no unit. + Args: + ndigits: The number of digits to round to. Returns: - The base unit of this quantity. + The rounded quantity. """ - if not self._exponent_unit_map: - return None - return self._exponent_unit_map[0] + return self._new(round(self._base_value, ndigits)) - def isnan(self) -> bool: - """Return whether this quantity is NaN. + def __pos__(self) -> Self: + """Return this quantity. Returns: - Whether this quantity is NaN. + This quantity. """ - return math.isnan(self._base_value) + return self - def isinf(self) -> bool: - """Return whether this quantity is infinite. + def __mod__(self, other: Self) -> Self: + """Return the remainder of this quantity and another. + + Args: + other: The other quantity. Returns: - Whether this quantity is infinite. + The remainder of this quantity and another. """ - return math.isinf(self._base_value) + return self._new(self._base_value % other._base_value) + + @property + def base_unit(self) -> str | None: + """Return the base unit of this quantity. + + None if this quantity has no unit. + + Returns: + The base unit of this quantity. + """ + if not self._exponent_unit_map: + return None + return self._exponent_unit_map[0] def isclose(self, other: Self, rel_tol: float = 1e-9, abs_tol: float = 0.0) -> bool: """Return whether this quantity is close to another. @@ -463,29 +494,8 @@ def __abs__(self) -> Self: return absolute -class _NoDefaultConstructible(type): - """A metaclass that disables the default constructor.""" - - def __call__(cls, *_args: Any, **_kwargs: Any) -> NoReturn: - """Raise a TypeError when the default constructor is called. - - Args: - *_args: ignored positional arguments. - **_kwargs: ignored keyword arguments. - - Raises: - TypeError: Always. - """ - raise TypeError( - "Use of default constructor NOT allowed for " - f"{cls.__module__}.{cls.__qualname__}, " - f"use one of the `{cls.__name__}.from_*()` methods instead." - ) - - class Temperature( Quantity, - metaclass=_NoDefaultConstructible, exponent_unit_map={ 0: "°C", }, @@ -515,7 +525,6 @@ def as_celsius(self) -> float: class Power( Quantity, - metaclass=_NoDefaultConstructible, exponent_unit_map={ -3: "mW", 0: "W", @@ -709,7 +718,6 @@ def __truediv__(self, other: Current | Voltage) -> Voltage | Current: class Current( Quantity, - metaclass=_NoDefaultConstructible, exponent_unit_map={ -3: "mA", 0: "A", @@ -820,7 +828,6 @@ def __mul__(self, other: float | Percentage | Voltage, /) -> Self | Power: class Voltage( Quantity, - metaclass=_NoDefaultConstructible, exponent_unit_map={0: "V", -3: "mV", 3: "kV"}, ): """A voltage quantity. @@ -948,7 +955,6 @@ def __mul__(self, other: float | Percentage | Current, /) -> Self | Power: class Energy( Quantity, - metaclass=_NoDefaultConstructible, exponent_unit_map={ 0: "Wh", 3: "kWh", @@ -1088,7 +1094,6 @@ def __truediv__(self, other: timedelta | Power) -> Power | timedelta: class Frequency( Quantity, - metaclass=_NoDefaultConstructible, exponent_unit_map={0: "Hz", 3: "kHz", 6: "MHz", 9: "GHz"}, ): """A frequency quantity. @@ -1193,7 +1198,6 @@ def period(self) -> timedelta: class Percentage( Quantity, - metaclass=_NoDefaultConstructible, exponent_unit_map={0: "%"}, ): """A percentage quantity. diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 26369da8f..8f7b37f62 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -14,18 +14,16 @@ from collections.abc import AsyncIterator, Callable, Coroutine, Sequence from dataclasses import dataclass from datetime import datetime, timedelta, timezone -from typing import cast +from typing import Generic, cast from frequenz.channels.util import Timer from frequenz.channels.util._timer import _to_microseconds from .._internal._asyncio import cancel_and_await -from ._base_types import UNIX_EPOCH, Sample -from ._quantities import Quantity, QuantityT +from ._base_types import UNIX_EPOCH, Sample, SupportsFloatT _logger = logging.getLogger(__name__) - DEFAULT_BUFFER_LEN_INIT = 16 """Default initial buffer length. @@ -50,14 +48,14 @@ """ -Source = AsyncIterator[Sample[Quantity]] +Source = AsyncIterator[Sample[SupportsFloatT]] """A source for a timeseries. A timeseries can be received sample by sample in a streaming way using a source. """ -Sink = Callable[[Sample[Quantity]], Coroutine[None, None, None]] +Sink = Callable[[Sample[SupportsFloatT]], Coroutine[None, None, None]] """A sink for a timeseries. A new timeseries can be generated by sending samples to a sink. @@ -70,12 +68,17 @@ ``` Args: - sample (Sample): A sample to be sent out. + sample: A sample to be sent out. """ ResamplingFunction = Callable[ - [Sequence[Sample[Quantity]], "ResamplerConfig", "SourceProperties"], float + [ + Sequence[Sample[SupportsFloatT]], + "ResamplerConfig[SupportsFloatT]", + "SourceProperties", + ], + float, ] """Resampling function type. @@ -91,21 +94,19 @@ timestamp in the input data). Args: - input_samples (Sequence[Sample]): The sequence of pre-existing samples. - resampler_config (ResamplerConfig): The configuration of the resampling - calling this function. - source_properties (SourceProperties): The properties of the source being - resampled. + input_samples: The sequence of pre-existing samples. + resampler_config: The configuration of the resampling calling this function. + source_properties: The properties of the source being resampled. Returns: - new_sample (float): The value of new sample produced after the resampling. + The value of new sample produced after the resampling. """ # pylint: disable=unused-argument def average( - samples: Sequence[Sample[QuantityT]], - resampler_config: ResamplerConfig, + samples: Sequence[Sample[SupportsFloatT]], + resampler_config: ResamplerConfig[SupportsFloatT], source_properties: SourceProperties, ) -> float: """Calculate average of all the provided values. @@ -120,14 +121,12 @@ def average( The average of all `samples` values. """ assert len(samples) > 0, "Average cannot be given an empty list of samples" - values = list( - sample.value.base_value for sample in samples if sample.value is not None - ) + values = list(float(sample.value) for sample in samples if sample.value is not None) return sum(values) / len(values) @dataclass(frozen=True) -class ResamplerConfig: +class ResamplerConfig(Generic[SupportsFloatT]): """Resampler configuration.""" resampling_period: timedelta @@ -138,6 +137,9 @@ class ResamplerConfig: It must be a positive time span. """ + value_constructor: Callable[[float], SupportsFloatT] + """The function to construct new values for the resampled timeseries.""" + max_data_age_in_periods: float = 3.0 """The maximum age a sample can have to be considered *relevant* for resampling. @@ -160,7 +162,7 @@ class ResamplerConfig: passed to the resampling function. """ - resampling_function: ResamplingFunction = average + resampling_function: ResamplingFunction[SupportsFloatT] = average """The resampling function. This function will be applied to the sequence of relevant samples at @@ -255,17 +257,17 @@ def __post_init__(self) -> None: ) -class SourceStoppedError(RuntimeError): +class SourceStoppedError(RuntimeError, Generic[SupportsFloatT]): """A timeseries stopped producing samples.""" - def __init__(self, source: Source) -> None: + def __init__(self, source: Source[SupportsFloatT]) -> None: """Create an instance. Args: source: The source of the timeseries that stopped producing samples. """ super().__init__(f"Timeseries stopped producing samples, source: {source}") - self.source = source + self.source: Source[SupportsFloatT] = source """The source of the timeseries that stopped producing samples.""" def __repr__(self) -> str: @@ -277,7 +279,7 @@ def __repr__(self) -> str: return f"{self.__class__.__name__}({self.source!r})" -class ResamplingError(RuntimeError): +class ResamplingError(RuntimeError, Generic[SupportsFloatT]): """An Error ocurred while resampling. This error is a container for errors raised by the underlying sources and @@ -286,7 +288,7 @@ class ResamplingError(RuntimeError): def __init__( self, - exceptions: dict[Source, Exception | asyncio.CancelledError], + exceptions: dict[Source[SupportsFloatT], Exception | asyncio.CancelledError], ) -> None: """Create an instance. @@ -301,7 +303,9 @@ def __init__( the actual source of the exception. """ super().__init__(f"Some error were found while resampling: {exceptions}") - self.exceptions = exceptions + self.exceptions: dict[ + Source[SupportsFloatT], Exception | asyncio.CancelledError + ] = exceptions """A mapping of timeseries source and the exception encountered. Note that the error could be raised by the sink, while trying to send @@ -344,7 +348,7 @@ class SourceProperties: """ -class Resampler: +class Resampler(Generic[SupportsFloatT]): """A timeseries resampler. In general timeseries [`Source`][frequenz.sdk.timeseries.Source]s don't @@ -359,16 +363,18 @@ class Resampler: no way to produce meaningful samples with the available data. """ - def __init__(self, config: ResamplerConfig) -> None: + def __init__(self, config: ResamplerConfig[SupportsFloatT]) -> None: """Initialize an instance. Args: config: The configuration for the resampler. """ - self._config = config + self._config: ResamplerConfig[SupportsFloatT] = config """The configuration for this resampler.""" - self._resamplers: dict[Source, _StreamingHelper] = {} + self._resamplers: dict[ + Source[SupportsFloatT], _StreamingHelper[SupportsFloatT] + ] = {} """A mapping between sources and the streaming helper handling that source.""" window_end, start_delay_time = self._calculate_window_end() @@ -396,7 +402,7 @@ def __init__(self, config: ResamplerConfig) -> None: ) # pylint: disable=protected-access @property - def config(self) -> ResamplerConfig: + def config(self) -> ResamplerConfig[SupportsFloatT]: """Get the resampler configuration. Returns: @@ -404,7 +410,7 @@ def config(self) -> ResamplerConfig: """ return self._config - def get_source_properties(self, source: Source) -> SourceProperties: + def get_source_properties(self, source: Source[SupportsFloatT]) -> SourceProperties: """Get the properties of a timeseries source. Args: @@ -419,7 +425,9 @@ async def stop(self) -> None: """Cancel all receiving tasks.""" await asyncio.gather(*[helper.stop() for helper in self._resamplers.values()]) - def add_timeseries(self, name: str, source: Source, sink: Sink) -> bool: + def add_timeseries( + self, name: str, source: Source[SupportsFloatT], sink: Sink[SupportsFloatT] + ) -> bool: """Start resampling a new timeseries. Args: @@ -441,7 +449,7 @@ def add_timeseries(self, name: str, source: Source, sink: Sink) -> bool: self._resamplers[source] = resampler return True - def remove_timeseries(self, source: Source) -> bool: + def remove_timeseries(self, source: Source[SupportsFloatT]) -> bool: """Stop resampling the timeseries produced by `source`. Args: @@ -506,7 +514,7 @@ async def resample(self, *, one_shot: bool = False) -> None: # contain Exception | CancelledError because of the condition in the list # comprehension below. exceptions = cast( - dict[Source, Exception | asyncio.CancelledError], + dict[Source[SupportsFloatT], Exception | asyncio.CancelledError], { source: results[i] for i, source in enumerate(self._resamplers) @@ -559,7 +567,7 @@ def _calculate_window_end(self) -> tuple[datetime, timedelta]: ) -class _ResamplingHelper: +class _ResamplingHelper(Generic[SupportsFloatT]): """Keeps track of *relevant* samples to pass them to the resampling function. Samples are stored in an internal ring buffer. All collected samples that @@ -569,16 +577,18 @@ class _ResamplingHelper: All older samples are discarded. """ - def __init__(self, name: str, config: ResamplerConfig) -> None: + def __init__(self, name: str, config: ResamplerConfig[SupportsFloatT]) -> None: """Initialize an instance. Args: name: The name of this resampler helper (for logging purposes). config: The configuration for this resampler helper. """ - self._name = name - self._config = config - self._buffer: deque[Sample[Quantity]] = deque(maxlen=config.initial_buffer_len) + self._name: str = name + self._config: ResamplerConfig[SupportsFloatT] = config + self._buffer: deque[Sample[SupportsFloatT]] = deque( + maxlen=config.initial_buffer_len + ) self._source_properties: SourceProperties = SourceProperties() @property @@ -590,7 +600,7 @@ def source_properties(self) -> SourceProperties: """ return self._source_properties - def add_sample(self, sample: Sample[Quantity]) -> None: + def add_sample(self, sample: Sample[SupportsFloatT]) -> None: """Add a new sample to the internal buffer. Args: @@ -703,7 +713,7 @@ def _update_buffer_len(self) -> bool: return True - def resample(self, timestamp: datetime) -> Sample[Quantity]: + def resample(self, timestamp: datetime) -> Sample[SupportsFloatT]: """Generate a new sample based on all the current *relevant* samples. Args: @@ -751,17 +761,19 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: if relevant_samples else None ) - return Sample(timestamp, None if value is None else Quantity(value)) + if value is None or math.isnan(value): + return Sample(timestamp, None) + return Sample(timestamp, self._config.value_constructor(value)) -class _StreamingHelper: +class _StreamingHelper(Generic[SupportsFloatT]): """Resample data coming from a source, sending the results to a sink.""" def __init__( self, - helper: _ResamplingHelper, - source: Source, - sink: Sink, + helper: _ResamplingHelper[SupportsFloatT], + source: Source[SupportsFloatT], + sink: Sink[SupportsFloatT], ) -> None: """Initialize an instance. @@ -770,9 +782,9 @@ def __init__( source: The source to use to get the samples to be resampled. sink: The sink to use to send the resampled data. """ - self._helper: _ResamplingHelper = helper - self._source: Source = source - self._sink: Sink = sink + self._helper: _ResamplingHelper[SupportsFloatT] = helper + self._source: Source[SupportsFloatT] = source + self._sink: Sink[SupportsFloatT] = sink self._receiving_task: asyncio.Task[None] = asyncio.create_task( self._receive_samples() ) @@ -797,7 +809,7 @@ async def _receive_samples(self) -> None: error). """ async for sample in self._source: - if sample.value is not None and not sample.value.isnan(): + if sample.value is not None and not math.isnan(sample.value): self._helper.add_sample(sample) async def resample(self, timestamp: datetime) -> None: diff --git a/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py b/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py index f431ab039..ab0164a46 100644 --- a/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py +++ b/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py @@ -4,6 +4,7 @@ """Ringbuffer implementation with focus on time & memory efficiency.""" +import math from copy import deepcopy from dataclasses import dataclass from datetime import datetime, timedelta, timezone @@ -12,8 +13,7 @@ import numpy as np import numpy.typing as npt -from .._base_types import UNIX_EPOCH, Sample -from .._quantities import QuantityT +from .._base_types import UNIX_EPOCH, Sample, SupportsFloatT FloatArray = TypeVar("FloatArray", list[float], npt.NDArray[np.float64]) """Type variable of the buffer container.""" @@ -106,7 +106,7 @@ def gaps(self) -> list[Gap]: """ return self._gaps - def has_value(self, sample: Sample[QuantityT]) -> bool: + def has_value(self, sample: Sample[SupportsFloatT]) -> bool: """Check if a sample has a value and it's not NaN. Args: @@ -115,7 +115,7 @@ def has_value(self, sample: Sample[QuantityT]) -> bool: Returns: True if the sample has a value and it's not NaN. """ - return not (sample.value is None or sample.value.isnan()) + return not (sample.value is None or math.isnan(sample.value)) @property def maxlen(self) -> int: @@ -126,7 +126,7 @@ def maxlen(self) -> int: """ return len(self._buffer) - def update(self, sample: Sample[QuantityT]) -> None: + def update(self, sample: Sample[SupportsFloatT]) -> None: """Update the buffer with a new value for the given timestamp. Missing values are written as NaN. Be advised that when @@ -165,7 +165,7 @@ def update(self, sample: Sample[QuantityT]) -> None: # Update data if self.has_value(sample): assert sample.value is not None - value = sample.value.base_value + value = float(sample.value) else: value = np.nan self._buffer[self.to_internal_index(timestamp)] = value diff --git a/src/frequenz/sdk/timeseries/_voltage_streamer.py b/src/frequenz/sdk/timeseries/_voltage_streamer.py index d7fee8c91..813f5fa7f 100644 --- a/src/frequenz/sdk/timeseries/_voltage_streamer.py +++ b/src/frequenz/sdk/timeseries/_voltage_streamer.py @@ -19,7 +19,7 @@ from ..microgrid import connection_manager from ..microgrid.component import Component, ComponentCategory, ComponentMetricId from ..timeseries._base_types import Sample, Sample3Phase -from ..timeseries._quantities import Quantity, Voltage +from ..timeseries._quantities import Voltage if TYPE_CHECKING: # Imported here to avoid a circular import. @@ -141,7 +141,7 @@ async def _send_request(self) -> None: ComponentMetricId.VOLTAGE_PHASE_2, ComponentMetricId.VOLTAGE_PHASE_3, ) - phases_rx: list[Receiver[Sample[Quantity]]] = [] + phases_rx: list[Receiver[Sample[float]]] = [] for metric_id in metric_ids: req = ComponentMetricRequest( self._namespace, self._source_component.component_id, metric_id, None @@ -151,7 +151,7 @@ async def _send_request(self) -> None: phases_rx.append( self._channel_registry.get_or_create( - Sample[Quantity], req.get_channel_name() + Sample[float], req.get_channel_name() ).new_receiver() ) @@ -179,7 +179,7 @@ async def _send_request(self) -> None: phases[0].timestamp, *map( lambda p: ( - Voltage.from_volts(p.value.base_value) if p.value else None + Voltage.from_volts(float(p.value)) if p.value else None ), phases, ), diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index 940ee7602..8e8d9c60f 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -19,7 +19,7 @@ from ...microgrid import connection_manager from ...microgrid.component import ComponentCategory, ComponentMetricId from .. import Sample, Sample3Phase -from .._quantities import Current, Power, Quantity +from .._quantities import Current, Power from ..formula_engine import FormulaEngine, FormulaEngine3Phase from ..formula_engine._formula_engine_pool import FormulaEnginePool from ..formula_engine._formula_generators import ( @@ -253,9 +253,9 @@ async def stop(self) -> None: await cancel_and_await(task) async def _get_current_streams(self, component_id: int) -> tuple[ - Receiver[Sample[Quantity]], - Receiver[Sample[Quantity]], - Receiver[Sample[Quantity]], + Receiver[Sample[float]], + Receiver[Sample[float]], + Receiver[Sample[float]], ]: """Fetch current streams from the resampler for each phase. @@ -269,7 +269,7 @@ async def _get_current_streams(self, component_id: int) -> tuple[ async def resampler_subscribe( metric_id: ComponentMetricId, - ) -> Receiver[Sample[Quantity]]: + ) -> Receiver[Sample[float]]: request = ComponentMetricRequest( namespace="ev-pool", component_id=component_id, @@ -278,7 +278,7 @@ async def resampler_subscribe( ) await self._resampler_subscription_sender.send(request) return self._channel_registry.get_or_create( - Sample[Quantity], request.get_channel_name() + Sample[float], request.get_channel_name() ).new_receiver() return ( @@ -323,17 +323,17 @@ async def _stream_component_data( value_p1=( None if phase_1.value is None - else Current.from_amperes(phase_1.value.base_value) + else Current.from_amperes(phase_1.value) ), value_p2=( None if phase_2.value is None - else Current.from_amperes(phase_2.value.base_value) + else Current.from_amperes(phase_2.value) ), value_p3=( None if phase_3.value is None - else Current.from_amperes(phase_3.value.base_value) + else Current.from_amperes(phase_3.value) ), ) diff --git a/src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py b/src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py index 45bc03505..7dba7f3e3 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py @@ -12,13 +12,13 @@ from abc import ABC from collections import deque from collections.abc import Callable -from typing import Generic, TypeVar, Union, overload +from typing import Generic, SupportsFloat, TypeVar, Union, overload from frequenz.channels import Broadcast, Receiver from ..._internal._asyncio import cancel_and_await -from .. import Sample, Sample3Phase -from .._quantities import Quantity, QuantityT +from .._base_types import Sample, Sample3Phase +from .._quantities import Quantity, SupportsFloatT from ._formula_evaluator import FormulaEvaluator from ._formula_formatter import format_formula from ._formula_steps import ( @@ -40,6 +40,12 @@ _logger = logging.Logger(__name__) +SupportsFloatInputT = TypeVar("SupportsFloatInputT", bound=SupportsFloat) +"""Type variable for inputs that support conversion to float.""" + +SupportsFloatOutputT = TypeVar("SupportsFloatOutputT", bound=SupportsFloat) +"""Type variable for outputs that support conversion to float.""" + _operator_precedence = { "max": 0, "min": 1, @@ -95,11 +101,17 @@ # but mypy doesn't support that, so we need to use `# type: ignore` in several places in # this, and subsequent classes, to avoid mypy errors. class _ComposableFormulaEngine( - ABC, Generic[_GenericEngine, _GenericHigherOrderBuilder, QuantityT] + ABC, + Generic[ + _GenericEngine, + _GenericHigherOrderBuilder, + SupportsFloatInputT, + SupportsFloatOutputT, + ], ): """A base class for formula engines.""" - _create_method: Callable[[float], QuantityT] + _create_method: Callable[[float], SupportsFloatOutputT] _higher_order_builder: type[_GenericHigherOrderBuilder] _task: asyncio.Task[None] | None = None @@ -110,8 +122,7 @@ async def _stop(self) -> None: await cancel_and_await(self._task) def __add__( - self, - other: _GenericEngine | _GenericHigherOrderBuilder | QuantityT, + self, other: _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatInputT ) -> _GenericHigherOrderBuilder: """Return a formula builder that adds (data in) `other` to `self`. @@ -126,7 +137,7 @@ def __add__( return self._higher_order_builder(self, self._create_method) + other # type: ignore def __sub__( - self, other: _GenericEngine | _GenericHigherOrderBuilder | QuantityT + self, other: _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatInputT ) -> _GenericHigherOrderBuilder: """Return a formula builder that subtracts (data in) `other` from `self`. @@ -171,12 +182,12 @@ def __truediv__( return self._higher_order_builder(self, self._create_method) / other # type: ignore def max( - self, other: _GenericEngine | _GenericHigherOrderBuilder | QuantityT + self, other: _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatInputT ) -> _GenericHigherOrderBuilder: """Return a formula engine that outputs the maximum of `self` and `other`. Args: - other: A formula receiver, a formula builder or a QuantityT instance + other: A formula receiver, a formula builder or a SupportsFloatT instance corresponding to a sub-expression. Returns: @@ -186,12 +197,12 @@ def max( return self._higher_order_builder(self, self._create_method).max(other) # type: ignore def min( - self, other: _GenericEngine | _GenericHigherOrderBuilder | QuantityT + self, other: _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatInputT ) -> _GenericHigherOrderBuilder: """Return a formula engine that outputs the minimum of `self` and `other`. Args: - other: A formula receiver, a formula builder or a QuantityT instance + other: A formula receiver, a formula builder or a SupportsFloatT instance corresponding to a sub-expression. @@ -221,11 +232,11 @@ def production(self) -> _GenericHigherOrderBuilder: class FormulaEngine( - Generic[QuantityT], + Generic[SupportsFloatInputT, SupportsFloatOutputT], _ComposableFormulaEngine[ "FormulaEngine", # type: ignore[type-arg] "HigherOrderFormulaBuilder", # type: ignore[type-arg] - QuantityT, + SupportsFloatOutputT, ], ): """[`FormulaEngine`][frequenz.sdk.timeseries.formula_engine.FormulaEngine]s are a @@ -294,8 +305,8 @@ class FormulaEngine( def __init__( self, - builder: FormulaBuilder[QuantityT], - create_method: Callable[[float], QuantityT], + builder: FormulaBuilder[SupportsFloatInputT, SupportsFloatOutputT], + create_method: Callable[[float], SupportsFloatOutputT], ) -> None: """Create a `FormulaEngine` instance. @@ -308,19 +319,21 @@ def __init__( """ self._higher_order_builder = HigherOrderFormulaBuilder self._name: str = builder.name - self._builder: FormulaBuilder[QuantityT] = builder + self._builder: FormulaBuilder[SupportsFloatInputT, SupportsFloatOutputT] = ( + builder + ) self._create_method = create_method - self._channel: Broadcast[Sample[QuantityT]] = Broadcast(self._name) + self._channel: Broadcast[Sample[SupportsFloatInputT]] = Broadcast(self._name) @classmethod def from_receiver( cls, name: str, - receiver: Receiver[Sample[QuantityT]], - create_method: Callable[[float], QuantityT], + receiver: Receiver[Sample[SupportsFloatInputT]], + create_method: Callable[[float], SupportsFloatOutputT], *, nones_are_zeros: bool = False, - ) -> FormulaEngine[QuantityT]: + ) -> FormulaEngine[SupportsFloatInputT, SupportsFloatOutputT]: """ Create a formula engine from a receiver. @@ -370,7 +383,7 @@ async def run() -> None: async def _run(self) -> None: await self._builder.subscribe() steps, metric_fetchers = self._builder.finalize() - evaluator = FormulaEvaluator[QuantityT]( + evaluator = FormulaEvaluator[SupportsFloatInputT, SupportsFloatOutputT]( self._name, steps, metric_fetchers, self._create_method ) sender = self._channel.new_sender() @@ -402,7 +415,7 @@ def __str__(self) -> str: def new_receiver( self, name: str | None = None, max_size: int = 50 - ) -> Receiver[Sample[QuantityT]]: + ) -> Receiver[Sample[SupportsFloatT]]: """Create a new receiver that streams the output of the formula engine. Args: @@ -434,7 +447,7 @@ class FormulaEngine3Phase( _ComposableFormulaEngine[ "FormulaEngine3Phase", # type: ignore[type-arg] "HigherOrderFormulaBuilder3Phase", # type: ignore[type-arg] - QuantityT, + SupportsFloatT, ] ): """A @@ -488,11 +501,11 @@ class FormulaEngine3Phase( def __init__( self, name: str, - create_method: Callable[[float], QuantityT], + create_method: Callable[[float], SupportsFloatT], phase_streams: tuple[ - FormulaEngine[QuantityT], - FormulaEngine[QuantityT], - FormulaEngine[QuantityT], + FormulaEngine[SupportsFloatT], + FormulaEngine[SupportsFloatT], + FormulaEngine[SupportsFloatT], ], ) -> None: """Create a `FormulaEngine3Phase` instance. @@ -507,12 +520,12 @@ def __init__( self._higher_order_builder = HigherOrderFormulaBuilder3Phase self._name: str = name self._create_method = create_method - self._channel: Broadcast[Sample3Phase[QuantityT]] = Broadcast(self._name) + self._channel: Broadcast[Sample3Phase[SupportsFloatT]] = Broadcast(self._name) self._task: asyncio.Task[None] | None = None self._streams: tuple[ - FormulaEngine[QuantityT], - FormulaEngine[QuantityT], - FormulaEngine[QuantityT], + FormulaEngine[SupportsFloatT], + FormulaEngine[SupportsFloatT], + FormulaEngine[SupportsFloatT], ] = phase_streams async def _run(self) -> None: @@ -540,7 +553,7 @@ async def _run(self) -> None: def new_receiver( self, name: str | None = None, max_size: int = 50 - ) -> Receiver[Sample3Phase[QuantityT]]: + ) -> Receiver[Sample3Phase[SupportsFloatT]]: """Create a new receiver that streams the output of the formula engine. Args: @@ -556,7 +569,7 @@ def new_receiver( return self._channel.new_receiver(name, max_size) -class FormulaBuilder(Generic[QuantityT]): +class FormulaBuilder(Generic[SupportsFloatInputT, SupportsFloatOutputT]): """Builds a post-fix formula engine that operates on `Sample` receivers. Operators and metrics need to be pushed in in-fix order, and they get rearranged @@ -584,7 +597,9 @@ class FormulaBuilder(Generic[QuantityT]): add the values and return the result. """ - def __init__(self, name: str, create_method: Callable[[float], QuantityT]) -> None: + def __init__( + self, name: str, create_method: Callable[[float], SupportsFloatOutputT] + ) -> None: """Create a `FormulaBuilder` instance. Args: @@ -594,10 +609,10 @@ def __init__(self, name: str, create_method: Callable[[float], QuantityT]) -> No `Power.from_watts`, for example. """ self._name = name - self._create_method: Callable[[float], QuantityT] = create_method + self._create_method: Callable[[float], SupportsFloatOutputT] = create_method self._build_stack: list[FormulaStep] = [] self._steps: list[FormulaStep] = [] - self._metric_fetchers: dict[str, MetricFetcher[QuantityT]] = {} + self._metric_fetchers: dict[str, MetricFetcher[SupportsFloatInputT]] = {} def push_oper(self, oper: str) -> None: # pylint: disable=too-many-branches """Push an operator into the engine. @@ -641,7 +656,7 @@ def push_oper(self, oper: str) -> None: # pylint: disable=too-many-branches def push_metric( self, name: str, - data_stream: Receiver[Sample[QuantityT]], + data_stream: Receiver[Sample[SupportsFloatInputT]], *, nones_are_zeros: bool, ) -> None: @@ -733,7 +748,7 @@ async def subscribe(self) -> None: def finalize( self, - ) -> tuple[list[FormulaStep], dict[str, MetricFetcher[QuantityT]]]: + ) -> tuple[list[FormulaStep], dict[str, MetricFetcher[SupportsFloatInputT]]]: """Finalize and return the steps and fetchers for the formula. Returns: @@ -753,7 +768,7 @@ def __str__(self) -> str: steps = self._steps if len(self._steps) > 0 else self._build_stack return format_formula(steps) - def build(self) -> FormulaEngine[QuantityT]: + def build(self) -> FormulaEngine[SupportsFloatInputT, SupportsFloatOutputT]: """Create a formula engine with the steps and fetchers that have been pushed. Returns: @@ -763,13 +778,16 @@ def build(self) -> FormulaEngine[QuantityT]: return FormulaEngine(self, create_method=self._create_method) -class _BaseHOFormulaBuilder(ABC, Generic[QuantityT]): +class _BaseHOFormulaBuilder(ABC, Generic[SupportsFloatInputT, SupportsFloatOutputT]): """Provides a way to build formulas from the outputs of other formulas.""" def __init__( self, - engine: FormulaEngine[QuantityT] | FormulaEngine3Phase[QuantityT], - create_method: Callable[[float], QuantityT], + engine: ( + FormulaEngine[SupportsFloatInputT, SupportsFloatOutputT] + | FormulaEngine3Phase[SupportsFloatT] + ), + create_method: Callable[[float], SupportsFloatOutputT], ) -> None: """Create a `GenericHigherOrderFormulaBuilder` instance. @@ -783,31 +801,31 @@ def __init__( self._steps: deque[ tuple[ TokenType, - FormulaEngine[QuantityT] - | FormulaEngine3Phase[QuantityT] - | QuantityT + FormulaEngine[SupportsFloatInputT, SupportsFloatOutputT] + | FormulaEngine3Phase[SupportsFloatT] + | Quantity | float | str, ] ] = deque() self._steps.append((TokenType.COMPONENT_METRIC, engine)) - self._create_method: Callable[[float], QuantityT] = create_method + self._create_method: Callable[[float], SupportsFloatOutputT] = create_method @overload def _push( self, oper: str, other: _CompositionType1Phase - ) -> HigherOrderFormulaBuilder[QuantityT]: ... + ) -> HigherOrderFormulaBuilder[SupportsFloatInputT, SupportsFloatOutputT]: ... @overload def _push( - self, oper: str, other: _CompositionType3Phase | QuantityT | float - ) -> HigherOrderFormulaBuilder3Phase[QuantityT]: ... + self, oper: str, other: _CompositionType3Phase | SupportsFloatT | float + ) -> HigherOrderFormulaBuilder3Phase[SupportsFloatT]: ... def _push( - self, oper: str, other: _CompositionType | QuantityT | float + self, oper: str, other: _CompositionType | SupportsFloatT | float ) -> ( - HigherOrderFormulaBuilder[QuantityT] - | HigherOrderFormulaBuilder3Phase[QuantityT] + HigherOrderFormulaBuilder[SupportsFloatT] + | HigherOrderFormulaBuilder3Phase[SupportsFloatT] ): self._steps.appendleft((TokenType.OPER, "(")) self._steps.append((TokenType.OPER, ")")) @@ -843,18 +861,18 @@ def _push( @overload def __add__( self, other: _CompositionType1Phase - ) -> HigherOrderFormulaBuilder[QuantityT]: ... + ) -> HigherOrderFormulaBuilder[SupportsFloatT]: ... @overload def __add__( - self, other: _CompositionType3Phase | QuantityT - ) -> HigherOrderFormulaBuilder3Phase[QuantityT]: ... + self, other: _CompositionType3Phase | SupportsFloatT + ) -> HigherOrderFormulaBuilder3Phase[SupportsFloatT]: ... def __add__( - self, other: _CompositionType | QuantityT + self, other: _CompositionType | SupportsFloatT ) -> ( - HigherOrderFormulaBuilder[QuantityT] - | HigherOrderFormulaBuilder3Phase[QuantityT] + HigherOrderFormulaBuilder[SupportsFloatT] + | HigherOrderFormulaBuilder3Phase[SupportsFloatT] ): """Return a formula builder that adds (data in) `other` to `self`. @@ -871,19 +889,19 @@ def __add__( @overload def __sub__( self, other: _CompositionType1Phase - ) -> HigherOrderFormulaBuilder[QuantityT]: ... + ) -> HigherOrderFormulaBuilder[SupportsFloatT]: ... @overload def __sub__( - self, other: _CompositionType3Phase | QuantityT - ) -> HigherOrderFormulaBuilder3Phase[QuantityT]: ... + self, other: _CompositionType3Phase | SupportsFloatT + ) -> HigherOrderFormulaBuilder3Phase[SupportsFloatT]: ... def __sub__( self, - other: _CompositionType | QuantityT, + other: _CompositionType | SupportsFloatT, ) -> ( - HigherOrderFormulaBuilder[QuantityT] - | HigherOrderFormulaBuilder3Phase[QuantityT] + HigherOrderFormulaBuilder[SupportsFloatT] + | HigherOrderFormulaBuilder3Phase[SupportsFloatT] ): """Return a formula builder that subtracts (data in) `other` from `self`. @@ -900,19 +918,19 @@ def __sub__( @overload def __mul__( self, other: _CompositionType1Phase - ) -> HigherOrderFormulaBuilder[QuantityT]: ... + ) -> HigherOrderFormulaBuilder[SupportsFloatT]: ... @overload def __mul__( self, other: _CompositionType3Phase | float - ) -> HigherOrderFormulaBuilder3Phase[QuantityT]: ... + ) -> HigherOrderFormulaBuilder3Phase[SupportsFloatT]: ... def __mul__( self, other: _CompositionType | float, ) -> ( - HigherOrderFormulaBuilder[QuantityT] - | HigherOrderFormulaBuilder3Phase[QuantityT] + HigherOrderFormulaBuilder[SupportsFloatT] + | HigherOrderFormulaBuilder3Phase[SupportsFloatT] ): """Return a formula builder that multiplies (data in) `self` with `other`. @@ -929,19 +947,19 @@ def __mul__( @overload def __truediv__( self, other: _CompositionType1Phase - ) -> HigherOrderFormulaBuilder[QuantityT]: ... + ) -> HigherOrderFormulaBuilder[SupportsFloatT]: ... @overload def __truediv__( self, other: _CompositionType3Phase | float - ) -> HigherOrderFormulaBuilder3Phase[QuantityT]: ... + ) -> HigherOrderFormulaBuilder3Phase[SupportsFloatT]: ... def __truediv__( self, other: _CompositionType | float, ) -> ( - HigherOrderFormulaBuilder[QuantityT] - | HigherOrderFormulaBuilder3Phase[QuantityT] + HigherOrderFormulaBuilder[SupportsFloatT] + | HigherOrderFormulaBuilder3Phase[SupportsFloatT] ): """Return a formula builder that divides (data in) `self` by `other`. @@ -958,18 +976,18 @@ def __truediv__( @overload def max( self, other: _CompositionType1Phase - ) -> HigherOrderFormulaBuilder[QuantityT]: ... + ) -> HigherOrderFormulaBuilder[SupportsFloatT]: ... @overload def max( - self, other: _CompositionType3Phase | QuantityT - ) -> HigherOrderFormulaBuilder3Phase[QuantityT]: ... + self, other: _CompositionType3Phase | SupportsFloatT + ) -> HigherOrderFormulaBuilder3Phase[SupportsFloatT]: ... def max( - self, other: _CompositionType | QuantityT + self, other: _CompositionType | SupportsFloatT ) -> ( - HigherOrderFormulaBuilder[QuantityT] - | HigherOrderFormulaBuilder3Phase[QuantityT] + HigherOrderFormulaBuilder[SupportsFloatT] + | HigherOrderFormulaBuilder3Phase[SupportsFloatT] ): """Return a formula builder that calculates the maximum of `self` and `other`. @@ -986,18 +1004,18 @@ def max( @overload def min( self, other: _CompositionType1Phase - ) -> HigherOrderFormulaBuilder[QuantityT]: ... + ) -> HigherOrderFormulaBuilder[SupportsFloatT]: ... @overload def min( - self, other: _CompositionType3Phase | QuantityT - ) -> HigherOrderFormulaBuilder3Phase[QuantityT]: ... + self, other: _CompositionType3Phase | SupportsFloatT + ) -> HigherOrderFormulaBuilder3Phase[SupportsFloatT]: ... def min( - self, other: _CompositionType | QuantityT + self, other: _CompositionType | SupportsFloatT ) -> ( - HigherOrderFormulaBuilder[QuantityT] - | HigherOrderFormulaBuilder3Phase[QuantityT] + HigherOrderFormulaBuilder[SupportsFloatT] + | HigherOrderFormulaBuilder3Phase[SupportsFloatT] ): """Return a formula builder that calculates the minimum of `self` and `other`. @@ -1014,8 +1032,8 @@ def min( def consumption( self, ) -> ( - HigherOrderFormulaBuilder[QuantityT] - | HigherOrderFormulaBuilder3Phase[QuantityT] + HigherOrderFormulaBuilder[SupportsFloatT] + | HigherOrderFormulaBuilder3Phase[SupportsFloatT] ): """Apply the Consumption Operator. @@ -1037,8 +1055,8 @@ def consumption( def production( self, ) -> ( - HigherOrderFormulaBuilder[QuantityT] - | HigherOrderFormulaBuilder3Phase[QuantityT] + HigherOrderFormulaBuilder[SupportsFloatT] + | HigherOrderFormulaBuilder3Phase[SupportsFloatT] ): """Apply the Production Operator. @@ -1058,12 +1076,14 @@ def production( return self -class HigherOrderFormulaBuilder(Generic[QuantityT], _BaseHOFormulaBuilder[QuantityT]): +class HigherOrderFormulaBuilder( + _BaseHOFormulaBuilder[SupportsFloatInputT, SupportsFloatT] +): """A specialization of the _BaseHOFormulaBuilder for `FormulaReceiver`.""" def build( self, name: str, *, nones_are_zeros: bool = False - ) -> FormulaEngine[QuantityT]: + ) -> FormulaEngine[SupportsFloatInputT, SupportsFloatT]: """Build a `FormulaEngine` instance from the builder. Args: @@ -1089,19 +1109,19 @@ def build( elif typ == TokenType.CONSTANT: assert isinstance(value, (Quantity, float)) builder.push_constant( - value.base_value if isinstance(value, Quantity) else value + float(value) if isinstance(value, Quantity) else value ) return builder.build() class HigherOrderFormulaBuilder3Phase( - Generic[QuantityT], _BaseHOFormulaBuilder[QuantityT] + _BaseHOFormulaBuilder[SupportsFloatInputT, SupportsFloatT] ): """A specialization of the _BaseHOFormulaBuilder for `FormulaReceiver3Phase`.""" def build( self, name: str, *, nones_are_zeros: bool = False - ) -> FormulaEngine3Phase[QuantityT]: + ) -> FormulaEngine3Phase[SupportsFloatT]: """Build a `FormulaEngine3Phase` instance from the builder. Args: diff --git a/src/frequenz/sdk/timeseries/formula_engine/_formula_engine_pool.py b/src/frequenz/sdk/timeseries/formula_engine/_formula_engine_pool.py index 2ecfe21e6..090a36dee 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_formula_engine_pool.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_formula_engine_pool.py @@ -5,12 +5,12 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Callable, cast from frequenz.channels import Sender from ...microgrid.component import ComponentMetricId -from .._quantities import Current, Power, Quantity +from .._quantities import Current, Power, Quantity, SupportsFloatT from ._formula_generators._formula_generator import ( FormulaGenerator, FormulaGeneratorConfig, @@ -59,14 +59,16 @@ def from_string( formula: str, component_metric_id: ComponentMetricId, *, + value_constructor: Callable[[float], SupportsFloatT], nones_are_zeros: bool = False, - ) -> FormulaEngine[Quantity]: + ) -> FormulaEngine[SupportsFloatT]: """Get a receiver for a manual formula. Args: formula: formula to execute. component_metric_id: The metric ID to use when fetching receivers from the resampling actor. + value_constructor: A function to create new values with. nones_are_zeros: Whether to treat None values from the stream as 0s. If False, the returned value will be a None. @@ -75,18 +77,22 @@ def from_string( """ channel_key = formula + component_metric_id.value if channel_key in self._string_engines: - return self._string_engines[channel_key] + return cast( + FormulaEngine[SupportsFloatT], self._string_engines[channel_key] + ) - builder = ResampledFormulaBuilder( + builder: ResampledFormulaBuilder[SupportsFloatT] = ResampledFormulaBuilder( self._namespace, formula, self._channel_registry, self._resampler_subscription_sender, component_metric_id, - Quantity, + value_constructor, ) formula_engine = builder.from_string(formula, nones_are_zeros=nones_are_zeros) - self._string_engines[channel_key] = formula_engine + self._string_engines[channel_key] = cast( + FormulaEngine[Quantity], formula_engine + ) return formula_engine diff --git a/src/frequenz/sdk/timeseries/formula_engine/_formula_evaluator.py b/src/frequenz/sdk/timeseries/formula_engine/_formula_evaluator.py index 3e32f9d4f..ed2eb68b2 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_formula_evaluator.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_formula_evaluator.py @@ -7,24 +7,33 @@ from collections.abc import Callable from datetime import datetime from math import isinf, isnan -from typing import Generic +from typing import Generic, SupportsFloat, TypeVar -from .. import Sample -from .._quantities import QuantityT +from .._base_types import Sample, SupportsFloatT from ._formula_steps import FormulaStep, MetricFetcher +SupportsFloatInputT = TypeVar("SupportsFloatInputT", bound=SupportsFloat) +"""Type variable for inputs that support conversion to float.""" -class FormulaEvaluator(Generic[QuantityT]): - """A post-fix formula evaluator that operates on `Sample` receivers.""" +SupportsFloatOutputT = TypeVar("SupportsFloatOutputT", bound=SupportsFloat) +"""Type variable for outputs that support conversion to float.""" + + +class FormulaEvaluator(Generic[SupportsFloatInputT, SupportsFloatOutputT]): + """A post-fix formula evaluator that operates on `Sample` receivers. + + This formula evaluator takes [`float`][] samples as input and produces + `SupportFloatT` samples. + """ def __init__( self, name: str, steps: list[FormulaStep], - metric_fetchers: dict[str, MetricFetcher[QuantityT]], - create_method: Callable[[float], QuantityT], + metric_fetchers: dict[str, MetricFetcher[SupportsFloatInputT]], + create_method: Callable[[float], SupportsFloatOutputT], ) -> None: - """Create a `FormulaEngine` instance. + """Initialize this instance. Args: name: A name for the formula. @@ -36,12 +45,14 @@ def __init__( """ self._name = name self._steps = steps - self._metric_fetchers: dict[str, MetricFetcher[QuantityT]] = metric_fetchers + self._metric_fetchers: dict[str, MetricFetcher[SupportsFloatInputT]] = ( + metric_fetchers + ) self._first_run = True - self._create_method: Callable[[float], QuantityT] = create_method + self._create_method: Callable[[float], SupportsFloatOutputT] = create_method async def _synchronize_metric_timestamps( - self, metrics: set[asyncio.Task[Sample[QuantityT] | None]] + self, metrics: set[asyncio.Task[Sample[SupportsFloatInputT] | None]] ) -> datetime: """Synchronize the metric streams. @@ -88,7 +99,7 @@ async def _synchronize_metric_timestamps( self._first_run = False return latest_ts - async def apply(self) -> Sample[QuantityT]: + async def apply(self) -> Sample[SupportsFloatOutputT]: """Fetch the latest metrics, apply the formula once and return the result. Returns: diff --git a/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_formula_generator.py b/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_formula_generator.py index 33c1c4b06..ac8f1a968 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_formula_generator.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_formula_generator.py @@ -16,7 +16,7 @@ from ....microgrid import component, connection_manager from ....microgrid.component import ComponentMetricId -from ..._quantities import QuantityT +from ..._base_types import SupportsFloatT from .._formula_engine import FormulaEngine, FormulaEngine3Phase from .._resampled_formula_builder import ResampledFormulaBuilder @@ -52,7 +52,7 @@ class FormulaGeneratorConfig: """The component IDs to use for generating the formula.""" -class FormulaGenerator(ABC, Generic[QuantityT]): +class FormulaGenerator(ABC, Generic[SupportsFloatT]): """A class for generating formulas from the component graph.""" def __init__( @@ -83,8 +83,8 @@ def _get_builder( self, name: str, component_metric_id: ComponentMetricId, - create_method: Callable[[float], QuantityT], - ) -> ResampledFormulaBuilder[QuantityT]: + create_method: Callable[[float], SupportsFloatT], + ) -> ResampledFormulaBuilder[SupportsFloatT]: builder = ResampledFormulaBuilder( self._namespace, name, @@ -140,5 +140,5 @@ def _get_grid_component_successors(self) -> set[component.Component]: @abstractmethod def generate( self, - ) -> FormulaEngine[QuantityT] | FormulaEngine3Phase[QuantityT]: + ) -> FormulaEngine[SupportsFloatT] | FormulaEngine3Phase[SupportsFloatT]: """Generate a formula engine, based on the component graph.""" diff --git a/src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py b/src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py index ec37ceb3c..f26086541 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py @@ -7,12 +7,14 @@ import math from abc import ABC, abstractmethod -from typing import Generic +from typing import Generic, SupportsFloat, TypeVar from frequenz.channels import Receiver from .. import Sample -from .._quantities import QuantityT + +SupportsFloatT = TypeVar("SupportsFloatT", bound=SupportsFloat) +"""Type variable for types that support conversion to float.""" class FormulaStep(ABC): @@ -343,13 +345,13 @@ def apply(self, eval_stack: list[float]) -> None: eval_stack.append(val) -class MetricFetcher(Generic[QuantityT], FormulaStep): +class MetricFetcher(Generic[SupportsFloatT], FormulaStep): """A formula step for fetching a value from a metric Receiver.""" def __init__( self, name: str, - stream: Receiver[Sample[QuantityT]], + stream: Receiver[Sample[SupportsFloatT]], *, nones_are_zeros: bool, ) -> None: @@ -361,12 +363,12 @@ def __init__( nones_are_zeros: Whether to treat None values from the stream as 0s. """ self._name = name - self._stream: Receiver[Sample[QuantityT]] = stream - self._next_value: Sample[QuantityT] | None = None + self._stream: Receiver[Sample[SupportsFloatT]] = stream + self._next_value: Sample[SupportsFloatT] | None = None self._nones_are_zeros = nones_are_zeros @property - def stream(self) -> Receiver[Sample[QuantityT]]: + def stream(self) -> Receiver[Sample[SupportsFloatT]]: """Return the stream from which to fetch values. Returns: @@ -382,7 +384,7 @@ def stream_name(self) -> str: """ return str(self._stream.__doc__) - async def fetch_next(self) -> Sample[QuantityT] | None: + async def fetch_next(self) -> Sample[SupportsFloatT] | None: """Fetch the next value from the stream. To be called before each call to `apply`. @@ -394,7 +396,7 @@ async def fetch_next(self) -> Sample[QuantityT] | None: return self._next_value @property - def value(self) -> Sample[QuantityT] | None: + def value(self) -> Sample[SupportsFloatT] | None: """Get the next value in the stream. Returns: @@ -423,10 +425,10 @@ def apply(self, eval_stack: list[float]) -> None: raise RuntimeError("No next value available to append.") next_value = self._next_value.value - if next_value is None or next_value.isnan() or next_value.isinf(): + if next_value is None or math.isnan(next_value) or math.isinf(next_value): if self._nones_are_zeros: eval_stack.append(0.0) else: eval_stack.append(math.nan) else: - eval_stack.append(next_value.base_value) + eval_stack.append(float(next_value)) diff --git a/src/frequenz/sdk/timeseries/formula_engine/_resampled_formula_builder.py b/src/frequenz/sdk/timeseries/formula_engine/_resampled_formula_builder.py index 792615b10..b9354e7cf 100644 --- a/src/frequenz/sdk/timeseries/formula_engine/_resampled_formula_builder.py +++ b/src/frequenz/sdk/timeseries/formula_engine/_resampled_formula_builder.py @@ -6,13 +6,12 @@ from __future__ import annotations from collections.abc import Callable -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, get_type_hints from frequenz.channels import Receiver, Sender from ...microgrid.component import ComponentMetricId -from .. import Sample -from .._quantities import Quantity, QuantityT +from .._base_types import Sample, SupportsFloatT from ._formula_engine import FormulaBuilder, FormulaEngine from ._tokenizer import Tokenizer, TokenType @@ -21,7 +20,7 @@ from ...actor import ChannelRegistry, ComponentMetricRequest -class ResampledFormulaBuilder(FormulaBuilder[QuantityT]): +class ResampledFormulaBuilder(FormulaBuilder[SupportsFloatT]): """Provides a way to build a FormulaEngine from resampled data streams.""" def __init__( # pylint: disable=too-many-arguments @@ -31,7 +30,7 @@ def __init__( # pylint: disable=too-many-arguments channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], metric_id: ComponentMetricId, - create_method: Callable[[float], QuantityT], + create_method: Callable[[float], SupportsFloatT], ) -> None: """Create a `ResampledFormulaBuilder` instance. @@ -55,11 +54,20 @@ def __init__( # pylint: disable=too-many-arguments self._namespace: str = namespace self._metric_id: ComponentMetricId = metric_id self._resampler_requests: list[ComponentMetricRequest] = [] - super().__init__(formula_name, create_method) # type: ignore[arg-type] + # We need to store the runtime value type of the formula, so that we can + # create the correct channel in the channel registry, as we need to pass the + # runtime type to the channel registry. + # Since invoking the function seems to be the only reliable way to do this + # (trying to get it from the type hints doesn't work because usually `Self` + # is used as the return type), we do it only once in the constructor to avoid + # unnecessary runtime cost. + self._value_type = type(create_method(0.0)) + + super().__init__(formula_name, create_method) def _get_resampled_receiver( self, component_id: int, metric_id: ComponentMetricId - ) -> Receiver[Sample[QuantityT]]: + ) -> Receiver[Sample[SupportsFloatT]]: """Get a receiver with the resampled data for the given component id. Args: @@ -75,13 +83,13 @@ def _get_resampled_receiver( request = ComponentMetricRequest(self._namespace, component_id, metric_id, None) self._resampler_requests.append(request) resampled_channel = self._channel_registry.get_or_create( - Sample[Quantity], request.get_channel_name() + Sample[float], request.get_channel_name() ) resampled_receiver = resampled_channel.new_receiver().map( lambda sample: Sample( sample.timestamp, ( - self._create_method(sample.value.base_value) + self._create_method(float(sample.value)) if sample.value is not None else None ), @@ -112,7 +120,7 @@ def from_string( formula: str, *, nones_are_zeros: bool, - ) -> FormulaEngine[QuantityT]: + ) -> FormulaEngine[SupportsFloatT]: """Construct a `FormulaEngine` from the given formula string. Formulas can have Component IDs that are preceeded by a pound symbol("#"), and diff --git a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py index 41122b083..05ac66a9d 100644 --- a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py +++ b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py @@ -5,12 +5,13 @@ import uuid +from typing import Callable from frequenz.channels import Sender from ...actor import ChannelRegistry, ComponentMetricRequest from ...microgrid.component import ComponentMetricId -from .._quantities import Power, Quantity +from .._quantities import Power, SupportsFloatT from ..formula_engine import FormulaEngine from ..formula_engine._formula_engine_pool import FormulaEnginePool from ..formula_engine._formula_generators import CHPPowerFormula, PVPowerFormula @@ -99,8 +100,9 @@ def start_formula( formula: str, component_metric_id: ComponentMetricId, *, + value_constructor: Callable[[float], SupportsFloatT], nones_are_zeros: bool = False, - ) -> FormulaEngine[Quantity]: + ) -> FormulaEngine[SupportsFloatT]: """Start execution of the given formula. Formulas can have Component IDs that are preceeded by a pound symbol("#"), and @@ -113,6 +115,7 @@ def start_formula( formula: formula to execute. component_metric_id: The metric ID to use when fetching receivers from the resampling actor. + value_constructor: A function to create new values with. nones_are_zeros: Whether to treat None values from the stream as 0s. If False, the returned value will be a None. @@ -120,7 +123,10 @@ def start_formula( A FormulaEngine that applies the formula and streams values. """ return self._formula_pool.from_string( - formula, component_metric_id, nones_are_zeros=nones_are_zeros + formula, + component_metric_id, + value_constructor=value_constructor, + nones_are_zeros=nones_are_zeros, ) @property diff --git a/tests/actor/power_distributing/test_power_distributing.py b/tests/actor/power_distributing/test_power_distributing.py index 6de6e50b6..8e4052e76 100644 --- a/tests/actor/power_distributing/test_power_distributing.py +++ b/tests/actor/power_distributing/test_power_distributing.py @@ -19,7 +19,7 @@ from pytest_mock import MockerFixture from frequenz.sdk import microgrid -from frequenz.sdk.actor import ResamplerConfig +from frequenz.sdk.actor import ResamplingActorConfig from frequenz.sdk.actor.power_distributing import ( ComponentPoolStatus, PowerDistributingActor, @@ -90,7 +90,7 @@ async def new( if microgrid._data_pipeline._DATA_PIPELINE is not None: microgrid._data_pipeline._DATA_PIPELINE = None await microgrid._data_pipeline.initialize( - ResamplerConfig(resampling_period=timedelta(seconds=0.1)) + ResamplingActorConfig(resampling_period=timedelta(seconds=0.1)) ) streamer = MockComponentDataStreamer(mockgrid.mock_client) diff --git a/tests/actor/test_data_sourcing.py b/tests/actor/test_data_sourcing.py index 491fe5780..7329e4705 100644 --- a/tests/actor/test_data_sourcing.py +++ b/tests/actor/test_data_sourcing.py @@ -13,7 +13,7 @@ ) from frequenz.sdk.microgrid import connection_manager from frequenz.sdk.microgrid.component import ComponentMetricId -from frequenz.sdk.timeseries import Quantity, Sample +from frequenz.sdk.timeseries import Sample from tests.microgrid import mock_api # pylint: disable=no-member @@ -61,7 +61,7 @@ async def test_data_sourcing_actor(self) -> None: "test-namespace", 4, ComponentMetricId.ACTIVE_POWER, None ) active_power_recv = registry.get_or_create( - Sample[Quantity], active_power_request.get_channel_name() + Sample[float], active_power_request.get_channel_name() ).new_receiver() await req_sender.send(active_power_request) @@ -69,7 +69,7 @@ async def test_data_sourcing_actor(self) -> None: "test-namespace", 9, ComponentMetricId.SOC, None ) soc_recv = registry.get_or_create( - Sample[Quantity], soc_request.get_channel_name() + Sample[float], soc_request.get_channel_name() ).new_receiver() await req_sender.send(soc_request) @@ -77,22 +77,22 @@ async def test_data_sourcing_actor(self) -> None: "test-namespace", 9, ComponentMetricId.SOC, None ) soc2_recv = registry.get_or_create( - Sample[Quantity], soc2_request.get_channel_name() + Sample[float], soc2_request.get_channel_name() ).new_receiver() await req_sender.send(soc2_request) for _ in range(3): sample = await soc_recv.receive() assert sample.value is not None - assert 9.0 == sample.value.base_value + assert 9.0 == sample.value sample = await soc2_recv.receive() assert sample.value is not None - assert 9.0 == sample.value.base_value + assert 9.0 == sample.value sample = await active_power_recv.receive() assert sample.value is not None - assert 100.0 == sample.value.base_value + assert 100.0 == sample.value assert await server.graceful_shutdown() connection_manager._CONNECTION_MANAGER = ( # pylint: disable=protected-access diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index 43f2164a0..941692b99 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -16,11 +16,10 @@ ChannelRegistry, ComponentMetricRequest, ComponentMetricsResamplingActor, - ResamplerConfig, + ResamplingActorConfig, ) from frequenz.sdk.microgrid.component import ComponentMetricId from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._quantities import Quantity # pylint: disable=too-many-locals,redefined-outer-name # @@ -47,10 +46,10 @@ async def _assert_resampling_works( data_source_chan_name: str, ) -> None: timeseries_receiver = channel_registry.get_or_create( - Sample[Quantity], resampling_chan_name + Sample[float], resampling_chan_name ).new_receiver() timeseries_sender = channel_registry.get_or_create( - Sample[Quantity], data_source_chan_name + Sample[float], data_source_chan_name ).new_sender() fake_time.shift(0.2) @@ -58,37 +57,37 @@ async def _assert_resampling_works( assert new_sample == Sample(_now(), None) fake_time.shift(0.1) - sample = Sample(_now(), Quantity(3)) # ts = 0.3s + sample = Sample(_now(), 3.0) # ts = 0.3s await timeseries_sender.send(sample) fake_time.shift(0.1) new_sample = await timeseries_receiver.receive() # At 0.4s (timer) assert new_sample is not None and new_sample.value is not None - assert new_sample.value.base_value == 3 + assert new_sample.value == 3.0 assert new_sample.timestamp >= sample.timestamp assert new_sample.timestamp == _now() fake_time.shift(0.05) - sample = Sample(_now(), Quantity(4)) # ts = 0.45s + sample = Sample(_now(), 4.0) # ts = 0.45s await timeseries_sender.send(sample) fake_time.shift(0.15) new_sample = await timeseries_receiver.receive() # At 0.6s (timer) assert new_sample is not None and new_sample.value is not None - assert new_sample.value.base_value == 3.5 # avg(3, 4) + assert new_sample.value == 3.5 # avg(3, 4) assert new_sample.timestamp >= sample.timestamp assert new_sample.timestamp == _now() fake_time.shift(0.05) - await timeseries_sender.send(Sample(_now(), Quantity(8))) # ts = 0.65s + await timeseries_sender.send(Sample(_now(), 8.0)) # ts = 0.65s fake_time.shift(0.05) - await timeseries_sender.send(Sample(_now(), Quantity(1))) # ts = 0.7s + await timeseries_sender.send(Sample(_now(), 1.0)) # ts = 0.7s fake_time.shift(0.05) - sample = Sample(_now(), Quantity(9)) # ts = 0.75s + sample = Sample(_now(), 9.0) # ts = 0.75s await timeseries_sender.send(sample) fake_time.shift(0.05) new_sample = await timeseries_receiver.receive() # At 0.8s (timer) assert new_sample is not None and new_sample.value is not None - assert new_sample.value.base_value == 5.5 # avg(4, 8, 1, 9) + assert new_sample.value == 5.5 # avg(4, 8, 1, 9) assert new_sample.timestamp >= sample.timestamp assert new_sample.timestamp == _now() @@ -96,7 +95,7 @@ async def _assert_resampling_works( fake_time.shift(0.2) new_sample = await timeseries_receiver.receive() # At 1.0s (timer) assert new_sample is not None and new_sample.value is not None - assert new_sample.value.base_value == 6 # avg(8, 1, 9) + assert new_sample.value == 6.0 # avg(8, 1, 9) assert new_sample.timestamp >= sample.timestamp assert new_sample.timestamp == _now() @@ -122,7 +121,7 @@ async def test_single_request( channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), - config=ResamplerConfig( + config=ResamplingActorConfig( resampling_period=timedelta(seconds=0.2), max_data_age_in_periods=2, ), @@ -165,7 +164,7 @@ async def test_duplicate_request( channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), - config=ResamplerConfig( + config=ResamplingActorConfig( resampling_period=timedelta(seconds=0.2), max_data_age_in_periods=2, ), diff --git a/tests/microgrid/test_datapipeline.py b/tests/microgrid/test_datapipeline.py index 7763ff412..00c2ac8d2 100644 --- a/tests/microgrid/test_datapipeline.py +++ b/tests/microgrid/test_datapipeline.py @@ -12,10 +12,10 @@ import time_machine from pytest_mock import MockerFixture +from frequenz.sdk.actor import ResamplingActorConfig from frequenz.sdk.microgrid._data_pipeline import _DataPipeline from frequenz.sdk.microgrid.client import Connection from frequenz.sdk.microgrid.component import Component, ComponentCategory, InverterType -from frequenz.sdk.timeseries._resampling import ResamplerConfig from ..utils.mock_microgrid_client import MockMicrogridClient @@ -34,7 +34,7 @@ async def test_actors_started( ) -> None: """Test that the datasourcing, resampling and power distributing actors are started.""" datapipeline = _DataPipeline( - resampler_config=ResamplerConfig(resampling_period=timedelta(seconds=1)) + resampler_config=ResamplingActorConfig(resampling_period=timedelta(seconds=1)) ) await asyncio.sleep(1) diff --git a/tests/microgrid/test_grid.py b/tests/microgrid/test_grid.py index 51f5550e2..9637b9805 100644 --- a/tests/microgrid/test_grid.py +++ b/tests/microgrid/test_grid.py @@ -16,7 +16,7 @@ ComponentMetricId, GridMetadata, ) -from frequenz.sdk.timeseries import Current, Fuse, Power, Quantity +from frequenz.sdk.timeseries import Current, Fuse, Power from ..timeseries._formula_engine.utils import equal_float_lists, get_resampled_stream from ..timeseries.mock_microgrid import MockMicrogrid @@ -123,6 +123,7 @@ async def test_grid_power_1(mocker: MockerFixture) -> None: grid_power_recv = grid.power.new_receiver() + # TODO: REMOVE THIS and validate the test against hardcoded values grid_meter_recv = get_resampled_stream( grid._formula_pool._namespace, # pylint: disable=protected-access mockgrid.meter_ids[0], @@ -157,8 +158,8 @@ async def test_grid_power_2(mocker: MockerFixture) -> None: mockgrid.add_batteries(1, no_meter=True) mockgrid.add_solar_inverters(1) - results: list[Quantity] = [] - meter_sums: list[Quantity] = [] + results: list[float] = [] + meter_sums: list[float] = [] async with mockgrid, AsyncExitStack() as stack: grid = microgrid.grid() assert grid, "Grid is not initialized" @@ -195,8 +196,8 @@ async def test_grid_power_2(mocker: MockerFixture) -> None: val = await grid_power_recv.receive() assert val is not None and val.value is not None - results.append(val.value) - meter_sums.append(Quantity(meter_sum)) + results.append(float(val.value)) + meter_sums.append(meter_sum) assert len(results) == 10 assert equal_float_lists(results, meter_sums) diff --git a/tests/timeseries/_battery_pool/test_battery_pool.py b/tests/timeseries/_battery_pool/test_battery_pool.py index 625223a93..76563c491 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool.py +++ b/tests/timeseries/_battery_pool/test_battery_pool.py @@ -26,7 +26,7 @@ MAX_BATTERY_DATA_AGE_SEC, WAIT_FOR_COMPONENT_DATA_SEC, ) -from frequenz.sdk.actor import ResamplerConfig +from frequenz.sdk.actor import ResamplingActorConfig from frequenz.sdk.actor.power_distributing import ComponentPoolStatus from frequenz.sdk.actor.power_distributing._component_managers._battery_manager import ( _get_battery_inverter_mappings, @@ -146,7 +146,7 @@ async def setup_all_batteries(mocker: MockerFixture) -> AsyncIterator[SetupArgs] # pylint: disable=protected-access microgrid._data_pipeline._DATA_PIPELINE = None await microgrid._data_pipeline.initialize( - ResamplerConfig(resampling_period=timedelta(seconds=min_update_interval)) + ResamplingActorConfig(resampling_period=timedelta(seconds=min_update_interval)) ) streamer = MockComponentDataStreamer(mock_microgrid) @@ -198,7 +198,7 @@ async def setup_batteries_pool(mocker: MockerFixture) -> AsyncIterator[SetupArgs # pylint: disable=protected-access microgrid._data_pipeline._DATA_PIPELINE = None await microgrid._data_pipeline.initialize( - ResamplerConfig(resampling_period=timedelta(seconds=min_update_interval)) + ResamplingActorConfig(resampling_period=timedelta(seconds=min_update_interval)) ) # We don't use status channel from the sdk interface to limit diff --git a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py index ba7c76e4d..adebdda24 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py +++ b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py @@ -14,7 +14,7 @@ from pytest_mock import MockerFixture from frequenz.sdk import microgrid, timeseries -from frequenz.sdk.actor import ResamplerConfig, power_distributing +from frequenz.sdk.actor import ResamplingActorConfig, power_distributing from frequenz.sdk.actor.power_distributing import ComponentPoolStatus from frequenz.sdk.actor.power_distributing._component_pool_status_tracker import ( ComponentPoolStatusTracker, @@ -56,7 +56,7 @@ async def mocks(mocker: MockerFixture) -> typing.AsyncIterator[Mocks]: if microgrid._data_pipeline._DATA_PIPELINE is not None: microgrid._data_pipeline._DATA_PIPELINE = None await microgrid._data_pipeline.initialize( - ResamplerConfig(resampling_period=timedelta(seconds=0.1)) + ResamplingActorConfig(resampling_period=timedelta(seconds=0.1)) ) streamer = MockComponentDataStreamer(mockgrid.mock_client) diff --git a/tests/timeseries/_formula_engine/utils.py b/tests/timeseries/_formula_engine/utils.py index 921403ef6..2dc6661bb 100644 --- a/tests/timeseries/_formula_engine/utils.py +++ b/tests/timeseries/_formula_engine/utils.py @@ -12,7 +12,7 @@ from frequenz.sdk.microgrid import _data_pipeline from frequenz.sdk.microgrid.component import ComponentMetricId from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._quantities import QuantityT +from frequenz.sdk.timeseries._quantities import SupportsFloatT from frequenz.sdk.timeseries.formula_engine._resampled_formula_builder import ( ResampledFormulaBuilder, ) @@ -22,8 +22,8 @@ def get_resampled_stream( namespace: str, comp_id: int, metric_id: ComponentMetricId, - create_method: Callable[[float], QuantityT], -) -> Receiver[Sample[QuantityT]]: + create_method: Callable[[float], SupportsFloatT], +) -> Receiver[Sample[SupportsFloatT]]: """Return the resampled data stream for the given component.""" # Create a `FormulaBuilder` instance, just in order to reuse its # `_get_resampled_receiver` function implementation. @@ -37,24 +37,17 @@ def get_resampled_stream( metric_id, create_method, ) - # Resampled data is always `Quantity` type, so we need to convert it to the desired - # output type. return builder._get_resampled_receiver( comp_id, metric_id, - ).map( - lambda sample: Sample( - sample.timestamp, - None if sample.value is None else create_method(sample.value.base_value), - ) ) # pylint: enable=protected-access -def equal_float_lists(list1: list[QuantityT], list2: list[QuantityT]) -> bool: +def equal_float_lists(list1: list[SupportsFloatT], list2: list[SupportsFloatT]) -> bool: """Compare two float lists with `math.isclose()`.""" return ( len(list1) > 0 and len(list1) == len(list2) - and all(isclose(v1.base_value, v2.base_value) for v1, v2 in zip(list1, list2)) + and all(isclose(v1, v2) for v1, v2 in zip(list1, list2)) ) diff --git a/tests/timeseries/mock_microgrid.py b/tests/timeseries/mock_microgrid.py index 983758242..c4e35925d 100644 --- a/tests/timeseries/mock_microgrid.py +++ b/tests/timeseries/mock_microgrid.py @@ -14,7 +14,7 @@ from frequenz.sdk import microgrid from frequenz.sdk._internal._asyncio import cancel_and_await -from frequenz.sdk.actor import ResamplerConfig +from frequenz.sdk.actor import ResamplingActorConfig from frequenz.sdk.microgrid import _data_pipeline from frequenz.sdk.microgrid.client import Connection from frequenz.sdk.microgrid.component import ( @@ -195,7 +195,7 @@ async def start(self, mocker: MockerFixture | None = None) -> None: self.init_mock_client(lambda mock_client: mock_client.initialize(local_mocker)) self.mock_resampler = MockResampler( mocker, - ResamplerConfig(timedelta(seconds=self._sample_rate_s)), + ResamplingActorConfig(timedelta(seconds=self._sample_rate_s)), bat_inverter_ids=self.battery_inverter_ids, pv_inverter_ids=self.pv_inverter_ids, evc_ids=self.evc_ids, diff --git a/tests/timeseries/mock_resampler.py b/tests/timeseries/mock_resampler.py index 5d19ab0eb..49592b682 100644 --- a/tests/timeseries/mock_resampler.py +++ b/tests/timeseries/mock_resampler.py @@ -12,11 +12,10 @@ from pytest_mock import MockerFixture from frequenz.sdk._internal._asyncio import cancel_and_await -from frequenz.sdk.actor import ComponentMetricRequest, ResamplerConfig +from frequenz.sdk.actor import ComponentMetricRequest, ResamplingActorConfig from frequenz.sdk.microgrid._data_pipeline import _DataPipeline from frequenz.sdk.microgrid.component import ComponentMetricId -from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._quantities import Quantity +from frequenz.sdk.timeseries import Current, Power, Sample from frequenz.sdk.timeseries.formula_engine._formula_generators._formula_generator import ( NON_EXISTING_COMPONENT_ID, ) @@ -30,7 +29,7 @@ class MockResampler: def __init__( # pylint: disable=too-many-arguments self, mocker: MockerFixture, - resampler_config: ResamplerConfig, + resampler_config: ResamplingActorConfig, bat_inverter_ids: list[int], pv_inverter_ids: list[int], evc_ids: list[int], @@ -45,22 +44,22 @@ def __init__( # pylint: disable=too-many-arguments self._resampler_request_channel = Broadcast[ComponentMetricRequest]( "resampler-request" ) - self._input_channels_receivers: dict[str, list[Receiver[Sample[Quantity]]]] = {} + self._input_channels_receivers: dict[str, list[Receiver[Sample[float]]]] = {} def power_senders( comp_ids: list[int], - ) -> list[Sender[Sample[Quantity]]]: - senders: list[Sender[Sample[Quantity]]] = [] + ) -> list[Sender[Sample[float]]]: + senders: list[Sender[Sample[float]]] = [] for comp_id in comp_ids: name = f"{comp_id}:{ComponentMetricId.ACTIVE_POWER}" senders.append( self._channel_registry.get_or_create( - Sample[Quantity], name + Sample[float], name ).new_sender() ) self._input_channels_receivers[name] = [ self._channel_registry.get_or_create( - Sample[Quantity], name + Sample[float], name ).new_receiver() for _ in range(namespaces) ] @@ -68,18 +67,18 @@ def power_senders( def frequency_senders( comp_ids: list[int], - ) -> list[Sender[Sample[Quantity]]]: - senders: list[Sender[Sample[Quantity]]] = [] + ) -> list[Sender[Sample[float]]]: + senders: list[Sender[Sample[float]]] = [] for comp_id in comp_ids: name = f"{comp_id}:{ComponentMetricId.FREQUENCY}" senders.append( self._channel_registry.get_or_create( - Sample[Quantity], name + Sample[float], name ).new_sender() ) self._input_channels_receivers[name] = [ self._channel_registry.get_or_create( - Sample[Quantity], name + Sample[float], name ).new_receiver(name) for _ in range(namespaces) ] @@ -99,8 +98,8 @@ def frequency_senders( def multi_phase_senders( ids: list[int], metrics: tuple[ComponentMetricId, ComponentMetricId, ComponentMetricId], - ) -> list[list[Sender[Sample[Quantity]]]]: - senders: list[list[Sender[Sample[Quantity]]]] = [] + ) -> list[list[Sender[Sample[float]]]]: + senders: list[list[Sender[Sample[float]]]] = [] for comp_id in ids: p1_name = f"{comp_id}:{metrics[0]}" p2_name = f"{comp_id}:{metrics[1]}" @@ -109,37 +108,39 @@ def multi_phase_senders( senders.append( [ self._channel_registry.get_or_create( - Sample[Quantity], p1_name + Sample[float], p1_name ).new_sender(), self._channel_registry.get_or_create( - Sample[Quantity], p2_name + Sample[float], p2_name ).new_sender(), self._channel_registry.get_or_create( - Sample[Quantity], p3_name + Sample[float], p3_name ).new_sender(), ] ) self._input_channels_receivers[p1_name] = [ self._channel_registry.get_or_create( - Sample[Quantity], p1_name + Sample[float], p1_name ).new_receiver() for _ in range(namespaces) ] self._input_channels_receivers[p2_name] = [ self._channel_registry.get_or_create( - Sample[Quantity], p2_name + Sample[float], p2_name ).new_receiver() for _ in range(namespaces) ] self._input_channels_receivers[p3_name] = [ self._channel_registry.get_or_create( - Sample[Quantity], p3_name + Sample[float], p3_name ).new_receiver() for _ in range(namespaces) ] return senders - def current_senders(ids: list[int]) -> list[list[Sender[Sample[Quantity]]]]: + def current_senders( + ids: list[int], + ) -> list[list[Sender[Sample[float]]]]: return multi_phase_senders( ids, ( @@ -149,7 +150,9 @@ def current_senders(ids: list[int]) -> list[list[Sender[Sample[Quantity]]]]: ), ) - def voltage_senders(ids: list[int]) -> list[list[Sender[Sample[Quantity]]]]: + def voltage_senders( + ids: list[int], + ) -> list[list[Sender[Sample[float]]]]: return multi_phase_senders( ids, ( @@ -212,7 +215,9 @@ def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]: return self._resampler_request_channel.new_sender() async def _channel_forward_messages( - self, receiver: Receiver[Sample[Quantity]], sender: Sender[Sample[Quantity]] + self, + receiver: Receiver[Sample[float]], + sender: Sender[Sample[float]], ) -> None: async for sample in receiver: await sender.send(sample) @@ -225,10 +230,8 @@ async def _handle_resampling_requests(self) -> None: input_chan_recv_name = f"{request.component_id}:{request.metric_id}" input_chan_recv = self._input_channels_receivers[input_chan_recv_name].pop() assert input_chan_recv is not None - output_chan_sender: Sender[Sample[Quantity]] = ( - self._channel_registry.get_or_create( - Sample[Quantity], name - ).new_sender() + output_chan_sender: Sender[Sample[float]] = ( + self._channel_registry.get_or_create(Sample[float], name).new_sender() ) task = asyncio.create_task( self._channel_forward_messages( @@ -245,11 +248,11 @@ def _done_callback(task: asyncio.Task[None]) -> None: task.add_done_callback(_done_callback) self._forward_tasks[name] = task - def make_sample(self, value: float | None) -> Sample[Quantity]: + def make_sample(self, value: float | None) -> Sample[float]: """Create a sample with the given value.""" return Sample( self._next_ts, - None if value is None or math.isnan(value) else Quantity(value), + None if value is None or math.isnan(value) else value, ) async def send_meter_power(self, values: list[float | None]) -> None: diff --git a/tests/timeseries/test_formula_engine.py b/tests/timeseries/test_formula_engine.py index f3a12deb6..4f5e8f3b2 100644 --- a/tests/timeseries/test_formula_engine.py +++ b/tests/timeseries/test_formula_engine.py @@ -10,7 +10,7 @@ from frequenz.channels import Broadcast, Receiver from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._quantities import Power, Quantity +from frequenz.sdk.timeseries._quantities import Power from frequenz.sdk.timeseries.formula_engine._formula_engine import ( FormulaBuilder, FormulaEngine, @@ -56,8 +56,8 @@ async def run_test( # pylint: disable=too-many-locals nones_are_zeros: bool = False, ) -> None: """Run a formula test.""" - channels: dict[str, Broadcast[Sample[Quantity]]] = {} - builder = FormulaBuilder("test_formula", Quantity) + channels: dict[str, Broadcast[Sample[float]]] = {} + builder = FormulaBuilder("test_formula", Power.from_watts) for token in Tokenizer(formula): if token.type == TokenType.COMPONENT_METRIC: if token.value not in channels: @@ -80,9 +80,7 @@ async def run_test( # pylint: disable=too-many-locals io_input, io_output = io_pair await asyncio.gather( *[ - chan.new_sender().send( - Sample(now, None if not value else Quantity(value)) - ) + chan.new_sender().send(Sample(now, None if not value else value)) for chan, value in zip(channels.values(), io_input) ] ) @@ -92,7 +90,7 @@ async def run_test( # pylint: disable=too-many-locals else: assert ( next_val.value is not None - and next_val.value.base_value == io_output + and Power.as_watts(next_val.value) == io_output ) tests_passed += 1 await engine._stop() # pylint: disable=protected-access @@ -301,17 +299,17 @@ class TestFormulaEngineComposition: """Tests for formula channels.""" def make_engine( - self, stream_id: int, data: Receiver[Sample[Quantity]] - ) -> FormulaEngine[Quantity]: + self, stream_id: int, data: Receiver[Sample[float]] + ) -> FormulaEngine[Power]: """Make a basic FormulaEngine.""" name = f"#{stream_id}" - builder = FormulaBuilder(name, create_method=Quantity) + builder = FormulaBuilder(name, create_method=Power.from_watts) builder.push_metric( name, data, nones_are_zeros=False, ) - return FormulaEngine(builder, create_method=Quantity) + return FormulaEngine(builder, create_method=Power.from_watts) async def run_test( # pylint: disable=too-many-locals self, @@ -319,40 +317,40 @@ async def run_test( # pylint: disable=too-many-locals make_builder: ( Callable[ [ - FormulaEngine[Quantity], + FormulaEngine[Power], ], - HigherOrderFormulaBuilder[Quantity], + HigherOrderFormulaBuilder[Power], ] | Callable[ [ - FormulaEngine[Quantity], - FormulaEngine[Quantity], + FormulaEngine[Power], + FormulaEngine[Power], ], - HigherOrderFormulaBuilder[Quantity], + HigherOrderFormulaBuilder[Power], ] | Callable[ [ - FormulaEngine[Quantity], - FormulaEngine[Quantity], - FormulaEngine[Quantity], + FormulaEngine[Power], + FormulaEngine[Power], + FormulaEngine[Power], ], - HigherOrderFormulaBuilder[Quantity], + HigherOrderFormulaBuilder[Power], ] | Callable[ [ - FormulaEngine[Quantity], - FormulaEngine[Quantity], - FormulaEngine[Quantity], - FormulaEngine[Quantity], + FormulaEngine[Power], + FormulaEngine[Power], + FormulaEngine[Power], + FormulaEngine[Power], ], - HigherOrderFormulaBuilder[Quantity], + HigherOrderFormulaBuilder[Power], ] ), io_pairs: list[tuple[list[float | None], float | None]], nones_are_zeros: bool = False, ) -> None: """Run a test with the specs provided.""" - channels = [Broadcast[Sample[Quantity]](str(ctr)) for ctr in range(num_items)] + channels = [Broadcast[Sample[float]](str(ctr)) for ctr in range(num_items)] l1_engines = [ self.make_engine(ctr, channels[ctr].new_receiver()) for ctr in range(num_items) @@ -367,9 +365,7 @@ async def run_test( # pylint: disable=too-many-locals io_input, io_output = io_pair await asyncio.gather( *[ - chan.new_sender().send( - Sample(now, None if not value else Quantity(value)) - ) + chan.new_sender().send(Sample(now, None if not value else value)) for chan, value in zip(channels, io_input) ] ) @@ -379,7 +375,7 @@ async def run_test( # pylint: disable=too-many-locals else: assert ( next_val.value is not None - and next_val.value.base_value == io_output + and Power.as_watts(next_val.value) == io_output ) tests_passed += 1 await engine._stop() # pylint: disable=protected-access diff --git a/tests/timeseries/test_quantities.py b/tests/timeseries/test_quantities.py index 040e433b1..a28145934 100644 --- a/tests/timeseries/test_quantities.py +++ b/tests/timeseries/test_quantities.py @@ -5,7 +5,7 @@ import inspect from datetime import timedelta -from typing import Callable +from typing import Callable, Self import hypothesis import pytest @@ -33,6 +33,18 @@ class Fz1( ): """Frequency quantity with narrow exponent unit map.""" + @classmethod + def new(cls, value: float, exponent: int = 0) -> Self: + """Create a frequency quantity from a value in Hz.""" + return cls._new(value, exponent=exponent) + + +assert Fz1._exponent_unit_map is not None + + +class Fz1Subclass(Fz1, exponent_unit_map=Fz1._exponent_unit_map): + """Subclass of Fz1.""" + class Fz2( Quantity, @@ -47,6 +59,11 @@ class Fz2( ): """Frequency quantity with broad exponent unit map.""" + @classmethod + def new(cls, value: float, exponent: int = 0) -> Self: + """Create a frequency quantity from a value in Hz.""" + return cls._new(value, exponent=exponent) + _CtorType = Callable[[float], Quantity] @@ -92,16 +109,20 @@ class Fz2( def test_zero() -> None: """Test the zero value for quantity.""" - assert Quantity(0.0) == Quantity.zero() - assert Quantity(0.0, exponent=100) == Quantity.zero() - assert Quantity.zero() is Quantity.zero() # It is a "singleton" - assert Quantity.zero().base_value == 0.0 + # Quantity can't be instantiated, even with zero() + with pytest.raises(TypeError): + Quantity.zero() + + assert Power.from_watts(0.0) == Power.zero() + assert Power.from_kilowatts(0.0) == Power.zero() + assert Power.zero() is Power.zero() # It is a "singleton" + assert float(Power.zero()) == 0.0 # Test the singleton is immutable - one = Quantity.zero() - one += Quantity(1.0) - assert one != Quantity.zero() - assert Quantity.zero() == Quantity(0.0) + one = Power.zero() + one += Power.from_megawatts(1.0) + assert one != Power.zero() + assert Power.zero() == Power.from_milliwatts(0.0) assert Power.from_watts(0.0) == Power.zero() assert Power.from_kilowatts(0.0) == Power.zero() @@ -173,53 +194,40 @@ def test_base_value_from_string_is_float( def test_string_representation() -> None: """Test the string representation of the quantities.""" - assert str(Quantity(1.024445, exponent=0)) == "1.024" - assert ( - repr(Quantity(1.024445, exponent=0)) == "Quantity(value=1.024445, exponent=0)" - ) - assert f"{Quantity(0.50001, exponent=0):.0}" == "1" - assert f"{Quantity(1.024445, exponent=0)}" == "1.024" - assert f"{Quantity(1.024445, exponent=0):.0}" == "1" - assert f"{Quantity(0.124445, exponent=0):.0}" == "0" - assert f"{Quantity(0.50001, exponent=0):.0}" == "1" - assert f"{Quantity(1.024445, exponent=0):.6}" == "1.024445" - - assert f"{Quantity(1.024445, exponent=3)}" == "1024.445" - - assert str(Fz1(1.024445, exponent=0)) == "1.024 Hz" - assert repr(Fz1(1.024445, exponent=0)) == "Fz1(value=1.024445, exponent=0)" - assert f"{Fz1(1.024445, exponent=0)}" == "1.024 Hz" - assert f"{Fz1(1.024445, exponent=0):.0}" == "1 Hz" - assert f"{Fz1(1.024445, exponent=0):.1}" == "1 Hz" - assert f"{Fz1(1.024445, exponent=0):.2}" == "1.02 Hz" - assert f"{Fz1(1.024445, exponent=0):.9}" == "1.024445 Hz" - assert f"{Fz1(1.024445, exponent=0):0.0}" == "1 Hz" - assert f"{Fz1(1.024445, exponent=0):0.1}" == "1.0 Hz" - assert f"{Fz1(1.024445, exponent=0):0.2}" == "1.02 Hz" - assert f"{Fz1(1.024445, exponent=0):0.9}" == "1.024445000 Hz" - - assert f"{Fz1(1.024445, exponent=3)}" == "1.024 kHz" - assert f"{Fz2(1.024445, exponent=3)}" == "1.024 kHz" - - assert f"{Fz1(1.024445, exponent=6)}" == "1024.445 kHz" - assert f"{Fz2(1.024445, exponent=6)}" == "1.024 MHz" - assert f"{Fz1(1.024445, exponent=9)}" == "1024445 kHz" - assert f"{Fz2(1.024445, exponent=9)}" == "1.024 GHz" - - assert f"{Fz1(1.024445, exponent=-3)}" == "0.001 Hz" - assert f"{Fz2(1.024445, exponent=-3)}" == "1.024 mHz" - - assert f"{Fz1(1.024445, exponent=-6)}" == "0 Hz" - assert f"{Fz1(1.024445, exponent=-6):.6}" == "0.000001 Hz" - assert f"{Fz2(1.024445, exponent=-6)}" == "1.024 uHz" - - assert f"{Fz1(1.024445, exponent=-12)}" == "0 Hz" - assert f"{Fz2(1.024445, exponent=-12)}" == "0 Hz" - - assert f"{Fz1(0)}" == "0 Hz" - - assert f"{Fz1(-20)}" == "-20 Hz" - assert f"{Fz1(-20000)}" == "-20 kHz" + assert str(Fz1.new(1.024445, exponent=0)) == "1.024 Hz" + assert repr(Fz1.new(1.024445, exponent=0)) == "Fz1(value=1.024445, exponent=0)" + assert f"{Fz1.new(1.024445, exponent=0)}" == "1.024 Hz" + assert f"{Fz1.new(1.024445, exponent=0):.0}" == "1 Hz" + assert f"{Fz1.new(1.024445, exponent=0):.1}" == "1 Hz" + assert f"{Fz1.new(1.024445, exponent=0):.2}" == "1.02 Hz" + assert f"{Fz1.new(1.024445, exponent=0):.9}" == "1.024445 Hz" + assert f"{Fz1.new(1.024445, exponent=0):0.0}" == "1 Hz" + assert f"{Fz1.new(1.024445, exponent=0):0.1}" == "1.0 Hz" + assert f"{Fz1.new(1.024445, exponent=0):0.2}" == "1.02 Hz" + assert f"{Fz1.new(1.024445, exponent=0):0.9}" == "1.024445000 Hz" + + assert f"{Fz1.new(1.024445, exponent=3)}" == "1.024 kHz" + assert f"{Fz2.new(1.024445, exponent=3)}" == "1.024 kHz" + + assert f"{Fz1.new(1.024445, exponent=6)}" == "1024.445 kHz" + assert f"{Fz2.new(1.024445, exponent=6)}" == "1.024 MHz" + assert f"{Fz1.new(1.024445, exponent=9)}" == "1024445 kHz" + assert f"{Fz2.new(1.024445, exponent=9)}" == "1.024 GHz" + + assert f"{Fz1.new(1.024445, exponent=-3)}" == "0.001 Hz" + assert f"{Fz2.new(1.024445, exponent=-3)}" == "1.024 mHz" + + assert f"{Fz1.new(1.024445, exponent=-6)}" == "0 Hz" + assert f"{Fz1.new(1.024445, exponent=-6):.6}" == "0.000001 Hz" + assert f"{Fz2.new(1.024445, exponent=-6)}" == "1.024 uHz" + + assert f"{Fz1.new(1.024445, exponent=-12)}" == "0 Hz" + assert f"{Fz2.new(1.024445, exponent=-12)}" == "0 Hz" + + assert f"{Fz1.new(0)}" == "0 Hz" + + assert f"{Fz1.new(-20)}" == "-20 Hz" + assert f"{Fz1.new(-20000)}" == "-20 kHz" assert f"{Power.from_watts(0.000124445):.0}" == "0 W" assert f"{Energy.from_watt_hours(0.124445):.0}" == "0 Wh" @@ -230,91 +238,82 @@ def test_string_representation() -> None: def test_isclose() -> None: """Test the isclose method of the quantities.""" - assert Fz1(1.024445).isclose(Fz1(1.024445)) - assert not Fz1(1.024445).isclose(Fz1(1.0)) + assert Fz1.new(1.024445).isclose(Fz1.new(1.024445)) + assert not Fz1.new(1.024445).isclose(Fz1.new(1.0)) def test_addition_subtraction() -> None: """Test the addition and subtraction of the quantities.""" - assert Quantity(1) + Quantity(1, exponent=0) == Quantity(2, exponent=0) - assert Quantity(1) + Quantity(1, exponent=3) == Quantity(1001, exponent=0) - assert Quantity(1) - Quantity(1, exponent=0) == Quantity(0, exponent=0) + assert Fz1.new(1) + Fz1.new(1, exponent=0) == Fz1.new(2, exponent=0) + assert Fz1.new(1) + Fz1.new(1, exponent=3) == Fz1.new(1001, exponent=0) + assert Fz1.new(1) - Fz1.new(1, exponent=0) == Fz1.new(0, exponent=0) - assert Fz1(1) + Fz1(1) == Fz1(2) + assert Fz1.new(1) + Fz1.new(1) == Fz1.new(2) with pytest.raises(TypeError) as excinfo: - assert Fz1(1) + Fz2(1) # type: ignore + assert Fz1.new(1) + Fz2.new(1) # type: ignore assert excinfo.value.args[0] == "unsupported operand type(s) for +: 'Fz1' and 'Fz2'" with pytest.raises(TypeError) as excinfo: - assert Fz1(1) - Fz2(1) # type: ignore + assert Fz1.new(1) - Fz2.new(1) # type: ignore assert excinfo.value.args[0] == "unsupported operand type(s) for -: 'Fz1' and 'Fz2'" - fz1 = Fz1(1.0) - fz1 += Fz1(4.0) - assert fz1 == Fz1(5.0) - fz1 -= Fz1(9.0) - assert fz1 == Fz1(-4.0) + fz1 = Fz1.new(1.0) + fz1 += Fz1.new(4.0) + assert fz1 == Fz1.new(5.0) + fz1 -= Fz1.new(9.0) + assert fz1 == Fz1.new(-4.0) with pytest.raises(TypeError) as excinfo: - fz1 += Fz2(1.0) # type: ignore + fz1 += Fz2.new(1.0) # type: ignore def test_comparison() -> None: """Test the comparison of the quantities.""" - assert Quantity(1.024445, exponent=0) == Quantity(1.024445, exponent=0) - assert Quantity(1.024445, exponent=0) != Quantity(1.024445, exponent=3) - assert Quantity(1.024445, exponent=0) < Quantity(1.024445, exponent=3) - assert Quantity(1.024445, exponent=0) <= Quantity(1.024445, exponent=3) - assert Quantity(1.024445, exponent=0) <= Quantity(1.024445, exponent=0) - assert Quantity(1.024445, exponent=0) > Quantity(1.024445, exponent=-3) - assert Quantity(1.024445, exponent=0) >= Quantity(1.024445, exponent=-3) - assert Quantity(1.024445, exponent=0) >= Quantity(1.024445, exponent=0) - - assert Fz1(1.024445, exponent=0) == Fz1(1.024445, exponent=0) - assert Fz1(1.024445, exponent=0) != Fz1(1.024445, exponent=3) - assert Fz1(1.024445, exponent=0) < Fz1(1.024445, exponent=3) - assert Fz1(1.024445, exponent=0) <= Fz1(1.024445, exponent=3) - assert Fz1(1.024445, exponent=0) <= Fz1(1.024445, exponent=0) - assert Fz1(1.024445, exponent=0) > Fz1(1.024445, exponent=-3) - assert Fz1(1.024445, exponent=0) >= Fz1(1.024445, exponent=-3) - assert Fz1(1.024445, exponent=0) >= Fz1(1.024445, exponent=0) - - assert Fz1(1.024445, exponent=0) != Fz2(1.024445, exponent=0) + assert Fz1.new(1.024445, exponent=0) == Fz1.new(1.024445, exponent=0) + assert Fz1.new(1.024445, exponent=0) != Fz1.new(1.024445, exponent=3) + assert Fz1.new(1.024445, exponent=0) < Fz1.new(1.024445, exponent=3) + assert Fz1.new(1.024445, exponent=0) <= Fz1.new(1.024445, exponent=3) + assert Fz1.new(1.024445, exponent=0) <= Fz1.new(1.024445, exponent=0) + assert Fz1.new(1.024445, exponent=0) > Fz1.new(1.024445, exponent=-3) + assert Fz1.new(1.024445, exponent=0) >= Fz1.new(1.024445, exponent=-3) + assert Fz1.new(1.024445, exponent=0) >= Fz1.new(1.024445, exponent=0) + + assert Fz1.new(1.024445, exponent=0) != Fz2.new(1.024445, exponent=0) with pytest.raises(TypeError) as excinfo: # unfortunately, mypy does not identify this as an error, when comparing a child # type against a base type, but they should still fail, because base-type # instances are being used as dimension-less quantities, whereas all child types # have dimensions/units. - assert Fz1(1.024445, exponent=0) <= Quantity(1.024445, exponent=0) + assert Fz1.new(1.024445, exponent=0) <= Fz1Subclass.new(1.024445, exponent=0) assert ( excinfo.value.args[0] - == "'<=' not supported between instances of 'Fz1' and 'Quantity'" + == "'<=' not supported between instances of 'Fz1' and 'Fz1Subclass'" ) with pytest.raises(TypeError) as excinfo: - assert Quantity(1.024445, exponent=0) <= Fz1(1.024445, exponent=0) + assert Fz1Subclass.new(1.024445, exponent=0) <= Fz1.new(1.024445, exponent=0) assert ( excinfo.value.args[0] - == "'<=' not supported between instances of 'Quantity' and 'Fz1'" + == "'<=' not supported between instances of 'Fz1Subclass' and 'Fz1'" ) with pytest.raises(TypeError) as excinfo: - assert Fz1(1.024445, exponent=0) < Fz2(1.024445, exponent=3) # type: ignore + assert Fz1.new(1.024445, exponent=0) < Fz2.new(1.024445, exponent=3) # type: ignore assert ( excinfo.value.args[0] == "'<' not supported between instances of 'Fz1' and 'Fz2'" ) with pytest.raises(TypeError) as excinfo: - assert Fz1(1.024445, exponent=0) <= Fz2(1.024445, exponent=3) # type: ignore + assert Fz1.new(1.024445, exponent=0) <= Fz2.new(1.024445, exponent=3) # type: ignore assert ( excinfo.value.args[0] == "'<=' not supported between instances of 'Fz1' and 'Fz2'" ) with pytest.raises(TypeError) as excinfo: - assert Fz1(1.024445, exponent=0) > Fz2(1.024445, exponent=-3) # type: ignore + assert Fz1.new(1.024445, exponent=0) > Fz2.new(1.024445, exponent=-3) # type: ignore assert ( excinfo.value.args[0] == "'>' not supported between instances of 'Fz1' and 'Fz2'" ) with pytest.raises(TypeError) as excinfo: - assert Fz1(1.024445, exponent=0) >= Fz2(1.024445, exponent=-3) # type: ignore + assert Fz1.new(1.024445, exponent=0) >= Fz2.new(1.024445, exponent=-3) # type: ignore assert ( excinfo.value.args[0] == "'>=' not supported between instances of 'Fz1' and 'Fz2'" @@ -338,7 +337,7 @@ def test_power() -> None: with pytest.raises(TypeError): # using the default constructor should raise. - Power(1.0, exponent=0) + Power() def test_current() -> None: @@ -357,7 +356,7 @@ def test_current() -> None: with pytest.raises(TypeError): # using the default constructor should raise. - Current(1.0, exponent=0) + Current() def test_voltage() -> None: @@ -378,7 +377,7 @@ def test_voltage() -> None: with pytest.raises(TypeError): # using the default constructor should raise. - Voltage(1.0, exponent=0) + Voltage() def test_energy() -> None: @@ -398,7 +397,7 @@ def test_energy() -> None: with pytest.raises(TypeError): # using the default constructor should raise. - Energy(1.0, exponent=0) + Energy() def test_temperature() -> None: @@ -411,7 +410,7 @@ def test_temperature() -> None: with pytest.raises(TypeError): # using the default constructor should raise. - Temperature(1.0, exponent=0) + Temperature() def test_quantity_compositions() -> None: @@ -447,7 +446,7 @@ def test_frequency() -> None: with pytest.raises(TypeError): # using the default constructor should raise. - Frequency(1.0, exponent=0) + Frequency() def test_percentage() -> None: @@ -682,6 +681,12 @@ def test_to_and_from_string( assert f"{from_string:.{exponent}}" == quantity_str except AssertionError as error: pytest.fail( - f"Failed for {quantity.base_value} != from_string({from_string.base_value}) " + f"Failed for {float(quantity)} != from_string({float(from_string)}) " + f"with exponent {exponent} and source value '{value}': {error}" ) + + +def test_cant_use_Quantity_from_string() -> None: + """Test string parsing and formatting is not allowed for the base Quantity class.""" + with pytest.raises(TypeError): + Quantity.from_string("1.0")