diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 459c16acb..2d494223b 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -7,6 +7,8 @@ ## Upgrading * The microgrid client dependency has been updated to version 0.9.0 +* The resampling function now takes plain `float`s as values instead of `Quantity` objects. +* `frequenz.sdk.timeseries.UNIX_EPOCH` was removed, use [`frequenz.core.datetime.UNIX_EPOCH`](https://frequenz-floss.github.io/frequenz-core-python/latest/reference/frequenz/core/datetime/#frequenz.core.datetime.UNIX_EPOCH) instead. ## New Features diff --git a/benchmarks/timeseries/resampling.py b/benchmarks/timeseries/resampling.py index 53c70aa60..58d360c9c 100644 --- a/benchmarks/timeseries/resampling.py +++ b/benchmarks/timeseries/resampling.py @@ -7,18 +7,13 @@ from datetime import datetime, timedelta, timezone from timeit import timeit -from frequenz.quantities import Quantity - -from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._resampling import ( - ResamplerConfig, - SourceProperties, - _ResamplingHelper, -) +from frequenz.sdk.timeseries import ResamplerConfig +from frequenz.sdk.timeseries._resampling._base_types import SourceProperties +from frequenz.sdk.timeseries._resampling._resampler import _ResamplingHelper def nop( # pylint: disable=unused-argument - samples: Sequence[Sample[Quantity]], + samples: Sequence[tuple[datetime, float]], resampler_config: ResamplerConfig, source_properties: SourceProperties, ) -> float: @@ -43,10 +38,11 @@ def _benchmark_resampling_helper(resamples: int, samples: int) -> None: def _do_work() -> None: nonlocal now + delta = timedelta(seconds=1 / samples) for _n_resample in range(resamples): for _n_sample in range(samples): - now = now + timedelta(seconds=1 / samples) - helper.add_sample(Sample(now, Quantity(0.0))) + now = now + delta + helper.add_sample((now, 0.0)) helper.resample(now) print(timeit(_do_work, number=5)) diff --git a/examples/battery_pool.py b/examples/battery_pool.py index d2f856442..405b658ec 100644 --- a/examples/battery_pool.py +++ b/examples/battery_pool.py @@ -33,9 +33,8 @@ async def main() -> None: receivers = [ battery_pool.soc.new_receiver(limit=1), battery_pool.capacity.new_receiver(limit=1), - # pylint: disable=protected-access + # pylint: disable-next=protected-access battery_pool._system_power_bounds.new_receiver(limit=1), - # pylint: enable=protected-access ] async for metric in merge(*receivers): diff --git a/mkdocs.yml b/mkdocs.yml index d7a711a50..2c2937232 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -117,8 +117,10 @@ plugins: import: # See https://mkdocstrings.github.io/python/usage/#import for details - https://docs.python.org/3/objects.inv - - https://frequenz-floss.github.io/frequenz-channels-python/v1.1/objects.inv - - https://frequenz-floss.github.io/frequenz-client-microgrid-python/v0.7/objects.inv + - https://frequenz-floss.github.io/frequenz-channels-python/v1/objects.inv + - https://frequenz-floss.github.io/frequenz-client-common-python/v0.3/objects.inv + - https://frequenz-floss.github.io/frequenz-client-microgrid-python/v0.9/objects.inv + - https://frequenz-floss.github.io/frequenz-core-python/v1/objects.inv - https://frequenz-floss.github.io/frequenz-quantities-python/v1/objects.inv - https://lovasoa.github.io/marshmallow_dataclass/html/objects.inv - https://marshmallow.readthedocs.io/en/stable/objects.inv diff --git a/src/frequenz/sdk/actor/__init__.py b/src/frequenz/sdk/actor/__init__.py index d0b00f323..19615aa04 100644 --- a/src/frequenz/sdk/actor/__init__.py +++ b/src/frequenz/sdk/actor/__init__.py @@ -597,7 +597,7 @@ async def main() -> None: # (6)! [_run]: #the-_run-method """ -from ..timeseries._resampling import ResamplerConfig +from ..timeseries._resampling._config import ResamplerConfig from ._actor import Actor from ._background_service import BackgroundService from ._run_utils import run diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_battery_status_tracker.py b/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_battery_status_tracker.py index ec3d4b992..77ea251c6 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_battery_status_tracker.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_battery_status_tracker.py @@ -394,8 +394,8 @@ def _is_inverter_state_correct(self, msg: InverterData) -> bool: True if inverter is in correct state. False otherwise. """ # Component state is not exposed to the user. - # pylint: disable=protected-access state = msg.component_state + # pylint: disable-next=protected-access if state not in BatteryStatusTracker._inverter_valid_state: if self._last_status == ComponentStatusEnum.WORKING: _logger.warning( @@ -404,7 +404,6 @@ def _is_inverter_state_correct(self, msg: InverterData) -> bool: state.name, ) return False - # pylint: enable=protected-access return True def _is_battery_state_correct(self, msg: BatteryData) -> bool: @@ -417,8 +416,8 @@ def _is_battery_state_correct(self, msg: BatteryData) -> bool: True if battery is in correct state. False otherwise. """ # Component state is not exposed to the user. - # pylint: disable=protected-access state = msg.component_state + # pylint: disable-next=protected-access if state not in BatteryStatusTracker._battery_valid_state: if self._last_status == ComponentStatusEnum.WORKING: _logger.warning( @@ -439,7 +438,6 @@ def _is_battery_state_correct(self, msg: BatteryData) -> bool: ) return False return True - # pylint: enable=protected-access def _is_timestamp_outdated(self, timestamp: datetime) -> bool: """Return if timestamp is to old. diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_distribution_algorithm/_battery_distribution_algorithm.py b/src/frequenz/sdk/microgrid/_power_distributing/_distribution_algorithm/_battery_distribution_algorithm.py index 918af5a9d..5a8568163 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/_distribution_algorithm/_battery_distribution_algorithm.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/_distribution_algorithm/_battery_distribution_algorithm.py @@ -461,7 +461,8 @@ def _compute_battery_availability_ratio( return battery_availability_ratio, total_battery_availability_ratio - def _distribute_power( # pylint: disable=too-many-arguments + # pylint: disable-next=too-many-arguments,too-many-locals,too-many-branches,too-many-statements + def _distribute_power( self, *, components: list[InvBatPair], @@ -470,7 +471,6 @@ def _distribute_power( # pylint: disable=too-many-arguments incl_bounds: dict[ComponentId, Power], excl_bounds: dict[ComponentId, Power], ) -> DistributionResult: - # pylint: disable=too-many-locals,too-many-branches,too-many-statements """Distribute power between given components. After this method power should be distributed between batteries diff --git a/src/frequenz/sdk/microgrid/_power_distributing/power_distributing.py b/src/frequenz/sdk/microgrid/_power_distributing/power_distributing.py index 6653e2fec..30a624cd2 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/power_distributing.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/power_distributing.py @@ -33,8 +33,7 @@ _logger = logging.getLogger(__name__) -class PowerDistributingActor(Actor): - # pylint: disable=too-many-instance-attributes +class PowerDistributingActor(Actor): # pylint: disable=too-many-instance-attributes """Actor to distribute the power between components in a microgrid. One instance of the actor can handle only one component category and type, diff --git a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py index 05a9d4e42..1246bd6c4 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py @@ -158,16 +158,17 @@ def _add_system_bounds_tracker(self, component_ids: frozenset[ComponentId]) -> N NotImplementedError: When the pool type is not supported. """ bounds_receiver: Receiver[SystemBounds] - # pylint: disable=protected-access if self._component_category is ComponentCategory.BATTERY: battery_pool = _data_pipeline.new_battery_pool( priority=-sys.maxsize - 1, component_ids=component_ids ) + # pylint: disable-next=protected-access bounds_receiver = battery_pool._system_power_bounds.new_receiver() elif self._component_category is ComponentCategory.EV_CHARGER: ev_charger_pool = _data_pipeline.new_ev_charger_pool( priority=-sys.maxsize - 1, component_ids=component_ids ) + # pylint: disable-next=protected-access bounds_receiver = ev_charger_pool._system_power_bounds.new_receiver() elif ( self._component_category is ComponentCategory.INVERTER @@ -176,8 +177,8 @@ def _add_system_bounds_tracker(self, component_ids: frozenset[ComponentId]) -> N pv_pool = _data_pipeline.new_pv_pool( priority=-sys.maxsize - 1, component_ids=component_ids ) + # pylint: disable-next=protected-access bounds_receiver = pv_pool._system_power_bounds.new_receiver() - # pylint: enable=protected-access else: err = ( "PowerManagingActor: Unsupported component category: " diff --git a/src/frequenz/sdk/microgrid/_power_wrapper.py b/src/frequenz/sdk/microgrid/_power_wrapper.py index be9a97d4b..7dcc2c4a0 100644 --- a/src/frequenz/sdk/microgrid/_power_wrapper.py +++ b/src/frequenz/sdk/microgrid/_power_wrapper.py @@ -9,14 +9,16 @@ from datetime import timedelta from frequenz.channels import Broadcast - -# pylint seems to think this is a cyclic import, but it is not. -# -# pylint: disable=cyclic-import from frequenz.client.microgrid import ComponentCategory, ComponentType from .._internal._channels import ChannelRegistry, ReceiverFetcher + +# pylint seems to think this is a cyclic import, but it is not. +# +# pylint: disable-next=cyclic-import from . import _power_managing, connection_manager + +# pylint: disable-next=cyclic-import from ._power_distributing import ( ComponentPoolStatus, PowerDistributingActor, diff --git a/src/frequenz/sdk/microgrid/_resampling.py b/src/frequenz/sdk/microgrid/_resampling.py index e16ca8d01..b5c554513 100644 --- a/src/frequenz/sdk/microgrid/_resampling.py +++ b/src/frequenz/sdk/microgrid/_resampling.py @@ -13,9 +13,11 @@ from .._internal._asyncio import cancel_and_await from .._internal._channels import ChannelRegistry -from ..actor import Actor +from ..actor._actor import Actor from ..timeseries import Sample -from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError +from ..timeseries._resampling._config import ResamplerConfig +from ..timeseries._resampling._exceptions import ResamplingError +from ..timeseries._resampling._resampler import Resampler from ._data_sourcing import ComponentMetricRequest _logger = logging.getLogger(__name__) diff --git a/src/frequenz/sdk/timeseries/__init__.py b/src/frequenz/sdk/timeseries/__init__.py index 99d30c746..97af5a478 100644 --- a/src/frequenz/sdk/timeseries/__init__.py +++ b/src/frequenz/sdk/timeseries/__init__.py @@ -9,8 +9,8 @@ # Periodicity and alignment -All the data produced by this package is always periodic and aligned to the -`UNIX_EPOCH` (by default). +All the data produced by this package is always periodic, in UTC, and aligned to the +[Epoch](https://en.wikipedia.org/wiki/Epoch_(computing)) (by default). Classes normally take a (re)sampling period as and argument and, optionally, an `align_to` argument. @@ -36,11 +36,12 @@ """ from .._internal._channels import ReceiverFetcher -from ._base_types import UNIX_EPOCH, Bounds, Sample, Sample3Phase +from ._base_types import Bounds, Sample, Sample3Phase from ._fuse import Fuse from ._moving_window import MovingWindow from ._periodic_feature_extractor import PeriodicFeatureExtractor -from ._resampling import ResamplerConfig +from ._resampling._base_types import SourceProperties +from ._resampling._config import ResamplerConfig, ResamplingFunction __all__ = [ "Bounds", @@ -49,7 +50,8 @@ "PeriodicFeatureExtractor", "ResamplerConfig", "ReceiverFetcher", + "ResamplingFunction", "Sample", "Sample3Phase", - "UNIX_EPOCH", + "SourceProperties", ] diff --git a/src/frequenz/sdk/timeseries/_base_types.py b/src/frequenz/sdk/timeseries/_base_types.py index 4960a09e1..07383981c 100644 --- a/src/frequenz/sdk/timeseries/_base_types.py +++ b/src/frequenz/sdk/timeseries/_base_types.py @@ -7,14 +7,11 @@ import functools from collections.abc import Callable, Iterator from dataclasses import dataclass -from datetime import datetime, timezone +from datetime import datetime from typing import Any, Generic, Protocol, Self, TypeVar, cast, overload from frequenz.quantities import Power, Quantity -UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc) -"""The UNIX epoch (in UTC).""" - QuantityT = TypeVar("QuantityT", bound=Quantity) """Type variable for representing various quantity types.""" diff --git a/src/frequenz/sdk/timeseries/_moving_window.py b/src/frequenz/sdk/timeseries/_moving_window.py index e5e31f0c7..1e7eb220d 100644 --- a/src/frequenz/sdk/timeseries/_moving_window.py +++ b/src/frequenz/sdk/timeseries/_moving_window.py @@ -13,12 +13,14 @@ import numpy as np from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.core.datetime import UNIX_EPOCH from frequenz.quantities import Quantity from numpy.typing import ArrayLike from ..actor._background_service import BackgroundService -from ._base_types import UNIX_EPOCH, Sample -from ._resampling import Resampler, ResamplerConfig +from ._base_types import Sample +from ._resampling._config import ResamplerConfig +from ._resampling._resampler import Resampler from ._ringbuffer import OrderedRingBuffer _logger = logging.getLogger(__name__) diff --git a/src/frequenz/sdk/timeseries/_resampling/__init__.py b/src/frequenz/sdk/timeseries/_resampling/__init__.py new file mode 100644 index 000000000..bf233f1ab --- /dev/null +++ b/src/frequenz/sdk/timeseries/_resampling/__init__.py @@ -0,0 +1,4 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Timeseries resampling package.""" diff --git a/src/frequenz/sdk/timeseries/_resampling/_base_types.py b/src/frequenz/sdk/timeseries/_resampling/_base_types.py new file mode 100644 index 000000000..ac7791098 --- /dev/null +++ b/src/frequenz/sdk/timeseries/_resampling/_base_types.py @@ -0,0 +1,58 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Resampler base types.""" + +from collections.abc import AsyncIterator, Callable, Coroutine +from dataclasses import dataclass +from datetime import datetime, timedelta + +from frequenz.quantities import Quantity + +from .._base_types import Sample + +Source = AsyncIterator[Sample[Quantity]] +"""A source for a timeseries. + +A timeseries can be received sample by sample in a streaming way +using a source. +""" + +Sink = Callable[[Sample[Quantity]], Coroutine[None, None, None]] +"""A sink for a timeseries. + +A new timeseries can be generated by sending samples to a sink. + +This should be an `async` callable, for example: + +```python +async some_sink(Sample) -> None: + ... +``` + +Args: + sample (Sample): A sample to be sent out. +""" + + +@dataclass +class SourceProperties: + """Properties of a resampling source.""" + + sampling_start: datetime | None = None + """The time when resampling started for this source. + + `None` means it didn't started yet. + """ + + received_samples: int = 0 + """Total samples received by this source so far.""" + + sampling_period: timedelta | None = None + """The sampling period of this source. + + This means we receive (on average) one sample for this source every + `sampling_period` time. + + `None` means it is unknown. + """ diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py new file mode 100644 index 000000000..9be665c6e --- /dev/null +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -0,0 +1,212 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Resampler configuration.""" + +from __future__ import annotations + +import logging +import statistics +from collections.abc import Sequence +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Protocol + +from frequenz.core.datetime import UNIX_EPOCH + +from ._base_types import SourceProperties + +_logger = logging.getLogger(__name__) + + +DEFAULT_BUFFER_LEN_INIT = 16 +"""Default initial buffer length. + +Buffers will be created initially with this length, but they could grow or +shrink depending on the source properties, like sampling rate, to make +sure all the requested past sampling periods can be stored. +""" + + +DEFAULT_BUFFER_LEN_MAX = 1024 +"""Default maximum allowed buffer length. + +If a buffer length would get bigger than this, it will be truncated to this +length. +""" + + +DEFAULT_BUFFER_LEN_WARN = 128 +"""Default minimum buffer length that will produce a warning. + +If a buffer length would get bigger than this, a warning will be logged. +""" + + +class ResamplingFunction(Protocol): + """Combine multiple samples into a new one. + + A resampling function produces a new sample based on a list of pre-existing + samples. It can do "upsampling" when the data rate of the `input_samples` + period is smaller than the `resampling_period`, or "downsampling" if it is + bigger. + + In general, a resampling window is the same as the `resampling_period`, and + this function might receive input samples from multiple windows in the past to + enable extrapolation, but no samples from the future (so the timestamp of the + new sample that is going to be produced will always be bigger than the biggest + timestamp in the input data). + """ + + def __call__( + self, + input_samples: Sequence[tuple[datetime, float]], + resampler_config: ResamplerConfig, + source_properties: SourceProperties, + /, + ) -> float: + """Call the resampling function. + + Args: + input_samples: The sequence of pre-existing samples, where the first item is + the timestamp of the sample, and the second is the value of the sample. + The sequence must be non-empty. + resampler_config: The configuration of the resampler calling this + function. + source_properties: The properties of the source being resampled. + + Returns: + The value of new sample produced after the resampling. + """ + ... # pylint: disable=unnecessary-ellipsis + + +@dataclass(frozen=True) +class ResamplerConfig: + """Resampler configuration.""" + + resampling_period: timedelta + """The resampling period. + + This is the time it passes between resampled data should be calculated. + + It must be a positive time span. + """ + + max_data_age_in_periods: float = 3.0 + """The maximum age a sample can have to be considered *relevant* for resampling. + + Expressed in number of periods, where period is the `resampling_period` + if we are downsampling (resampling period bigger than the input period) or + the *input sampling period* if we are upsampling (input period bigger than + the resampling period). + + It must be bigger than 1.0. + + Example: + If `resampling_period` is 3 seconds, the input sampling period is + 1 and `max_data_age_in_periods` is 2, then data older than 3*2 + = 6 seconds will be discarded when creating a new sample and never + passed to the resampling function. + + If `resampling_period` is 3 seconds, the input sampling period is + 5 and `max_data_age_in_periods` is 2, then data older than 5*2 + = 10 seconds will be discarded when creating a new sample and never + passed to the resampling function. + """ + + resampling_function: ResamplingFunction = lambda samples, _, __: statistics.fmean( + s[1] for s in samples + ) + """The resampling function. + + This function will be applied to the sequence of relevant samples at + a given time. The result of the function is what is sent as the resampled + value. + """ + + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT + """The initial length of the resampling buffer. + + The buffer could grow or shrink depending on the source properties, + like sampling rate, to make sure all the requested past sampling periods + can be stored. + + It must be at least 1 and at most `max_buffer_len`. + """ + + warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN + """The minimum length of the resampling buffer that will emit a warning. + + If a buffer grows bigger than this value, it will emit a warning in the + logs, so buffers don't grow too big inadvertently. + + It must be at least 1 and at most `max_buffer_len`. + """ + + max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX + """The maximum length of the resampling buffer. + + Buffers won't be allowed to grow beyond this point even if it would be + needed to keep all the requested past sampling periods. An error will be + emitted in the logs if the buffer length needs to be truncated to this + value. + + It must be at bigger than `warn_buffer_len`. + """ + + align_to: datetime | None = UNIX_EPOCH + """The time to align the resampling period to. + + The resampling period will be aligned to this time, so the first resampled + sample will be at the first multiple of `resampling_period` starting from + `align_to`. It must be an aware datetime and can be in the future too. + + If `align_to` is `None`, the resampling period will be aligned to the + time the resampler is created. + """ + + def __post_init__(self) -> None: + """Check that config values are valid. + + Raises: + ValueError: If any value is out of range. + """ + if self.resampling_period.total_seconds() < 0.0: + raise ValueError( + f"resampling_period ({self.resampling_period}) must be positive" + ) + if self.max_data_age_in_periods < 1.0: + raise ValueError( + f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0" + ) + if self.warn_buffer_len < 1: + raise ValueError( + f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1" + ) + if self.max_buffer_len <= self.warn_buffer_len: + raise ValueError( + f"max_buffer_len ({self.max_buffer_len}) should " + f"be bigger than warn_buffer_len ({self.warn_buffer_len})" + ) + + if self.initial_buffer_len < 1: + raise ValueError( + f"initial_buffer_len ({self.initial_buffer_len}) should at least 1" + ) + if self.initial_buffer_len > self.max_buffer_len: + raise ValueError( + f"initial_buffer_len ({self.initial_buffer_len}) is bigger " + f"than max_buffer_len ({self.max_buffer_len}), use a smaller " + "initial_buffer_len or a bigger max_buffer_len" + ) + if self.initial_buffer_len > self.warn_buffer_len: + _logger.warning( + "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)", + self.initial_buffer_len, + self.warn_buffer_len, + ) + if self.align_to is not None and self.align_to.tzinfo is None: + raise ValueError( + f"align_to ({self.align_to}) should be a timezone aware datetime" + ) diff --git a/src/frequenz/sdk/timeseries/_resampling/_exceptions.py b/src/frequenz/sdk/timeseries/_resampling/_exceptions.py new file mode 100644 index 000000000..ee8b291b8 --- /dev/null +++ b/src/frequenz/sdk/timeseries/_resampling/_exceptions.py @@ -0,0 +1,74 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +"""Resampler exceptions.""" + +import asyncio + +from ._base_types import Source + + +class SourceStoppedError(RuntimeError): + """A timeseries stopped producing samples.""" + + def __init__(self, source: Source) -> None: + """Create an instance. + + Args: + source: The source of the timeseries that stopped producing samples. + """ + super().__init__(f"Timeseries stopped producing samples, source: {source}") + self.source = source + """The source of the timeseries that stopped producing samples.""" + + def __repr__(self) -> str: + """Return the representation of the instance. + + Returns: + The representation of the instance. + """ + return f"{self.__class__.__name__}({self.source!r})" + + +class ResamplingError(RuntimeError): + """An Error occurred while resampling. + + This error is a container for errors raised by the underlying sources and + or sinks. + """ + + def __init__( + self, + exceptions: dict[Source, Exception | asyncio.CancelledError], + ) -> None: + """Create an instance. + + Args: + exceptions: A mapping of timeseries source and the exception + encountered while resampling that timeseries. Note that the + error could be raised by the sink, while trying to send + a resampled data for this timeseries, the source key is only + used to identify the timeseries with the issue, it doesn't + necessarily mean that the error was raised by the source. The + underlying exception should provide information about what was + the actual source of the exception. + """ + super().__init__(f"Some error were found while resampling: {exceptions}") + self.exceptions = exceptions + """A mapping of timeseries source and the exception encountered. + + Note that the error could be raised by the sink, while trying to send + a resampled data for this timeseries, the source key is only used to + identify the timeseries with the issue, it doesn't necessarily mean + that the error was raised by the source. The underlying exception + should provide information about what was the actual source of the + exception. + """ + + def __repr__(self) -> str: + """Return the representation of the instance. + + Returns: + The representation of the instance. + """ + return f"{self.__class__.__name__}({self.exceptions=})" diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py similarity index 62% rename from src/frequenz/sdk/timeseries/_resampling.py rename to src/frequenz/sdk/timeseries/_resampling/_resampler.py index b2307a926..f779f8ed3 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -11,337 +11,20 @@ import math from bisect import bisect from collections import deque -from collections.abc import AsyncIterator, Callable, Coroutine, Sequence -from dataclasses import dataclass from datetime import datetime, timedelta, timezone from frequenz.channels.timer import Timer, TriggerAllMissed, _to_microseconds from frequenz.quantities import Quantity -from .._internal._asyncio import cancel_and_await -from ._base_types import UNIX_EPOCH, QuantityT, Sample +from ..._internal._asyncio import cancel_and_await +from .._base_types import Sample +from ._base_types import Sink, Source, SourceProperties +from ._config import ResamplerConfig +from ._exceptions import ResamplingError, SourceStoppedError _logger = logging.getLogger(__name__) -DEFAULT_BUFFER_LEN_INIT = 16 -"""Default initial buffer length. - -Buffers will be created initially with this length, but they could grow or -shrink depending on the source properties, like sampling rate, to make -sure all the requested past sampling periods can be stored. -""" - - -DEFAULT_BUFFER_LEN_MAX = 1024 -"""Default maximum allowed buffer length. - -If a buffer length would get bigger than this, it will be truncated to this -length. -""" - - -DEFAULT_BUFFER_LEN_WARN = 128 -"""Default minimum buffer length that will produce a warning. - -If a buffer length would get bigger than this, a warning will be logged. -""" - - -Source = AsyncIterator[Sample[Quantity]] -"""A source for a timeseries. - -A timeseries can be received sample by sample in a streaming way -using a source. -""" - -Sink = Callable[[Sample[Quantity]], Coroutine[None, None, None]] -"""A sink for a timeseries. - -A new timeseries can be generated by sending samples to a sink. - -This should be an `async` callable, for example: - -```python -async some_sink(Sample) -> None: - ... -``` - -Args: - sample (Sample): A sample to be sent out. -""" - - -ResamplingFunction = Callable[ - [Sequence[Sample[Quantity]], "ResamplerConfig", "SourceProperties"], float -] -"""Resampling function type. - -A resampling function produces a new sample based on a list of pre-existing -samples. It can do "upsampling" when the data rate of the `input_samples` -period is smaller than the `resampling_period`, or "downsampling" if it is -bigger. - -In general a resampling window is the same as the `resampling_period`, and -this function might receive input samples from multiple windows in the past to -enable extrapolation, but no samples from the future (so the timestamp of the -new sample that is going to be produced will always be bigger than the biggest -timestamp in the input data). - -Args: - input_samples (Sequence[Sample]): The sequence of pre-existing samples. - resampler_config (ResamplerConfig): The configuration of the resampling - calling this function. - source_properties (SourceProperties): The properties of the source being - resampled. - -Returns: - new_sample (float): The value of new sample produced after the resampling. -""" - - -# pylint: disable=unused-argument -def average( - samples: Sequence[Sample[QuantityT]], - resampler_config: ResamplerConfig, - source_properties: SourceProperties, -) -> float: - """Calculate average of all the provided values. - - Args: - samples: The samples to apply the average to. It must be non-empty. - resampler_config: The configuration of the resampler calling this - function. - source_properties: The properties of the source being resampled. - - Returns: - The average of all `samples` values. - """ - assert len(samples) > 0, "Average cannot be given an empty list of samples" - values = list( - sample.value.base_value for sample in samples if sample.value is not None - ) - return sum(values) / len(values) - - -@dataclass(frozen=True) -class ResamplerConfig: - """Resampler configuration.""" - - resampling_period: timedelta - """The resampling period. - - This is the time it passes between resampled data should be calculated. - - It must be a positive time span. - """ - - max_data_age_in_periods: float = 3.0 - """The maximum age a sample can have to be considered *relevant* for resampling. - - Expressed in number of periods, where period is the `resampling_period` - if we are downsampling (resampling period bigger than the input period) or - the *input sampling period* if we are upsampling (input period bigger than - the resampling period). - - It must be bigger than 1.0. - - Example: - If `resampling_period` is 3 seconds, the input sampling period is - 1 and `max_data_age_in_periods` is 2, then data older than 3*2 - = 6 seconds will be discarded when creating a new sample and never - passed to the resampling function. - - If `resampling_period` is 3 seconds, the input sampling period is - 5 and `max_data_age_in_periods` is 2, then data older than 5*2 - = 10 seconds will be discarded when creating a new sample and never - passed to the resampling function. - """ - - resampling_function: ResamplingFunction = average - """The resampling function. - - This function will be applied to the sequence of relevant samples at - a given time. The result of the function is what is sent as the resampled - value. - """ - - initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT - """The initial length of the resampling buffer. - - The buffer could grow or shrink depending on the source properties, - like sampling rate, to make sure all the requested past sampling periods - can be stored. - - It must be at least 1 and at most `max_buffer_len`. - """ - - warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN - """The minimum length of the resampling buffer that will emit a warning. - - If a buffer grows bigger than this value, it will emit a warning in the - logs, so buffers don't grow too big inadvertently. - - It must be at least 1 and at most `max_buffer_len`. - """ - - max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX - """The maximum length of the resampling buffer. - - Buffers won't be allowed to grow beyond this point even if it would be - needed to keep all the requested past sampling periods. An error will be - emitted in the logs if the buffer length needs to be truncated to this - value. - - It must be at bigger than `warn_buffer_len`. - """ - - align_to: datetime | None = UNIX_EPOCH - """The time to align the resampling period to. - - The resampling period will be aligned to this time, so the first resampled - sample will be at the first multiple of `resampling_period` starting from - `align_to`. It must be an aware datetime and can be in the future too. - - If `align_to` is `None`, the resampling period will be aligned to the - time the resampler is created. - """ - - def __post_init__(self) -> None: - """Check that config values are valid. - - Raises: - ValueError: If any value is out of range. - """ - if self.resampling_period.total_seconds() < 0.0: - raise ValueError( - f"resampling_period ({self.resampling_period}) must be positive" - ) - if self.max_data_age_in_periods < 1.0: - raise ValueError( - f"max_data_age_in_periods ({self.max_data_age_in_periods}) should be at least 1.0" - ) - if self.warn_buffer_len < 1: - raise ValueError( - f"warn_buffer_len ({self.warn_buffer_len}) should be at least 1" - ) - if self.max_buffer_len <= self.warn_buffer_len: - raise ValueError( - f"max_buffer_len ({self.max_buffer_len}) should " - f"be bigger than warn_buffer_len ({self.warn_buffer_len})" - ) - - if self.initial_buffer_len < 1: - raise ValueError( - f"initial_buffer_len ({self.initial_buffer_len}) should at least 1" - ) - if self.initial_buffer_len > self.max_buffer_len: - raise ValueError( - f"initial_buffer_len ({self.initial_buffer_len}) is bigger " - f"than max_buffer_len ({self.max_buffer_len}), use a smaller " - "initial_buffer_len or a bigger max_buffer_len" - ) - if self.initial_buffer_len > self.warn_buffer_len: - _logger.warning( - "initial_buffer_len (%s) is bigger than warn_buffer_len (%s)", - self.initial_buffer_len, - self.warn_buffer_len, - ) - if self.align_to is not None and self.align_to.tzinfo is None: - raise ValueError( - f"align_to ({self.align_to}) should be a timezone aware datetime" - ) - - -class SourceStoppedError(RuntimeError): - """A timeseries stopped producing samples.""" - - def __init__(self, source: Source) -> None: - """Create an instance. - - Args: - source: The source of the timeseries that stopped producing samples. - """ - super().__init__(f"Timeseries stopped producing samples, source: {source}") - self.source = source - """The source of the timeseries that stopped producing samples.""" - - def __repr__(self) -> str: - """Return the representation of the instance. - - Returns: - The representation of the instance. - """ - return f"{self.__class__.__name__}({self.source!r})" - - -class ResamplingError(RuntimeError): - """An Error ocurred while resampling. - - This error is a container for errors raised by the underlying sources and - or sinks. - """ - - def __init__( - self, - exceptions: dict[Source, Exception | asyncio.CancelledError], - ) -> None: - """Create an instance. - - Args: - exceptions: A mapping of timeseries source and the exception - encountered while resampling that timeseries. Note that the - error could be raised by the sink, while trying to send - a resampled data for this timeseries, the source key is only - used to identify the timeseries with the issue, it doesn't - necessarily mean that the error was raised by the source. The - underlying exception should provide information about what was - the actual source of the exception. - """ - super().__init__(f"Some error were found while resampling: {exceptions}") - self.exceptions = exceptions - """A mapping of timeseries source and the exception encountered. - - Note that the error could be raised by the sink, while trying to send - a resampled data for this timeseries, the source key is only used to - identify the timeseries with the issue, it doesn't necessarily mean - that the error was raised by the source. The underlying exception - should provide information about what was the actual source of the - exception. - """ - - def __repr__(self) -> str: - """Return the representation of the instance. - - Returns: - The representation of the instance. - """ - return f"{self.__class__.__name__}({self.exceptions=})" - - -@dataclass -class SourceProperties: - """Properties of a resampling source.""" - - sampling_start: datetime | None = None - """The time when resampling started for this source. - - `None` means it didn't started yet. - """ - - received_samples: int = 0 - """Total samples received by this source so far.""" - - sampling_period: timedelta | None = None - """The sampling period of this source. - - This means we receive (on average) one sample for this source every - `sampling_period` time. - - `None` means it is unknown. - """ - - class Resampler: """A timeseries resampler. @@ -575,7 +258,9 @@ def __init__(self, name: str, config: ResamplerConfig) -> None: """ self._name = name self._config = config - self._buffer: deque[Sample[Quantity]] = deque(maxlen=config.initial_buffer_len) + self._buffer: deque[tuple[datetime, float]] = deque( + maxlen=config.initial_buffer_len + ) self._source_properties: SourceProperties = SourceProperties() @property @@ -587,7 +272,7 @@ def source_properties(self) -> SourceProperties: """ return self._source_properties - def add_sample(self, sample: Sample[Quantity]) -> None: + def add_sample(self, sample: tuple[datetime, float]) -> None: """Add a new sample to the internal buffer. Args: @@ -595,7 +280,7 @@ def add_sample(self, sample: Sample[Quantity]) -> None: """ self._buffer.append(sample) if self._source_properties.sampling_start is None: - self._source_properties.sampling_start = sample.timestamp + self._source_properties.sampling_start = sample[0] self._source_properties.received_samples += 1 def _update_source_sample_period(self, now: datetime) -> bool: @@ -733,9 +418,9 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: min_index = bisect( self._buffer, minimum_relevant_timestamp, - key=lambda s: s.timestamp, + key=lambda s: s[0], ) - max_index = bisect(self._buffer, timestamp, key=lambda s: s.timestamp) + max_index = bisect(self._buffer, timestamp, key=lambda s: s[0]) # Using itertools for slicing doesn't look very efficient, but # experiments with a custom (ring) buffer that can slice showed that # it is not that bad. See: @@ -767,8 +452,8 @@ def _log_no_relevant_samples( if self._buffer: buffer_info = ( - f"{self._buffer[0].timestamp} - " - f"{self._buffer[-1].timestamp} ({len(self._buffer)} samples)" + f"{self._buffer[0][0]} - " + f"{self._buffer[-1][0]} ({len(self._buffer)} samples)" ) else: buffer_info = "Empty" @@ -826,7 +511,7 @@ async def _receive_samples(self) -> None: """ async for sample in self._source: if sample.value is not None and not sample.value.isnan(): - self._helper.add_sample(sample) + self._helper.add_sample((sample.timestamp, sample.value.base_value)) # We need the noqa because pydoclint can't figure out that `recv_exception` is an # `Exception` instance. diff --git a/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py b/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py index d4f9777b2..238b21245 100644 --- a/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py +++ b/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py @@ -11,8 +11,9 @@ import numpy as np import numpy.typing as npt +from frequenz.core.datetime import UNIX_EPOCH -from .._base_types import UNIX_EPOCH, QuantityT, Sample +from .._base_types import QuantityT, Sample FloatArray = TypeVar("FloatArray", list[float], npt.NDArray[np.float64]) """Type variable of the buffer container.""" diff --git a/src/frequenz/sdk/timeseries/battery_pool/_component_metric_fetcher.py b/src/frequenz/sdk/timeseries/battery_pool/_component_metric_fetcher.py index 676838f46..a6bba0c41 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_component_metric_fetcher.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_component_metric_fetcher.py @@ -104,15 +104,15 @@ async def async_new( """ self: Self = await super().async_new(component_id, metrics) + # pylint: disable=protected-access for metric in metrics: - # pylint: disable=protected-access if metric not in self._supported_metrics(): category = self._component_category() raise ValueError(f"Metric {metric} not supported for {category}") - # pylint: disable=protected-access self._receiver = await self._subscribe() self._max_waiting_time = MAX_BATTERY_DATA_AGE_SEC + # pylint: enable=protected-access return self async def fetch_next(self) -> ComponentMetricsData | None: diff --git a/tests/microgrid/test_datapipeline.py b/tests/microgrid/test_datapipeline.py index e9e688571..7a30163a6 100644 --- a/tests/microgrid/test_datapipeline.py +++ b/tests/microgrid/test_datapipeline.py @@ -19,7 +19,7 @@ from pytest_mock import MockerFixture from frequenz.sdk.microgrid._data_pipeline import _DataPipeline -from frequenz.sdk.timeseries._resampling import ResamplerConfig +from frequenz.sdk.timeseries import ResamplerConfig from ..utils.mock_microgrid_client import MockMicrogridClient diff --git a/tests/timeseries/test_moving_window.py b/tests/timeseries/test_moving_window.py index a1ddaf578..35687cac5 100644 --- a/tests/timeseries/test_moving_window.py +++ b/tests/timeseries/test_moving_window.py @@ -13,11 +13,14 @@ import pytest import time_machine from frequenz.channels import Broadcast, Sender +from frequenz.core.datetime import UNIX_EPOCH from frequenz.quantities import Quantity -from frequenz.sdk.timeseries import UNIX_EPOCH, Sample -from frequenz.sdk.timeseries._moving_window import MovingWindow -from frequenz.sdk.timeseries._resampling import ResamplerConfig +from frequenz.sdk.timeseries import ( + MovingWindow, + ResamplerConfig, + Sample, +) @pytest.fixture(autouse=True) diff --git a/tests/timeseries/test_periodic_feature_extractor.py b/tests/timeseries/test_periodic_feature_extractor.py index a61062ebb..2f29378ed 100644 --- a/tests/timeseries/test_periodic_feature_extractor.py +++ b/tests/timeseries/test_periodic_feature_extractor.py @@ -10,10 +10,10 @@ import numpy as np import pytest from frequenz.channels import Broadcast +from frequenz.core.datetime import UNIX_EPOCH from frequenz.quantities import Quantity from frequenz.sdk.timeseries import ( - UNIX_EPOCH, MovingWindow, PeriodicFeatureExtractor, Sample, diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 9d57ee458..dac74af86 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -17,19 +17,22 @@ from frequenz.quantities import Quantity from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._resampling import ( +from frequenz.sdk.timeseries._resampling._base_types import ( + Sink, + Source, + SourceProperties, +) +from frequenz.sdk.timeseries._resampling._config import ( DEFAULT_BUFFER_LEN_MAX, DEFAULT_BUFFER_LEN_WARN, - Resampler, ResamplerConfig, - ResamplingError, ResamplingFunction, - Sink, - Source, - SourceProperties, +) +from frequenz.sdk.timeseries._resampling._exceptions import ( + ResamplingError, SourceStoppedError, - _ResamplingHelper, ) +from frequenz.sdk.timeseries._resampling._resampler import Resampler, _ResamplingHelper from ..utils import a_sequence @@ -51,6 +54,12 @@ async def source_chan() -> AsyncIterator[Broadcast[Sample[Quantity]]]: await chan.close() +def as_float_tuple(sample: Sample[Quantity]) -> tuple[datetime, float]: + """Convert a sample to a tuple of datetime and float value.""" + assert sample.value is not None, "Sample value should not be None" + return (sample.timestamp, sample.value.base_value) + + async def _advance_time(fake_time: time_machine.Coordinates, seconds: float) -> None: """Advance the time by the given number of seconds. @@ -119,9 +128,11 @@ async def test_resampler_config_len_warn( ) assert config.initial_buffer_len == init_len # Ignore errors produced by wrongly finalized gRPC server in unrelated tests - assert _filter_logs(caplog.record_tuples) == [ + assert _filter_logs( + caplog.record_tuples, logger_name="frequenz.sdk.timeseries._resampling._config" + ) == [ ( - "frequenz.sdk.timeseries._resampling", + "frequenz.sdk.timeseries._resampling._config", logging.WARNING, f"initial_buffer_len ({init_len}) is bigger than " f"warn_buffer_len ({DEFAULT_BUFFER_LEN_WARN})", @@ -154,14 +165,14 @@ async def test_helper_buffer_too_big( helper = _ResamplingHelper("test", config) for i in range(DEFAULT_BUFFER_LEN_MAX + 1): - sample = Sample(datetime.now(timezone.utc), Quantity(i)) + sample = (datetime.now(timezone.utc), i) helper.add_sample(sample) await _advance_time(fake_time, 1) _ = helper.resample(datetime.now(timezone.utc)) # Ignore errors produced by wrongly finalized gRPC server in unrelated tests assert ( - "frequenz.sdk.timeseries._resampling", + "frequenz.sdk.timeseries._resampling._resampler", logging.ERROR, f"The new buffer length ({DEFAULT_BUFFER_LEN_MAX + 1}) " f"for timeseries test is too big, using {DEFAULT_BUFFER_LEN_MAX} instead", @@ -309,7 +320,7 @@ async def test_resampling_window_size_is_constant( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample1s), config, source_props + a_sequence(as_float_tuple(sample1s)), config, source_props ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -336,7 +347,13 @@ async def test_resampling_window_size_is_constant( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample2_5s, sample3s, sample4s), config, source_props + a_sequence( + as_float_tuple(sample2_5s), + as_float_tuple(sample3s), + as_float_tuple(sample4s), + ), + config, + source_props, ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -402,7 +419,12 @@ async def test_timer_errors_are_logged( # pylint: disable=too-many-statements ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample0s, sample1s), config, source_props + a_sequence( + as_float_tuple(sample0s), + as_float_tuple(sample1s), + ), + config, + source_props, ) assert not [ *_filter_logs( @@ -436,7 +458,12 @@ async def test_timer_errors_are_logged( # pylint: disable=too-many-statements ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample1s, sample2_5s, sample3s, sample4s), + a_sequence( + as_float_tuple(sample1s), + as_float_tuple(sample2_5s), + as_float_tuple(sample3s), + as_float_tuple(sample4s), + ), config, source_props, ) @@ -470,12 +497,18 @@ async def test_timer_errors_are_logged( # pylint: disable=too-many-statements ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample3s, sample4s, sample4_5s, sample5s, sample6s), + a_sequence( + as_float_tuple(sample3s), + as_float_tuple(sample4s), + as_float_tuple(sample4_5s), + as_float_tuple(sample5s), + as_float_tuple(sample6s), + ), config, source_props, ) assert ( - "frequenz.sdk.timeseries._resampling", + "frequenz.sdk.timeseries._resampling._resampler", logging.WARNING, "The resampling task woke up too late. Resampling should have started at " "1970-01-01 00:00:06+00:00, but it started at 1970-01-01 " @@ -541,7 +574,12 @@ async def test_future_samples_not_included( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample0s, sample1s), config, source_props # sample2_1s is not here + a_sequence( + as_float_tuple(sample0s), + as_float_tuple(sample1s), + ), + config, + source_props, # sample2_1s is not here ) assert source_props == SourceProperties( sampling_start=timestamp, received_samples=3, sampling_period=None @@ -566,7 +604,11 @@ async def test_future_samples_not_included( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample1s, sample2_1s, sample3s), + a_sequence( + as_float_tuple(sample1s), + as_float_tuple(sample2_1s), + as_float_tuple(sample3s), + ), config, source_props, # sample4_1s is not here ) @@ -624,7 +666,11 @@ async def test_resampling_with_one_window( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample1s), config, source_props + a_sequence( + as_float_tuple(sample1s), + ), + config, + source_props, ) assert source_props == SourceProperties( sampling_start=timestamp, received_samples=2, sampling_period=None @@ -651,7 +697,13 @@ async def test_resampling_with_one_window( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample2_5s, sample3s, sample4s), config, source_props + a_sequence( + as_float_tuple(sample2_5s), + as_float_tuple(sample3s), + as_float_tuple(sample4s), + ), + config, + source_props, ) # By now we have a full buffer (5 samples and a buffer of length 4), which # we received in 4 seconds, so we have an input period of 0.8s. @@ -738,7 +790,12 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample0s, sample1s), config, source_props + a_sequence( + as_float_tuple(sample0s), + as_float_tuple(sample1s), + ), + config, + source_props, ) assert source_props == SourceProperties( sampling_start=timestamp, received_samples=2, sampling_period=None @@ -766,7 +823,13 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma ) # It should include samples in the interval (1, 4] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample2_5s, sample3s, sample4s), config, source_props + a_sequence( + as_float_tuple(sample2_5s), + as_float_tuple(sample3s), + as_float_tuple(sample4s), + ), + config, + source_props, ) assert source_props == SourceProperties( sampling_start=timestamp, received_samples=5, sampling_period=None @@ -792,7 +855,13 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma ) # It should include samples in the interval (3, 6] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample4s, sample5s, sample6s), config, source_props + a_sequence( + as_float_tuple(sample4s), + as_float_tuple(sample5s), + as_float_tuple(sample6s), + ), + config, + source_props, ) # By now we have a full buffer (7 samples and a buffer of length 6), which # we received in 4 seconds, so we have an input period of 6/7s. @@ -821,7 +890,7 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma ) # It should include samples in the interval (5, 8] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample6s), + a_sequence(as_float_tuple(sample6s)), config, source_props, ) @@ -900,7 +969,12 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample0s, sample1s), config, source_props + a_sequence( + as_float_tuple(sample0s), + as_float_tuple(sample1s), + ), + config, + source_props, ) assert source_props == SourceProperties( sampling_start=timestamp, received_samples=2, sampling_period=None @@ -928,7 +1002,14 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen ) # It should include samples in the interval (0, 4] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample1s, sample2_5s, sample3s, sample4s), config, source_props + a_sequence( + as_float_tuple(sample1s), + as_float_tuple(sample2_5s), + as_float_tuple(sample3s), + as_float_tuple(sample4s), + ), + config, + source_props, ) assert source_props == SourceProperties( sampling_start=timestamp, received_samples=5, sampling_period=None @@ -954,7 +1035,13 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen ) # It should include samples in the interval (2, 6] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample2_5s, sample3s, sample4s, sample5s, sample6s), + a_sequence( + as_float_tuple(sample2_5s), + as_float_tuple(sample3s), + as_float_tuple(sample4s), + as_float_tuple(sample5s), + as_float_tuple(sample6s), + ), config, source_props, ) @@ -978,7 +1065,12 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen ) # It should include samples in the interval (4, 8] seconds resampling_fun_mock.assert_called_once_with( - a_sequence(sample5s, sample6s), config, source_props + a_sequence( + as_float_tuple(sample5s), + as_float_tuple(sample6s), + ), + config, + source_props, ) assert source_props == SourceProperties( sampling_start=timestamp, received_samples=7, sampling_period=None @@ -1043,7 +1135,7 @@ async def test_receiving_stopped_resampling_error( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample0s), config, source_props + a_sequence(as_float_tuple(sample0s)), config, source_props ) sink_mock.reset_mock() resampling_fun_mock.reset_mock() @@ -1178,7 +1270,13 @@ async def test_timer_is_aligned( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample1s, sample1_5s, sample2_5s, sample3s, sample4s), + a_sequence( + as_float_tuple(sample1s), + as_float_tuple(sample1_5s), + as_float_tuple(sample2_5s), + as_float_tuple(sample3s), + as_float_tuple(sample4s), + ), config, source_props, ) @@ -1244,7 +1342,7 @@ async def test_resampling_all_zeros( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample1s), config, source_props + a_sequence(as_float_tuple(sample1s)), config, source_props ) assert source_props == SourceProperties( sampling_start=timestamp, received_samples=2, sampling_period=None @@ -1271,7 +1369,13 @@ async def test_resampling_all_zeros( ) ) resampling_fun_mock.assert_called_once_with( - a_sequence(sample2_5s, sample3s, sample4s), config, source_props + a_sequence( + as_float_tuple(sample2_5s), + as_float_tuple(sample3s), + as_float_tuple(sample4s), + ), + config, + source_props, ) # By now we have a full buffer (5 samples and a buffer of length 4), which # we received in 4 seconds, so we have an input period of 0.8s. @@ -1313,7 +1417,7 @@ def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int: def _filter_logs( record_tuples: list[tuple[str, int, str]], *, - logger_name: str = "frequenz.sdk.timeseries._resampling", + logger_name: str = "frequenz.sdk.timeseries._resampling._resampler", logger_level: int | None = None, ) -> list[tuple[str, int, str]]: return [