Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,4 @@
## Bug Fixes

- 0W power requests are now not adjusted to exclusion bounds by the `PowerManager` and `PowerDistributor`, and are sent over to the microgrid API directly.
- `timeseries.resampling` will now resync to the system time if it drifts away for more then a resample period.
70 changes: 51 additions & 19 deletions src/frequenz/sdk/timeseries/_resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,10 @@ def __init__(self, config: ResamplerConfig) -> None:
self._resamplers: dict[Source, _StreamingHelper] = {}
"""A mapping between sources and the streaming helper handling that source."""

window_end, start_delay_time = self._calculate_window_end()
self._window_end: datetime = window_end
self._timer: Timer = Timer.periodic(config.resampling_period)
"""The timer used to trigger the resampling windows."""

self._window_end = self._sync_timer()
"""The time in which the current window ends.

This is used to make sure every resampling window is generated at
Expand All @@ -384,16 +386,6 @@ def __init__(self, config: ResamplerConfig) -> None:
the window end is deterministic.
"""

self._timer: Timer = Timer.periodic(config.resampling_period)
"""The timer used to trigger the resampling windows."""

# Hack to align the timer, this should be implemented in the Timer class
self._timer._next_tick_time = _to_microseconds(
timedelta(seconds=asyncio.get_running_loop().time())
+ config.resampling_period
+ start_delay_time
) # pylint: disable=protected-access

@property
def config(self) -> ResamplerConfig:
"""Get the resampler configuration.
Expand Down Expand Up @@ -457,6 +449,34 @@ def remove_timeseries(self, source: Source) -> bool:
return False
return True

def _sync_timer(self, extra_period: bool = True) -> datetime:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silly, but I would either remove this extra period, as is it only a startup glitch we are trying to fix (I guess it made sense in the original code because it was free), or to switch the default, as the normal situation seems to be not adding an extra period.

"""Resync the timer.

This method will resync the timer to the current time, so the next
resampling window will start at the next multiple of
`self._config.resampling_period` starting from now.

Args:
extra_period: Add an extra period when it is not aligned to make sure we
collected enough samples before the first resampling, otherwise the
initial window to collect samples could be too small.

Returns:
The end time of the resampling window.
"""
window_end, start_delay_time = self._calculate_window_end(extra_period)

# Hack to align the timer, this should be implemented in the Timer class
self._timer._next_tick_time = ( # pylint: disable=protected-access
_to_microseconds(
timedelta(seconds=asyncio.get_running_loop().time())
+ self.config.resampling_period
+ start_delay_time
)
)

return window_end

async def resample(self, *, one_shot: bool = False) -> None:
"""Start resampling all known timeseries.

Expand All @@ -483,6 +503,14 @@ async def resample(self, *, one_shot: bool = False) -> None:
async for drift in self._timer:
now = datetime.now(tz=timezone.utc)

# If the system time changes, then `self._window_end` might drift to
# far away, such that the resampling bucket might be empty although
# new samples aligned to the new system time have been received.
# Thus we resync `self._window_end` to the new system time in case
# it drifted more then one resampling period away from the system time.
if abs(self._window_end - now) - drift > self._config.resampling_period:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly this is still not enough. Both drifts need to be handled independently, the mixing of both could end up hiding issues. For example:

monotonic clock
0         1         2         3         4         5         6 
|---------|---------|---------|---------|---------|---------|---------> time
o         o         o         o         o         o
          t1        t2        t3        t4        t5                    expected ticks
            T1       T2       T3                     T4                 observed ticks
O           *        *        *                      *
|------------|------------|------------|------------|------------|-------> time
0            1            2            3            4            5
wall clock

resampling_period = 1.0, and the wall clock has an accumulating drift of 0.3s per second.

T1:
* now = ~0.9
* window_end = 1.0
* drift = 0.2
* abs(self._window_end - now) - drift > self._config.resampling_period
  abs(             1.0 - 0.9) - 0.2   > 1.0  
                          0.1 - 0.2   > 1.0
                               -0.1   > 1.0 => False

T2:
* now = ~1.7
* window_end = 2.0
* drift = 0.1
* abs(self._window_end - now) - drift > self._config.resampling_period
  abs(             2.0 - 1.7) - 0.1   > 1.0  
                          0.3 - 0.1   > 1.0
                                0.2   > 1.0 => False

T3:
* now = ~2.4
* window_end = 3.0
* drift = 0
* abs(self._window_end - now) - drift > self._config.resampling_period
  abs(             3.0 - 2.4) - 0.0   > 1.0  
                          0.6 - 0.0   > 1.0
                                0.6   > 1.0 => False

T4:
* now = ~4.1
* window_end = 4.0
* drift = 1.3
* abs(self._window_end - now) - drift > self._config.resampling_period
  abs(             4.0 - 4.1) - 1.3   > 1.0  
                          0.1 - 1.3   > 1.0
                               -1.2   > 1.0 => False

I don't know, from this maybe it's enough to use abs(abs(we - now) - abs(drift)) > period, I have to think about it a bit more. But before doing that, now I really wonder why we are doing all this... 🤔

Don't we just want to always sync to the wall clock, so effectively make the resampler count periods in terms of the wall clock instead of the monotonic clock?

O maybe we just want to emit warnings if the wall clock is drifting? Having a drifting wall clock is really an operational issue, so I'm not sure if we should try to fix it in the code, as it will probably bring all sorts of other issues too.

And actually I'm not even sure we should warn about it in the SDK, probably we need some system-level monitoring for this instead. Is like if there were a disk failure and we try to warn the user about it and cope with it by reallocating stuff. I think is beyond the SDK's responsibilities.

It really looks like we are trying to patch a broken system here, and I'm not sure if it is a good idea...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I quickly drafted a solution that I think makes everything much easier. I just created a new timer type that is attached to the wall clock especially for the Resampler. We could then polish it and move it to the channels repo if we think it is generally useful. For now it doesn't have all the complexity of the missing ticks policies of the channels timer, it only implements the trigger all missing policy that's needed by the resampler.

Here is the proof of concept PR, against this PR:

The resampler gets much simplied with this, as it doesn't need to care about any time management anymore.

self._window_end = self._sync_timer(extra_period=False)

if drift > tolerance:
_logger.warning(
"The resampling task woke up too late. Resampling should have "
Expand Down Expand Up @@ -513,7 +541,9 @@ async def resample(self, *, one_shot: bool = False) -> None:
if one_shot:
break

def _calculate_window_end(self) -> tuple[datetime, timedelta]:
def _calculate_window_end(
self, extra_period: bool = True
) -> tuple[datetime, timedelta]:
"""Calculate the end of the current resampling window.

The calculated resampling window end is a multiple of
Expand All @@ -525,10 +555,14 @@ def _calculate_window_end(self) -> tuple[datetime, timedelta]:
the end of the current resampling window will be more than one period away, to
make sure to have some time to collect samples if the misalignment is too big.

Args:
extra_period: Add an extra period when it is not aligned to make sure we
collected enough samples before the first resampling, otherwise the
initial window to collect samples could be too small.
Returns:
A tuple with the end of the current resampling window aligned to
`self._config.align_to` as the first item and the time we need to
delay the timer start to make sure it is also aligned.
delay the timer to make sure it is also aligned.
"""
now = datetime.now(timezone.utc)
period = self._config.resampling_period
Expand All @@ -543,12 +577,10 @@ def _calculate_window_end(self) -> tuple[datetime, timedelta]:
if not elapsed:
return (now + period, timedelta(0))

extra_period_factor = 2 if extra_period else 1
return (
# We add an extra period when it is not aligned to make sure we collected
# enough samples before the first resampling, otherwise the initial window
# to collect samples could be too small.
now + period * 2 - elapsed,
period - elapsed if elapsed else timedelta(0),
now + period * extra_period_factor - elapsed,
period - elapsed,
)


Expand Down
153 changes: 152 additions & 1 deletion tests/timeseries/test_resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import time_machine
from frequenz.channels import Broadcast, SenderError

from frequenz.sdk.timeseries import Sample
from frequenz.sdk.timeseries import UNIX_EPOCH, Sample
from frequenz.sdk.timeseries._quantities import Quantity
from frequenz.sdk.timeseries._resampling import (
DEFAULT_BUFFER_LEN_MAX,
Expand Down Expand Up @@ -1195,6 +1195,157 @@ async def test_timer_is_aligned(
resampling_fun_mock.reset_mock()


async def test_system_clock_changed_backwards(
fake_time: time_machine.Coordinates,
source_chan: Broadcast[Sample[Quantity]],
) -> None:
"""Test that the resampler is able to handle system clock changes."""
await _advance_time(fake_time, 600)
timestamp = datetime.now(timezone.utc)

resampling_period_s = 2

config = ResamplerConfig(
resampling_period=timedelta(seconds=resampling_period_s),
max_data_age_in_periods=2.0,
initial_buffer_len=5,
)

resampler = Resampler(config)

source_receiver = source_chan.new_receiver()
source_sender = source_chan.new_sender()

sink_mock = AsyncMock(spec=Sink, return_value=True)

resampler.add_timeseries("test", source_receiver, sink_mock)

await source_sender.send(
Sample(timestamp + timedelta(seconds=1.0), value=Quantity(1.0))
)
await source_sender.send(
Sample(timestamp + timedelta(seconds=2.0), value=Quantity(1.0))
)

await resampler.resample(one_shot=True)
sink_mock.assert_called_once_with(
Sample(
timestamp + timedelta(seconds=resampling_period_s),
Quantity(1.0),
)
)

# go back in time by 10 minutes
travaller = time_machine.travel(UNIX_EPOCH)
travaller.start()
timestamp = datetime.now(timezone.utc)

await resampler.resample(one_shot=True)

# The resampler will trigger two periods from now since when the resync
# happens some time is already passed and the next calculated `window_end`
# will always be more then one period in the future
sink_mock.assert_called_with(
Sample(
timestamp + timedelta(seconds=resampling_period_s),
None,
)
)

# we sending some samples and run the resampler again without advancing the time
await source_sender.send(
Sample(timestamp + timedelta(seconds=2.0), value=Quantity(1.0))
)
await source_sender.send(
Sample(timestamp + timedelta(seconds=3.0), value=Quantity(1.0))
)

await resampler.resample(one_shot=True)
sink_mock.assert_called_with(
Sample(
timestamp + timedelta(seconds=resampling_period_s),
Quantity(1.0),
)
)

travaller.stop()


async def test_system_clock_changed_forwards(
fake_time: time_machine.Coordinates, # pylint: disable=unused-argument
source_chan: Broadcast[Sample[Quantity]],
) -> None:
"""Test that the resampler is able to handle system clock changes."""
timestamp = datetime.now(timezone.utc)
print(timestamp)

resampling_period_s = 2

config = ResamplerConfig(
resampling_period=timedelta(seconds=resampling_period_s),
max_data_age_in_periods=2.0,
initial_buffer_len=5,
)

resampler = Resampler(config)

source_receiver = source_chan.new_receiver()
source_sender = source_chan.new_sender()

sink_mock = AsyncMock(spec=Sink, return_value=True)

resampler.add_timeseries("test", source_receiver, sink_mock)

await source_sender.send(
Sample(timestamp + timedelta(seconds=1.0), value=Quantity(1.0))
)
await source_sender.send(
Sample(timestamp + timedelta(seconds=2.0), value=Quantity(1.0))
)

await resampler.resample(one_shot=True)
sink_mock.assert_called_once_with(
Sample(
timestamp + timedelta(seconds=resampling_period_s),
Quantity(1.0),
)
)

travaller = time_machine.travel(timestamp + timedelta(minutes=10))
travaller.start()
timestamp = datetime.now(timezone.utc)

await resampler.resample(one_shot=True)

# The resampler will trigger two periods from now since when the resync
# happens some time is already passed and the next calculated `window_end`
# will always be more then one period in the future
sink_mock.assert_called_with(
Sample(
timestamp + timedelta(seconds=resampling_period_s),
None,
)
)

# we sending some samples and run the resampler again without advancing the time
await source_sender.send(
Sample(timestamp + timedelta(seconds=2.0), value=Quantity(1.0))
)
await source_sender.send(
Sample(timestamp + timedelta(seconds=3.0), value=Quantity(1.0))
)

await resampler.resample(one_shot=True)
sink_mock.assert_called_with(
Sample(
timestamp + timedelta(seconds=resampling_period_s),
Quantity(1.0),
)
)

travaller.stop()


def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int:
# pylint: disable=protected-access
blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen
Expand Down