@dataclass(frozen=True, kw_only=True)
class TimerInfo:
    """Information about a single tick produced by a `ResamplerTimer`."""

    expected_tick_time: datetime
    """The expected time when the timer should have triggered."""

    drift: timedelta
    """The difference between when the timer should have and did trigger."""


class ResamplerTimer(Receiver[TimerInfo]):
    """A timer to keep track of resampling windows.

    Ticks are scheduled on the wall clock (timezone-aware UTC datetimes), one
    `interval` apart. Each tick is delivered as a `TimerInfo` carrying the time
    the tick was scheduled for and how late it was actually detected.

    The schedule advances by exactly `interval` after every tick, regardless of
    how late the tick fired, so the expected tick times do not accumulate drift.
    """

    def __init__(
        self,
        interval: timedelta,
        *,
        align_to: datetime | None,
    ) -> None:
        """Initialize this timer and start it.

        Args:
            interval: The time between timer ticks. Must be at least
                1 microsecond.
            align_to: If not `None`, the first tick is scheduled so that its
                distance to this time is a multiple of `interval`. If `None`,
                the first tick is scheduled one `interval` from now. It must be
                timezone-aware, because it is subtracted from a timezone-aware
                UTC time.

        Raises:
            ValueError: If `interval` is smaller than 1 microsecond (this
                includes zero and negative intervals).
        """
        if interval < timedelta(microseconds=1):
            raise ValueError(
                f"The `interval` must be positive and at least 1 microsecond, not {interval}"
            )

        self._interval: timedelta = interval
        """The time between timer ticks."""

        self._stopped: bool = True
        """Whether the timer is stopped.

        If this is `False`, then the timer is running.

        If this is `True`, `stop()` was called (or `reset()` did not run yet
        during initialization). In this case `ready()` returns `False` and
        `consume()` raises a `ReceiverStoppedError` once there is no pending
        tick left to return.
        """

        self._next_tick_time: datetime | None = None
        """The absolute wall-clock (UTC) time when the timer should trigger next.

        This is only `None` until `reset()` assigns it, which `__init__` does
        right away.
        """

        self._current_info: TimerInfo | None = None
        """The information about the tick that is ready but not yet consumed.

        This is calculated by `ready()` but is returned by `consume()`. If
        `None` it means `ready()` wasn't called and `consume()` will assert.
        `consume()` will set it back to `None` to tell `ready()` that it needs
        to wait again.
        """

        self.reset(align_to=align_to)

    @property
    def interval(self) -> timedelta:
        """The interval between timer ticks."""
        return self._interval

    @property
    def is_running(self) -> bool:
        """Whether the timer is running."""
        return not self._stopped

    def reset(self, *, align_to: datetime | None = None) -> None:
        """Restart the timer, scheduling the next tick relative to now.

        If the timer was stopped, it will be started again. A tick that is
        ready but was not consumed yet is discarded.

        Args:
            align_to: If not `None`, the next tick is scheduled at the first
                time after now whose distance to `align_to` is a multiple of
                the interval (a full interval from now if now is already
                aligned). If `None`, the next tick is scheduled one interval
                from now. It must be timezone-aware.
        """
        self._stopped = False

        now = self._now()
        next_tick = now + self._interval

        if align_to is not None:
            # The modulo of two timedeltas has the sign of the (positive)
            # interval, so this pulls the tick back by less than one interval,
            # also when `align_to` is in the future.
            next_tick -= (now - align_to) % self._interval

        self._next_tick_time = next_tick
        self._current_info = None

    def stop(self) -> None:
        """Stop the timer.

        Once `stop` has been called, all subsequent calls to `ready()` will
        return `False` (unless a tick is still pending to be consumed) and
        calls to `consume()` will raise a `ReceiverStoppedError` once there is
        no pending tick left.

        You can restart the timer with `reset()`.
        """
        self._stopped = True
        # We need to make sure it's not None, otherwise `ready()` will start it
        self._next_tick_time = self._now()

    @override
    async def ready(self) -> bool:
        """Wait until the timer `interval` passed.

        Once a call to `ready()` has finished, the resulting tick information
        must be read with a call to `consume()` to tell the timer it should
        wait for the next interval.

        The timer will remain ready (this method will return immediately)
        until it is consumed.

        Returns:
            `True` if a tick is available to be consumed, `False` if the timer
            was stopped.
        """
        # If there is a tick waiting to be consumed, return immediately.
        if self._current_info is not None:
            return True

        # Defensive: `__init__` always calls `reset()`, so this is not expected
        # to be `None` here. Note that this starts the timer without alignment.
        if self._next_tick_time is None:
            self.reset()
            assert (
                self._next_tick_time is not None
            ), "This should be assigned by reset()"

        # If a stop was explicitly requested, we bail out.
        if self._stopped:
            return False

        now = self._now()
        time_to_next_tick = self._next_tick_time - now

        # If we didn't reach the tick yet, sleep until we do.
        # We need to do this in a loop because `_next_tick_time` can change
        # while we are sleeping (via `reset()` or `stop()`), in which case we
        # need to recalculate the time to the next tick and try again.
        # The wall clock can also be adjusted (for example by NTP) while we
        # sleep, so after waking up it could still be before the tick time.
        while time_to_next_tick > timedelta(0):
            await asyncio.sleep(time_to_next_tick.total_seconds())
            now = self._now()
            time_to_next_tick = self._next_tick_time - now

        # If a stop was explicitly requested during the sleep, we bail out.
        if self._stopped:
            return False

        self._current_info = TimerInfo(
            expected_tick_time=self._next_tick_time, drift=now - self._next_tick_time
        )
        # Advance from the expected time (not from `now`) so the schedule
        # doesn't accumulate drift.
        self._next_tick_time += self._interval

        return True

    @override
    def consume(self) -> TimerInfo:
        """Return the latest tick information once `ready()` is complete.

        Once the timer has triggered (`ready()` is done), this method returns
        the time at which the timer should have triggered together with the
        difference to the time when it actually triggered.

        Returns:
            The information about the tick that was made available by the last
            call to `ready()`.

        Raises:
            ReceiverStoppedError: If the timer was stopped via `stop()` and
                there is no pending tick to return.
        """
        # If it was stopped and there is no pending result, we raise
        # (if there is a pending result, then we still want to return it first)
        if self._stopped and self._current_info is None:
            raise ReceiverStoppedError(self)

        assert (
            self._current_info is not None
        ), "calls to `consume()` must be follow a call to `ready()`"
        info = self._current_info
        self._current_info = None
        return info

    def _now(self) -> datetime:
        """Return the current wall-clock time.

        Returns:
            The current time as a timezone-aware datetime in UTC.
        """
        return datetime.now(timezone.utc)

    def __str__(self) -> str:
        """Return a string representation of this timer."""
        return f"{type(self).__name__}({self.interval})"

    def __repr__(self) -> str:
        """Return a string representation of this timer."""
        return f"{type(self).__name__}<{self.interval=}, {self.is_running=}>"
- - Args: - extra_period: Add an extra period when it is not aligned to make sure we - collected enough samples before the first resampling, otherwise the - initial window to collect samples could be too small. - - Returns: - The end time of the resampling window. - """ - window_end, start_delay_time = self._calculate_window_end(extra_period) - - # Hack to align the timer, this should be implemented in the Timer class - self._timer._next_tick_time = ( # pylint: disable=protected-access - _to_microseconds( - timedelta(seconds=asyncio.get_running_loop().time()) - + self.config.resampling_period - + start_delay_time - ) - ) - - return window_end - async def resample(self, *, one_shot: bool = False) -> None: """Start resampling all known timeseries. @@ -500,35 +694,31 @@ async def resample(self, *, one_shot: bool = False) -> None: seconds=self._config.resampling_period.total_seconds() / 10.0 ) - async for drift in self._timer: - now = datetime.now(tz=timezone.utc) - - # If the system time changes, then `self._window_end` might drift to - # far away, such that the resampling bucket might be empty although - # new samples aligned to the new system time have been received. - # Thus we resync `self._window_end` to the new system time in case - # it drifted more then one resampling period away from the system time. - if abs(self._window_end - now) - drift > self._config.resampling_period: - self._window_end = self._sync_timer(extra_period=False) + async for timer_info in self._timer: + now = datetime.now(timezone.utc) - if drift > tolerance: + if timer_info.drift > tolerance: _logger.warning( "The resampling task woke up too late. 
Resampling should have " - "started at %s, but it started at %s (tolerance: %s, " - "difference: %s; resampling period: %s)", - self._window_end, + "started at %s, but it was triggered at %s and was processed at %s" + "(tolerance: %s, trigger drift: %s, processing drift: %s; " + "resampling period: %s)", + timer_info.expected_tick_time, + timer_info.expected_tick_time + timer_info.drift, now, tolerance, - drift, + timer_info.drift, + now - timer_info.expected_tick_time, self._config.resampling_period, ) results = await asyncio.gather( - *[r.resample(self._window_end) for r in self._resamplers.values()], + *[ + r.resample(timer_info.expected_tick_time) + for r in self._resamplers.values() + ], return_exceptions=True, ) - - self._window_end += self._config.resampling_period exceptions = { source: results[i] for i, source in enumerate(self._resamplers) @@ -541,48 +731,6 @@ async def resample(self, *, one_shot: bool = False) -> None: if one_shot: break - def _calculate_window_end( - self, extra_period: bool = True - ) -> tuple[datetime, timedelta]: - """Calculate the end of the current resampling window. - - The calculated resampling window end is a multiple of - `self._config.resampling_period` starting at `self._config.align_to`. - - if `self._config.align_to` is `None`, the current time is used. - - If the current time is not aligned to `self._config.resampling_period`, then - the end of the current resampling window will be more than one period away, to - make sure to have some time to collect samples if the misalignment is too big. - - Args: - extra_period: Add an extra period when it is not aligned to make sure we - collected enough samples before the first resampling, otherwise the - initial window to collect samples could be too small. - Returns: - A tuple with the end of the current resampling window aligned to - `self._config.align_to` as the first item and the time we need to - delay the timer to make sure it is also aligned. 
- """ - now = datetime.now(timezone.utc) - period = self._config.resampling_period - align_to = self._config.align_to - - if align_to is None: - return (now + period, timedelta(0)) - - elapsed = (now - align_to) % period - - # If we are already in sync, we don't need to add an extra period - if not elapsed: - return (now + period, timedelta(0)) - - extra_period_factor = 2 if extra_period else 1 - return ( - now + period * extra_period_factor - elapsed, - period - elapsed, - ) - class _ResamplingHelper: """Keeps track of *relevant* samples to pass them to the resampling function.