From ab73e1e3ea52105f9c7091a18b1a079dff86ea16 Mon Sep 17 00:00:00 2001 From: Matthias Wende Date: Fri, 1 Dec 2023 11:18:27 +0100 Subject: [PATCH 1/6] Remove unnecessary condition Signed-off-by: Matthias Wende --- src/frequenz/sdk/timeseries/_resampling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 9f5104d8a..301a8b125 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -548,7 +548,7 @@ def _calculate_window_end(self) -> tuple[datetime, timedelta]: # enough samples before the first resampling, otherwise the initial window # to collect samples could be too small. now + period * 2 - elapsed, - period - elapsed if elapsed else timedelta(0), + period - elapsed, ) From 841e9b5f46ab916692319d1d8bf53d4d0bf7b8bc Mon Sep 17 00:00:00 2001 From: Matthias Wende Date: Sat, 2 Dec 2023 15:15:54 +0100 Subject: [PATCH 2/6] Enhance window_end calculation function Adds a boolean parameter if an extra period should be added to the calculated window end or not. Signed-off-by: Matthias Wende --- src/frequenz/sdk/timeseries/_resampling.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 301a8b125..dcabb6b0f 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -513,7 +513,9 @@ async def resample(self, *, one_shot: bool = False) -> None: if one_shot: break - def _calculate_window_end(self) -> tuple[datetime, timedelta]: + def _calculate_window_end( + self, extra_period: bool = True + ) -> tuple[datetime, timedelta]: """Calculate the end of the current resampling window. The calculated resampling window end is a multiple of @@ -525,10 +527,14 @@ def _calculate_window_end(self) -> tuple[datetime, timedelta]: the end of the current resampling window will be more than one period away, to make sure to have some time to collect samples if the misalignment is too big. + Args: + extra_period: Add an extra period when it is not aligned to make sure we + collected enough samples before the first resampling, otherwise the + initial window to collect samples could be too small. Returns: A tuple with the end of the current resampling window aligned to `self._config.align_to` as the first item and the time we need to - delay the timer start to make sure it is also aligned. + delay the timer to make sure it is also aligned. """ now = datetime.now(timezone.utc) period = self._config.resampling_period @@ -543,11 +549,9 @@ def _calculate_window_end(self) -> tuple[datetime, timedelta]: if not elapsed: return (now + period, timedelta(0)) + extra_period_factor = 2 if extra_period else 1 return ( - # We add an extra period when it is not aligned to make sure we collected - # enough samples before the first resampling, otherwise the initial window - # to collect samples could be too small. - now + period * 2 - elapsed, + now + period * extra_period_factor - elapsed, period - elapsed, ) From 9c08b5e54bacf05f7bfa4f4949482efcd23cae78 Mon Sep 17 00:00:00 2001 From: Matthias Wende Date: Fri, 17 Nov 2023 17:02:11 +0100 Subject: [PATCH 3/6] Move timer sync code to function Signed-off-by: Matthias Wende --- src/frequenz/sdk/timeseries/_resampling.py | 44 ++++++++++++++++------ 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index dcabb6b0f..87ed093a8 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -370,8 +370,10 @@ def __init__(self, config: ResamplerConfig) -> None: self._resamplers: dict[Source, _StreamingHelper] = {} """A mapping between sources and the streaming helper handling that source.""" - window_end, start_delay_time = self._calculate_window_end() - self._window_end: datetime = window_end + self._timer: Timer = Timer.periodic(config.resampling_period) + """The timer used to trigger the resampling windows.""" + + self._window_end = self._sync_timer() """The time in which the current window ends. This is used to make sure every resampling window is generated at @@ -384,16 +386,6 @@ def __init__(self, config: ResamplerConfig) -> None: the window end is deterministic. """ - self._timer: Timer = Timer.periodic(config.resampling_period) - """The timer used to trigger the resampling windows.""" - - # Hack to align the timer, this should be implemented in the Timer class - self._timer._next_tick_time = _to_microseconds( - timedelta(seconds=asyncio.get_running_loop().time()) - + config.resampling_period - + start_delay_time - ) # pylint: disable=protected-access - @property def config(self) -> ResamplerConfig: """Get the resampler configuration. @@ -457,6 +449,34 @@ def remove_timeseries(self, source: Source) -> bool: return False return True + def _sync_timer(self, extra_period: bool = True) -> datetime: + """Resync the timer. + + This method will resync the timer to the current time, so the next + resampling window will start at the next multiple of + `self._config.resampling_period` starting from now. + + Args: + extra_period: Add an extra period when it is not aligned to make sure we + collected enough samples before the first resampling, otherwise the + initial window to collect samples could be too small. + + Returns: + The end time of the resampling window. + """ + window_end, start_delay_time = self._calculate_window_end(extra_period) + + # Hack to align the timer, this should be implemented in the Timer class + self._timer._next_tick_time = ( # pylint: disable=protected-access + _to_microseconds( + timedelta(seconds=asyncio.get_running_loop().time()) + + self.config.resampling_period + + start_delay_time + ) + ) + + return window_end + async def resample(self, *, one_shot: bool = False) -> None: """Start resampling all known timeseries. From b270af7331f2f84a4f2f994d67a8827261086701 Mon Sep 17 00:00:00 2001 From: Matthias Wende Date: Fri, 17 Nov 2023 20:31:27 +0100 Subject: [PATCH 4/6] Add resync on system clock time changes to resampler Signed-off-by: Matthias Wende --- src/frequenz/sdk/timeseries/_resampling.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 87ed093a8..9da5b76a2 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -503,6 +503,14 @@ async def resample(self, *, one_shot: bool = False) -> None: async for drift in self._timer: now = datetime.now(tz=timezone.utc) + # If the system time changes, then `self._window_end` might drift to + # far away, such that the resampling bucket might be empty although + # new samples aligned to the new system time have been received. + # Thus we resync `self._window_end` to the new system time in case + # it drifted more then one resampling period away from the system time. + if abs(self._window_end - now) - drift > self._config.resampling_period: + self._window_end = self._sync_timer(extra_period=False) + if drift > tolerance: _logger.warning( "The resampling task woke up too late. Resampling should have " From 78e22b1efc5082374f92952d2c9c97e1a88fcf36 Mon Sep 17 00:00:00 2001 From: Matthias Wende Date: Tue, 28 Nov 2023 16:52:21 +0100 Subject: [PATCH 5/6] Add system clock changed resampler tests Signed-off-by: Matthias Wende --- tests/timeseries/test_resampling.py | 153 +++++++++++++++++++++++++++- 1 file changed, 152 insertions(+), 1 deletion(-) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index e017611f0..6010e648d 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -15,7 +15,7 @@ import time_machine from frequenz.channels import Broadcast, SenderError -from frequenz.sdk.timeseries import Sample +from frequenz.sdk.timeseries import UNIX_EPOCH, Sample from frequenz.sdk.timeseries._quantities import Quantity from frequenz.sdk.timeseries._resampling import ( DEFAULT_BUFFER_LEN_MAX, @@ -1195,6 +1195,157 @@ async def test_timer_is_aligned( resampling_fun_mock.reset_mock() +async def test_system_clock_changed_backwards( + fake_time: time_machine.Coordinates, + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test that the resampler is able to handle system clock changes.""" + await _advance_time(fake_time, 600) + timestamp = datetime.now(timezone.utc) + + resampling_period_s = 2 + + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=2.0, + initial_buffer_len=5, + ) + + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + + await source_sender.send( + Sample(timestamp + timedelta(seconds=1.0), value=Quantity(1.0)) + ) + await source_sender.send( + Sample(timestamp + timedelta(seconds=2.0), value=Quantity(1.0)) + ) + + await resampler.resample(one_shot=True) + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(1.0), + ) + ) + + # go back in time by 10 minutes + travaller = time_machine.travel(UNIX_EPOCH) + travaller.start() + timestamp = datetime.now(timezone.utc) + + await resampler.resample(one_shot=True) + + # The resampler will trigger two periods from now since when the resync + # happens some time is already passed and the next calculated `window_end` + # will always be more then one period in the future + sink_mock.assert_called_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + None, + ) + ) + + # we sending some samples and run the resampler again without advancing the time + await source_sender.send( + Sample(timestamp + timedelta(seconds=2.0), value=Quantity(1.0)) + ) + await source_sender.send( + Sample(timestamp + timedelta(seconds=3.0), value=Quantity(1.0)) + ) + + await resampler.resample(one_shot=True) + sink_mock.assert_called_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(1.0), + ) + ) + + travaller.stop() + + +async def test_system_clock_changed_forwards( + fake_time: time_machine.Coordinates, # pylint: disable=unused-argument + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test that the resampler is able to handle system clock changes.""" + timestamp = datetime.now(timezone.utc) + print(timestamp) + + resampling_period_s = 2 + + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=2.0, + initial_buffer_len=5, + ) + + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + + await source_sender.send( + Sample(timestamp + timedelta(seconds=1.0), value=Quantity(1.0)) + ) + await source_sender.send( + Sample(timestamp + timedelta(seconds=2.0), value=Quantity(1.0)) + ) + + await resampler.resample(one_shot=True) + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(1.0), + ) + ) + + travaller = time_machine.travel(timestamp + timedelta(minutes=10)) + travaller.start() + timestamp = datetime.now(timezone.utc) + + await resampler.resample(one_shot=True) + + # The resampler will trigger two periods from now since when the resync + # happens some time is already passed and the next calculated `window_end` + # will always be more then one period in the future + sink_mock.assert_called_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + None, + ) + ) + + # we sending some samples and run the resampler again without advancing the time + await source_sender.send( + Sample(timestamp + timedelta(seconds=2.0), value=Quantity(1.0)) + ) + await source_sender.send( + Sample(timestamp + timedelta(seconds=3.0), value=Quantity(1.0)) + ) + + await resampler.resample(one_shot=True) + sink_mock.assert_called_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(1.0), + ) + ) + + travaller.stop() + + def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int: # pylint: disable=protected-access blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen From 4cc590235eac97be32f691f29f7fdb45e31622fe Mon Sep 17 00:00:00 2001 From: Matthias Wende Date: Sat, 2 Dec 2023 21:05:39 +0100 Subject: [PATCH 6/6] Update release notes Signed-off-by: Matthias Wende --- RELEASE_NOTES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 9c0c6df9c..2a8fe2a3a 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -53,3 +53,4 @@ ## Bug Fixes - 0W power requests are now not adjusted to exclusion bounds by the `PowerManager` and `PowerDistributor`, and are sent over to the microgrid API directly. +- `timeseries.resampling` will now resync to the system time if it drifts away for more then a resample period.