Skip to content

Commit c29139b

Browse files
committed
Make resampling window constant
The resampling window depends on when the resampling timer was fired. This is wrong, because it makes the resampling window not constant, as in general the timer will fire a bit after the resampling period has passed, so it tends to be a bit bigger than resampling period. This commit fixes this and uses a constant resampling window, always emitting new samples with an exact multiple of the resampling period. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 23e3d8e commit c29139b

File tree

2 files changed

+103
-3
lines changed

2 files changed

+103
-3
lines changed

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from bisect import bisect
1313
from collections import deque
1414
from dataclasses import dataclass
15-
from datetime import datetime, timedelta
15+
from datetime import datetime, timedelta, timezone
1616
from typing import AsyncIterator, Callable, Coroutine, Optional, Sequence
1717

1818
from frequenz.channels.util import Timer
@@ -354,6 +354,18 @@ def __init__(self, config: ResamplerConfig) -> None:
354354
self._timer: Timer = Timer(config.resampling_period_s)
355355
"""The timer to trigger the next resampling."""
356356

357+
self._window_end: datetime = datetime.now(timezone.utc) + timedelta(
358+
seconds=self._config.resampling_period_s
359+
)
360+
"""The time in which the current window ends.
361+
362+
This is used to make sure every resampling window is generated at
363+
precise times. We can't rely on the timer timestamp because timers will
364+
never fire at the exact requested time, so if we don't use a precise
365+
time for the end of the window, the resampling windows we produce will
366+
have different sizes.
367+
"""
368+
357369
@property
358370
def config(self) -> ResamplerConfig:
359371
"""Get the resampler configuration.
@@ -435,11 +447,12 @@ async def resample(self, *, one_shot: bool = False) -> None:
435447
timeseries from the resampler before calling this method
436448
again).
437449
"""
438-
async for timer_timestamp in self._timer:
450+
async for _ in self._timer:
439451
results = await asyncio.gather(
440-
*[r.resample(timer_timestamp) for r in self._resamplers.values()],
452+
*[r.resample(self._window_end) for r in self._resamplers.values()],
441453
return_exceptions=True,
442454
)
455+
self._update_window_end()
443456
exceptions = {
444457
source: results[i]
445458
for i, source in enumerate(self._resamplers)
@@ -452,6 +465,11 @@ async def resample(self, *, one_shot: bool = False) -> None:
452465
if one_shot:
453466
break
454467

468+
def _update_window_end(self) -> None:
469+
self._window_end = self._window_end + timedelta(
470+
seconds=self._config.resampling_period_s
471+
)
472+
455473

456474
class _ResamplingHelper:
457475
"""Keeps track of *relevant* samples to pass them to the resampling function.

tests/timeseries/test_resampling.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,88 @@ async def test_helper_buffer_too_big(
174174
assert helper._buffer.maxlen == DEFAULT_BUFFER_LEN_MAX
175175

176176

177+
async def test_resampling_window_size_is_constant(
178+
fake_time: time_machine.Coordinates, source_chan: Broadcast[Sample]
179+
) -> None:
180+
"""Test resampling window size is consistent."""
181+
timestamp = datetime.now(timezone.utc)
182+
183+
resampling_period_s = 2
184+
expected_resampled_value = 42.0
185+
186+
resampling_fun_mock = MagicMock(
187+
spec=ResamplingFunction, return_value=expected_resampled_value
188+
)
189+
config = ResamplerConfig(
190+
resampling_period_s=resampling_period_s,
191+
max_data_age_in_periods=1.0,
192+
resampling_function=resampling_fun_mock,
193+
initial_buffer_len=4,
194+
)
195+
resampler = Resampler(config)
196+
197+
source_receiver = source_chan.new_receiver()
198+
source_sender = source_chan.new_sender()
199+
200+
sink_mock = AsyncMock(spec=Sink, return_value=True)
201+
202+
resampler.add_timeseries("test", source_receiver, sink_mock)
203+
source_props = resampler.get_source_properties(source_receiver)
204+
205+
# Test timeline
206+
#
207+
# t(s) 0 1 2 2.5 3 4
208+
# |----------|----------R----|-----|----------R-----> (no more samples)
209+
# value 5.0 12.0 2.0 4.0 5.0
210+
#
211+
# R = resampling is done
212+
213+
# Send a few samples and run a resample tick, advancing the fake time by one period
214+
sample0s = Sample(timestamp, value=5.0)
215+
sample1s = Sample(timestamp + timedelta(seconds=1), value=12.0)
216+
await source_sender.send(sample0s)
217+
await source_sender.send(sample1s)
218+
fake_time.shift(resampling_period_s) # timer matches resampling period
219+
await resampler.resample(one_shot=True)
220+
221+
assert datetime.now(timezone.utc).timestamp() == 2
222+
sink_mock.assert_called_once_with(
223+
Sample(
224+
timestamp + timedelta(seconds=resampling_period_s), expected_resampled_value
225+
)
226+
)
227+
resampling_fun_mock.assert_called_once_with(
228+
a_sequence(sample1s), config, source_props
229+
)
230+
sink_mock.reset_mock()
231+
resampling_fun_mock.reset_mock()
232+
233+
# Second resampling run
234+
sample2_5s = Sample(timestamp + timedelta(seconds=2.5), value=2.0)
235+
sample3s = Sample(timestamp + timedelta(seconds=3), value=4.0)
236+
sample4s = Sample(timestamp + timedelta(seconds=4), value=5.0)
237+
await source_sender.send(sample2_5s)
238+
await source_sender.send(sample3s)
239+
await source_sender.send(sample4s)
240+
fake_time.shift(resampling_period_s + 0.5) # Timer fired with some delay
241+
await resampler.resample(one_shot=True)
242+
243+
assert datetime.now(timezone.utc).timestamp() == 4.5
244+
sink_mock.assert_called_once_with(
245+
Sample(
246+
# But the sample still gets 4s as timestamp, because we are keeping
247+
# the window size constant, not dependent on when the timer fired
248+
timestamp + timedelta(seconds=resampling_period_s * 2),
249+
expected_resampled_value,
250+
)
251+
)
252+
resampling_fun_mock.assert_called_once_with(
253+
a_sequence(sample2_5s, sample3s, sample4s), config, source_props
254+
)
255+
sink_mock.reset_mock()
256+
resampling_fun_mock.reset_mock()
257+
258+
177259
async def test_resampling_with_one_window(
178260
fake_time: time_machine.Coordinates, source_chan: Broadcast[Sample]
179261
) -> None:

0 commit comments

Comments
 (0)