diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 9c0c6df9c..2a8fe2a3a 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -53,3 +53,4 @@ ## Bug Fixes - 0W power requests are now not adjusted to exclusion bounds by the `PowerManager` and `PowerDistributor`, and are sent over to the microgrid API directly. +- `timeseries.resampling` will now resync to the system time if it drifts away for more then a resample period. diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 9f5104d8a..9da5b76a2 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -370,8 +370,10 @@ def __init__(self, config: ResamplerConfig) -> None: self._resamplers: dict[Source, _StreamingHelper] = {} """A mapping between sources and the streaming helper handling that source.""" - window_end, start_delay_time = self._calculate_window_end() - self._window_end: datetime = window_end + self._timer: Timer = Timer.periodic(config.resampling_period) + """The timer used to trigger the resampling windows.""" + + self._window_end = self._sync_timer() """The time in which the current window ends. This is used to make sure every resampling window is generated at @@ -384,16 +386,6 @@ def __init__(self, config: ResamplerConfig) -> None: the window end is deterministic. """ - self._timer: Timer = Timer.periodic(config.resampling_period) - """The timer used to trigger the resampling windows.""" - - # Hack to align the timer, this should be implemented in the Timer class - self._timer._next_tick_time = _to_microseconds( - timedelta(seconds=asyncio.get_running_loop().time()) - + config.resampling_period - + start_delay_time - ) # pylint: disable=protected-access - @property def config(self) -> ResamplerConfig: """Get the resampler configuration. @@ -457,6 +449,34 @@ def remove_timeseries(self, source: Source) -> bool: return False return True + def _sync_timer(self, extra_period: bool = True) -> datetime: + """Resync the timer. + + This method will resync the timer to the current time, so the next + resampling window will start at the next multiple of + `self._config.resampling_period` starting from now. + + Args: + extra_period: Add an extra period when it is not aligned to make sure we + collected enough samples before the first resampling, otherwise the + initial window to collect samples could be too small. + + Returns: + The end time of the resampling window. + """ + window_end, start_delay_time = self._calculate_window_end(extra_period) + + # Hack to align the timer, this should be implemented in the Timer class + self._timer._next_tick_time = ( # pylint: disable=protected-access + _to_microseconds( + timedelta(seconds=asyncio.get_running_loop().time()) + + self.config.resampling_period + + start_delay_time + ) + ) + + return window_end + async def resample(self, *, one_shot: bool = False) -> None: """Start resampling all known timeseries. @@ -483,6 +503,14 @@ async def resample(self, *, one_shot: bool = False) -> None: async for drift in self._timer: now = datetime.now(tz=timezone.utc) + # If the system time changes, then `self._window_end` might drift to + # far away, such that the resampling bucket might be empty although + # new samples aligned to the new system time have been received. + # Thus we resync `self._window_end` to the new system time in case + # it drifted more then one resampling period away from the system time. + if abs(self._window_end - now) - drift > self._config.resampling_period: + self._window_end = self._sync_timer(extra_period=False) + if drift > tolerance: _logger.warning( "The resampling task woke up too late. Resampling should have " @@ -513,7 +541,9 @@ async def resample(self, *, one_shot: bool = False) -> None: if one_shot: break - def _calculate_window_end(self) -> tuple[datetime, timedelta]: + def _calculate_window_end( + self, extra_period: bool = True + ) -> tuple[datetime, timedelta]: """Calculate the end of the current resampling window. The calculated resampling window end is a multiple of @@ -525,10 +555,14 @@ def _calculate_window_end(self) -> tuple[datetime, timedelta]: the end of the current resampling window will be more than one period away, to make sure to have some time to collect samples if the misalignment is too big. + Args: + extra_period: Add an extra period when it is not aligned to make sure we + collected enough samples before the first resampling, otherwise the + initial window to collect samples could be too small. Returns: A tuple with the end of the current resampling window aligned to `self._config.align_to` as the first item and the time we need to - delay the timer start to make sure it is also aligned. + delay the timer to make sure it is also aligned. """ now = datetime.now(timezone.utc) period = self._config.resampling_period @@ -543,12 +577,10 @@ def _calculate_window_end(self) -> tuple[datetime, timedelta]: if not elapsed: return (now + period, timedelta(0)) + extra_period_factor = 2 if extra_period else 1 return ( - # We add an extra period when it is not aligned to make sure we collected - # enough samples before the first resampling, otherwise the initial window - # to collect samples could be too small. - now + period * 2 - elapsed, - period - elapsed if elapsed else timedelta(0), + now + period * extra_period_factor - elapsed, + period - elapsed, ) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index e017611f0..6010e648d 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -15,7 +15,7 @@ import time_machine from frequenz.channels import Broadcast, SenderError -from frequenz.sdk.timeseries import Sample +from frequenz.sdk.timeseries import UNIX_EPOCH, Sample from frequenz.sdk.timeseries._quantities import Quantity from frequenz.sdk.timeseries._resampling import ( DEFAULT_BUFFER_LEN_MAX, @@ -1195,6 +1195,157 @@ async def test_timer_is_aligned( resampling_fun_mock.reset_mock() +async def test_system_clock_changed_backwards( + fake_time: time_machine.Coordinates, + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test that the resampler is able to handle system clock changes.""" + await _advance_time(fake_time, 600) + timestamp = datetime.now(timezone.utc) + + resampling_period_s = 2 + + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=2.0, + initial_buffer_len=5, + ) + + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + + await source_sender.send( + Sample(timestamp + timedelta(seconds=1.0), value=Quantity(1.0)) + ) + await source_sender.send( + Sample(timestamp + timedelta(seconds=2.0), value=Quantity(1.0)) + ) + + await resampler.resample(one_shot=True) + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(1.0), + ) + ) + + # go back in time by 10 minutes + travaller = time_machine.travel(UNIX_EPOCH) + travaller.start() + timestamp = datetime.now(timezone.utc) + + await resampler.resample(one_shot=True) + + # The resampler will trigger two periods from now since when the resync + # happens some time is already passed and the next calculated `window_end` + # will always be more then one period in the future + sink_mock.assert_called_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + None, + ) + ) + + # we sending some samples and run the resampler again without advancing the time + await source_sender.send( + Sample(timestamp + timedelta(seconds=2.0), value=Quantity(1.0)) + ) + await source_sender.send( + Sample(timestamp + timedelta(seconds=3.0), value=Quantity(1.0)) + ) + + await resampler.resample(one_shot=True) + sink_mock.assert_called_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(1.0), + ) + ) + + travaller.stop() + + +async def test_system_clock_changed_forwards( + fake_time: time_machine.Coordinates, # pylint: disable=unused-argument + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test that the resampler is able to handle system clock changes.""" + timestamp = datetime.now(timezone.utc) + print(timestamp) + + resampling_period_s = 2 + + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=2.0, + initial_buffer_len=5, + ) + + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + + await source_sender.send( + Sample(timestamp + timedelta(seconds=1.0), value=Quantity(1.0)) + ) + await source_sender.send( + Sample(timestamp + timedelta(seconds=2.0), value=Quantity(1.0)) + ) + + await resampler.resample(one_shot=True) + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(1.0), + ) + ) + + travaller = time_machine.travel(timestamp + timedelta(minutes=10)) + travaller.start() + timestamp = datetime.now(timezone.utc) + + await resampler.resample(one_shot=True) + + # The resampler will trigger two periods from now since when the resync + # happens some time is already passed and the next calculated `window_end` + # will always be more then one period in the future + sink_mock.assert_called_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + None, + ) + ) + + # we sending some samples and run the resampler again without advancing the time + await source_sender.send( + Sample(timestamp + timedelta(seconds=2.0), value=Quantity(1.0)) + ) + await source_sender.send( + Sample(timestamp + timedelta(seconds=3.0), value=Quantity(1.0)) + ) + + await resampler.resample(one_shot=True) + sink_mock.assert_called_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(1.0), + ) + ) + + travaller.stop() + + def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int: # pylint: disable=protected-access blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen