Skip to content

Commit 541d3c9

Browse files
committed
Update the buffer length based on the input sampling period
When the input sampling period is known, we can adjust the internal buffer to a size that is big enough to store all the data needed for the requested resampling, without wasting space if the initial buffer was too big. If we are upsampling, one sample could be enough for back-filling, but we store max_data_age_in_periods using the input sampling period, so the resampling functions can do more complex inter/extrapolation if they need to. If we are downsampling, we want a buffer that can hold max_data_age_in_periods * resampling_period_s seconds of data, and we have one sample every input_sampling_period_s, so we use a buffer length of: max_data_age_in_periods * resampling_period_s / input_sampling_period_s. The buffer size, like the input sampling period, is only calculated once. The buffer data is copied to a new buffer when/if the buffer needs resizing. The new size is validated against config.warn_buffer_len and config.max_buffer_len: a warning or error is logged if it goes above either of them, and if it goes above config.max_buffer_len it is truncated to config.max_buffer_len as a safety measure. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 3513f90 commit 541d3c9

File tree

2 files changed

+116
-1
lines changed

2 files changed

+116
-1
lines changed

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,65 @@ def _update_source_sample_period(self, now: datetime) -> bool:
531531
)
532532
return True
533533

534+
def _update_buffer_len(self) -> bool:
    """Update the length of the buffer based on the source properties.

    The new length is derived from the input sampling period and the
    resampling configuration. It is never smaller than 1 and it is capped
    at `config.max_buffer_len`. If the resulting length differs from the
    current one, the buffer contents are copied into a new buffer with the
    new maximum length.

    Returns:
        Whether the buffer length was changed (was really updated).
    """
    input_sampling_period_s = self._source_properties.sampling_period_s

    # To make type checking happy
    assert input_sampling_period_s is not None
    assert self._buffer.maxlen is not None

    config = self._config

    # If we are upsampling, one sample could be enough for back-filling, but
    # we store max_data_age_in_periods *input* periods, so resampling
    # functions can do more complex inter/extrapolation if they need to.
    # There is one sample per input period, so this is a plain sample count;
    # it must not be scaled by the period in seconds, otherwise the length
    # would depend on the time unit (e.g. a 0.3s period would yield 1).
    if input_sampling_period_s > config.resampling_period_s:
        samples_per_period = 1.0
    # If we are downsampling, we want a buffer that can hold
    # max_data_age_in_periods * resampling_period_s seconds of data, and we
    # have one sample every input_sampling_period_s.
    else:
        samples_per_period = config.resampling_period_s / input_sampling_period_s

    # Round up so a partially covered period still gets a slot, and always
    # keep room for at least one sample.
    new_buffer_len = max(
        1, math.ceil(samples_per_period * config.max_data_age_in_periods)
    )
    if new_buffer_len > config.max_buffer_len:
        _logger.error(
            "The new buffer length (%s) for timeseries %s is too big, using %s instead",
            new_buffer_len,
            self._name,
            config.max_buffer_len,
        )
        # Safety measure: never grow beyond the configured hard limit.
        new_buffer_len = config.max_buffer_len
    elif new_buffer_len > config.warn_buffer_len:
        _logger.warning(
            "The new buffer length (%s) for timeseries %s bigger than %s",
            new_buffer_len,
            self._name,
            config.warn_buffer_len,
        )

    if new_buffer_len == self._buffer.maxlen:
        return False

    _logger.info(
        "New buffer length calculated for %r: %s",
        self._name,
        new_buffer_len,
    )

    # A deque's maxlen cannot be changed in place, so build a new one from
    # the current contents (the newest samples are kept when shrinking).
    self._buffer = deque(self._buffer, maxlen=new_buffer_len)

    return True
592+
534593
def resample(self, timestamp: datetime) -> Sample:
535594
"""Generate a new sample based on all the current *relevant* samples.
536595
@@ -543,7 +602,8 @@ def resample(self, timestamp: datetime) -> Sample:
543602
If there are no *relevant* samples, then the new sample will
544603
have `None` as `value`.
545604
"""
546-
self._update_source_sample_period(timestamp)
605+
if self._update_source_sample_period(timestamp):
606+
self._update_buffer_len()
547607

548608
conf = self._config
549609
props = self._source_properties

tests/timeseries/test_resampling.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
Tests for the `TimeSeriesResampler`
66
"""
77

8+
import logging
89
from datetime import datetime, timedelta, timezone
910
from typing import AsyncIterator, Iterator
1011
from unittest.mock import AsyncMock, MagicMock
@@ -26,6 +27,7 @@
2627
Source,
2728
SourceProperties,
2829
SourceStoppedError,
30+
_ResamplingHelper,
2931
)
3032

3133
from ..utils import a_sequence
@@ -137,6 +139,35 @@ async def test_resampler_config_len_error(init_len: int) -> None:
137139
)
138140

139141

142+
async def test_helper_buffer_too_big(
    fake_time: time_machine.Coordinates,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Check that a too big calculated buffer length is reported and capped.

    The helper is configured with a resampling period one unit above
    `DEFAULT_BUFFER_LEN_MAX` and fed that many samples, shifting the fake
    clock by 1 after each one. After resampling, exactly one error must have
    been logged and the buffer must be limited to `DEFAULT_BUFFER_LEN_MAX`.
    """
    too_big_len = DEFAULT_BUFFER_LEN_MAX + 1
    helper = _ResamplingHelper(
        "test",
        ResamplerConfig(
            resampling_period_s=too_big_len,
            max_data_age_in_periods=1,
        ),
    )

    for value in range(too_big_len):
        helper.add_sample(Sample(datetime.now(timezone.utc), value))
        fake_time.shift(1)

    # Only the side effects of resampling matter here, not the new sample.
    _ = helper.resample(datetime.now(timezone.utc))

    expected_message = (
        f"The new buffer length ({DEFAULT_BUFFER_LEN_MAX + 1}) "
        f"for timeseries test is too big, using {DEFAULT_BUFFER_LEN_MAX} instead"
    )
    assert caplog.record_tuples == [
        (
            "frequenz.sdk.timeseries._resampling",
            logging.ERROR,
            expected_message,
        )
    ]
    # pylint: disable=protected-access
    assert helper._buffer.maxlen == DEFAULT_BUFFER_LEN_MAX
169+
170+
140171
async def test_resampling_with_one_window(
141172
fake_time: time_machine.Coordinates, source_chan: Broadcast[Sample]
142173
) -> None:
@@ -193,6 +224,7 @@ async def test_resampling_with_one_window(
193224
assert source_props == SourceProperties(
194225
sampling_start=timestamp, received_samples=2, sampling_period_s=None
195226
)
227+
assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len
196228
sink_mock.reset_mock()
197229
resampling_fun_mock.reset_mock()
198230

@@ -221,6 +253,9 @@ async def test_resampling_with_one_window(
221253
assert source_props == SourceProperties(
222254
sampling_start=timestamp, received_samples=5, sampling_period_s=0.8
223255
)
256+
# The buffer should be able to hold 2 seconds of data, and data is coming
257+
# every 0.8 seconds, so we should be able to store 3 samples.
258+
assert _get_buffer_len(resampler, source_recvr) == 3
224259
sink_mock.reset_mock()
225260
resampling_fun_mock.reset_mock()
226261

@@ -236,6 +271,7 @@ async def test_resampling_with_one_window(
236271
assert source_props == SourceProperties(
237272
sampling_start=timestamp, received_samples=5, sampling_period_s=0.8
238273
)
274+
assert _get_buffer_len(resampler, source_recvr) == 3
239275

240276

241277
# Even when a lot could be refactored to use smaller functions, I'm allowing
@@ -297,6 +333,7 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma
297333
assert source_props == SourceProperties(
298334
sampling_start=timestamp, received_samples=2, sampling_period_s=None
299335
)
336+
assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len
300337
sink_mock.reset_mock()
301338
resampling_fun_mock.reset_mock()
302339

@@ -324,6 +361,7 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma
324361
assert source_props == SourceProperties(
325362
sampling_start=timestamp, received_samples=5, sampling_period_s=None
326363
)
364+
assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len
327365
sink_mock.reset_mock()
328366
resampling_fun_mock.reset_mock()
329367

@@ -351,6 +389,10 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma
351389
assert source_props == SourceProperties(
352390
sampling_start=timestamp, received_samples=7, sampling_period_s=6 / 7
353391
)
392+
# The buffer should be able to hold 2 * 1.5 (3) seconds of data, and data
393+
# is coming every 6/7 seconds (~0.857s), so we should be able to store
394+
# 4 samples.
395+
assert _get_buffer_len(resampler, source_recvr) == 4
354396
sink_mock.reset_mock()
355397
resampling_fun_mock.reset_mock()
356398

@@ -386,6 +428,7 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma
386428
assert source_props == SourceProperties(
387429
sampling_start=timestamp, received_samples=7, sampling_period_s=6 / 7
388430
)
431+
assert _get_buffer_len(resampler, source_recvr) == 4
389432

390433

391434
# Even when a lot could be refactored to use smaller functions, I'm allowing
@@ -447,6 +490,7 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen
447490
assert source_props == SourceProperties(
448491
sampling_start=timestamp, received_samples=2, sampling_period_s=None
449492
)
493+
assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len
450494
sink_mock.reset_mock()
451495
resampling_fun_mock.reset_mock()
452496

@@ -474,6 +518,7 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen
474518
assert source_props == SourceProperties(
475519
sampling_start=timestamp, received_samples=5, sampling_period_s=None
476520
)
521+
assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len
477522
sink_mock.reset_mock()
478523
resampling_fun_mock.reset_mock()
479524

@@ -501,6 +546,7 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen
501546
assert source_props == SourceProperties(
502547
sampling_start=timestamp, received_samples=7, sampling_period_s=None
503548
)
549+
assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len
504550
sink_mock.reset_mock()
505551
resampling_fun_mock.reset_mock()
506552

@@ -522,6 +568,7 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen
522568
assert source_props == SourceProperties(
523569
sampling_start=timestamp, received_samples=7, sampling_period_s=None
524570
)
571+
assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len
525572
sink_mock.reset_mock()
526573
resampling_fun_mock.reset_mock()
527574

@@ -537,6 +584,7 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen
537584
assert source_props == SourceProperties(
538585
sampling_start=timestamp, received_samples=7, sampling_period_s=None
539586
)
587+
assert _get_buffer_len(resampler, source_recvr) == config.initial_buffer_len
540588

541589

542590
async def test_receiving_stopped_resampling_error(
@@ -643,3 +691,10 @@ async def make_fake_source() -> Source:
643691
assert fake_source in exceptions
644692
timeseries_error = exceptions[fake_source]
645693
assert isinstance(timeseries_error, TestException)
694+
695+
696+
def _get_buffer_len(resampler: Resampler, source_recvr: Source) -> int:
    """Return the maximum length of the buffer kept for `source_recvr`.

    Args:
        resampler: The resampler to look the source up in.
        source_recvr: The source whose resampling buffer is inspected.

    Returns:
        The `maxlen` of the buffer used by the helper for that source.
    """
    # pylint: disable=protected-access
    helper = resampler._resamplers[source_recvr]._helper
    max_len = helper._buffer.maxlen
    # The buffer is expected to be bounded; this also narrows the type.
    assert max_len is not None
    return max_len

0 commit comments

Comments
 (0)