Skip to content

Commit a83b53b

Browse files
authored
Make resampling window size constant (#236)
The resampler was using the timer timestamp to do the resampling, but this is wrong because the timer could (and probably always will) fire in the *future* (after the resampling period has passed). The effect was that samples produced by the resampler were not perfectly spread using an exact resampling period, but they included the error produced by the timer firing This PR fixes this issue and also fixes another bug found, samples in the future could have been included in a resampling window. Now we only consider relevant samples that fit to the exact resampling window. Part of #170.
2 parents cda8210 + e8a6ecb commit a83b53b

File tree

3 files changed

+328
-21
lines changed

3 files changed

+328
-21
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@
1616

1717
## Bug Fixes
1818

19-
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
19+
* The resampler now correctly produces resampling windows of exact *resampling period* size, which only include samples emitted during the resampling window (see #170)

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from bisect import bisect
1313
from collections import deque
1414
from dataclasses import dataclass
15-
from datetime import datetime, timedelta
15+
from datetime import datetime, timedelta, timezone
1616
from typing import AsyncIterator, Callable, Coroutine, Optional, Sequence
1717

1818
from frequenz.channels.util import Timer
@@ -346,8 +346,25 @@ def __init__(self, config: ResamplerConfig) -> None:
346346
config: The configuration for the resampler.
347347
"""
348348
self._config = config
349+
"""The configuration for this resampler."""
350+
349351
self._resamplers: dict[Source, _StreamingHelper] = {}
352+
"""A mapping between sources and the streaming helper handling that source."""
353+
350354
self._timer: Timer = Timer(config.resampling_period_s)
355+
"""The timer to trigger the next resampling."""
356+
357+
self._window_end: datetime = datetime.now(timezone.utc) + timedelta(
358+
seconds=self._config.resampling_period_s
359+
)
360+
"""The time in which the current window ends.
361+
362+
This is used to make sure every resampling window is generated at
363+
precise times. We can't rely on the timer timestamp because timers will
364+
never fire at the exact requested time, so if we don't use a precise
365+
time for the end of the window, the resampling windows we produce will
366+
have different sizes.
367+
"""
351368

352369
@property
353370
def config(self) -> ResamplerConfig:
@@ -432,9 +449,10 @@ async def resample(self, *, one_shot: bool = False) -> None:
432449
"""
433450
async for timer_timestamp in self._timer:
434451
results = await asyncio.gather(
435-
*[r.resample(timer_timestamp) for r in self._resamplers.values()],
452+
*[r.resample(self._window_end) for r in self._resamplers.values()],
436453
return_exceptions=True,
437454
)
455+
self._update_window_end(timer_timestamp)
438456
exceptions = {
439457
source: results[i]
440458
for i, source in enumerate(self._resamplers)
@@ -447,6 +465,27 @@ async def resample(self, *, one_shot: bool = False) -> None:
447465
if one_shot:
448466
break
449467

468+
def _update_window_end(self, timer_timestamp: datetime) -> None:
469+
# We use abs() here to account for errors in the timer where the timer
470+
# fires before its time even when in theory it shouldn't be possible,
471+
# but we want to be resilient to timer implementation changes that
472+
# could end up in this case, either because there were some time jump
473+
# or some rounding error.
474+
timer_error_s = abs((timer_timestamp - self._window_end).total_seconds())
475+
if timer_error_s > (self._config.resampling_period_s / 10.0):
476+
_logger.warning(
477+
"The resampling timer fired too late. It should have fired at "
478+
"%s, but it fired at %s (%s seconds difference; resampling "
479+
"period is %s seconds)",
480+
self._window_end,
481+
timer_timestamp,
482+
timer_error_s,
483+
self._config.resampling_period_s,
484+
)
485+
self._window_end = self._window_end + timedelta(
486+
seconds=self._config.resampling_period_s
487+
)
488+
450489

451490
class _ResamplingHelper:
452491
"""Keeps track of *relevant* samples to pass them to the resampling function.
@@ -622,14 +661,15 @@ def resample(self, timestamp: datetime) -> Sample:
622661
# We need to pass a dummy Sample to bisect because it only support
623662
# specifying a key extraction function in Python 3.10, so we need to
624663
# compare samples at the moment.
625-
cut_index = bisect(self._buffer, Sample(minimum_relevant_timestamp, None))
664+
min_index = bisect(self._buffer, Sample(minimum_relevant_timestamp, None))
665+
max_index = bisect(self._buffer, Sample(timestamp, None))
626666
# Using itertools for slicing doesn't look very efficient, but
627667
# experiements with a custom (ring) buffer that can slice showed that
628668
# it is not that bad. See:
629669
# https://github.com/frequenz-floss/frequenz-sdk-python/pull/130
630670
# So if we need more performance beyond this point, we probably need to
631671
# resort to some C (or similar) implementation.
632-
relevant_samples = list(itertools.islice(self._buffer, cut_index, None))
672+
relevant_samples = list(itertools.islice(self._buffer, min_index, max_index))
633673
value = (
634674
conf.resampling_function(relevant_samples, conf, props)
635675
if relevant_samples

0 commit comments

Comments
 (0)