Skip to content

Commit 093f8f1

Browse files
committed
Don't include future samples in resampling windows
Currently only too-old samples are being filtered out from the relevant samples by the resampler, but we should also filter out future samples, as it might happen that the timer was delayed and was triggered a while after a resampling period has passed, so we could have received samples in the future for the current window's perspective. This commit also filters these future samples and only consider relevant samples that were emitted during the current resampling period. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 2eb58b5 commit 093f8f1

File tree

2 files changed

+88
-2
lines changed

2 files changed

+88
-2
lines changed

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -661,14 +661,15 @@ def resample(self, timestamp: datetime) -> Sample:
661661
# We need to pass a dummy Sample to bisect because it only support
662662
# specifying a key extraction function in Python 3.10, so we need to
663663
# compare samples at the moment.
664-
cut_index = bisect(self._buffer, Sample(minimum_relevant_timestamp, None))
664+
min_index = bisect(self._buffer, Sample(minimum_relevant_timestamp, None))
665+
max_index = bisect(self._buffer, Sample(timestamp, None))
665666
# Using itertools for slicing doesn't look very efficient, but
666667
# experiements with a custom (ring) buffer that can slice showed that
667668
# it is not that bad. See:
668669
# https://github.com/frequenz-floss/frequenz-sdk-python/pull/130
669670
# So if we need more performance beyond this point, we probably need to
670671
# resort to some C (or similar) implementation.
671-
relevant_samples = list(itertools.islice(self._buffer, cut_index, None))
672+
relevant_samples = list(itertools.islice(self._buffer, min_index, max_index))
672673
value = (
673674
conf.resampling_function(relevant_samples, conf, props)
674675
if relevant_samples

tests/timeseries/test_resampling.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,91 @@ async def test_timer_errors_are_logged(
349349
resampling_fun_mock.reset_mock()
350350

351351

352+
async def test_future_samples_not_included(
353+
fake_time: time_machine.Coordinates, source_chan: Broadcast[Sample]
354+
) -> None:
355+
"""Test resampling window size is consistent."""
356+
timestamp = datetime.now(timezone.utc)
357+
358+
resampling_period_s = 2
359+
expected_resampled_value = 42.0
360+
361+
resampling_fun_mock = MagicMock(
362+
spec=ResamplingFunction, return_value=expected_resampled_value
363+
)
364+
config = ResamplerConfig(
365+
resampling_period_s=resampling_period_s,
366+
max_data_age_in_periods=2.0,
367+
resampling_function=resampling_fun_mock,
368+
initial_buffer_len=4,
369+
)
370+
resampler = Resampler(config)
371+
372+
source_receiver = source_chan.new_receiver()
373+
source_sender = source_chan.new_sender()
374+
375+
sink_mock = AsyncMock(spec=Sink, return_value=True)
376+
377+
resampler.add_timeseries("test", source_receiver, sink_mock)
378+
source_props = resampler.get_source_properties(source_receiver)
379+
380+
# Test timeline
381+
#
382+
# t(s) 0 1 1.9 2 3 4 4.1 4.2
383+
# |----------|--------|--R----------|----------R--|---|------------>
384+
# value 5.0 7.0 4.0 3.0 timer fires
385+
# (with ts=2.1)
386+
#
387+
# R = resampling is done
388+
389+
# Send a few samples and run a resample tick, advancing the fake time by one period
390+
sample0s = Sample(timestamp, value=5.0)
391+
sample1s = Sample(timestamp + timedelta(seconds=1), value=12.0)
392+
sample2_1s = Sample(timestamp + timedelta(seconds=2.1), value=7.0)
393+
await source_sender.send(sample0s)
394+
await source_sender.send(sample1s)
395+
await source_sender.send(sample2_1s)
396+
fake_time.shift(resampling_period_s)
397+
await resampler.resample(one_shot=True)
398+
399+
assert datetime.now(timezone.utc).timestamp() == 2
400+
sink_mock.assert_called_once_with(
401+
Sample(
402+
timestamp + timedelta(seconds=resampling_period_s), expected_resampled_value
403+
)
404+
)
405+
resampling_fun_mock.assert_called_once_with(
406+
a_sequence(sample0s, sample1s), config, source_props # sample2_1s is not here
407+
)
408+
assert source_props == SourceProperties(
409+
sampling_start=timestamp, received_samples=3, sampling_period_s=None
410+
)
411+
assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len
412+
sink_mock.reset_mock()
413+
resampling_fun_mock.reset_mock()
414+
415+
# Second resampling run
416+
sample3s = Sample(timestamp + timedelta(seconds=3), value=4.0)
417+
sample4_1s = Sample(timestamp + timedelta(seconds=4.1), value=3.0)
418+
await source_sender.send(sample3s)
419+
await source_sender.send(sample4_1s)
420+
fake_time.shift(resampling_period_s + 0.2)
421+
await resampler.resample(one_shot=True)
422+
423+
assert datetime.now(timezone.utc).timestamp() == 4.2
424+
sink_mock.assert_called_once_with(
425+
Sample(
426+
timestamp + timedelta(seconds=resampling_period_s * 2),
427+
expected_resampled_value,
428+
)
429+
)
430+
resampling_fun_mock.assert_called_once_with(
431+
a_sequence(sample1s, sample2_1s, sample3s),
432+
config,
433+
source_props, # sample4_1s is not here
434+
)
435+
436+
352437
async def test_resampling_with_one_window(
353438
fake_time: time_machine.Coordinates, source_chan: Broadcast[Sample]
354439
) -> None:

0 commit comments

Comments
 (0)