Skip to content

Commit 3a59bdd

Browse files
committed
Add initial buffer length to the resampler
This commit makes the resampler use a proper ring buffer instead of an unbounded buffer that only gets cleared up when a resampling is done (which could easily end up in memory issues if the input sampling rate is much higher than the resampling rate).

This also improves the performance of the resampling by an average of 20% (on my local machine, your mileage may vary), even though the current implementation now needs to copy the buffer when it is passed to the resampling function.

Here are the results of running benchmarks/timeseries/resampling.py for the old implementation, the new implementation (without doing the bisection and copying) and the new complete implementation with the current fixable inefficiencies:

| Benchmark                     | OLD                   | NEW                   | NEW WITH BISECT       |
|-------------------------------|-----------------------|-----------------------|-----------------------|
| resamples=10 samples=10       | 0.0008896420185919851 | 0.00062773801619187   | 0.0008012260077521205 |
| resamples=10 samples=100      | 0.007817161997081712  | 0.005761806009104475  | 0.006307974021183327  |
| resamples=10 samples=1000     | 0.07768873398890719   | 0.05851042701397091   | 0.0604277040110901    |
| resamples=100 samples=10      | 0.008742492995224893  | 0.0062527229893021286 | 0.00808265499654226   |
| resamples=100 samples=100     | 0.07808284999919124   | 0.057997508003609255  | 0.0624066719901748    |
| resamples=100 samples=1000    | 0.782658567011822     | 0.5870920980232768    | 0.6098103950207587    |
| resamples=1000 samples=10     | 0.08764891701866873   | 0.062448524025967345  | 0.07815460601705126   |
| resamples=1000 samples=100    | 0.78426024899818      | 0.5858371119829826    | 0.6357307220168877    |
| resamples=1000 samples=1000   | 7.513815971993608     | 5.984694316983223     | 6.42200836900156      |
| Average improvement           |                       | 35.3%                 | 19.7%                 |

This commit sadly introduces a nasty hack to be able to bisect the buffer: we need to make Sample ordered by timestamp because bisect doesn't support using a key extraction function until Python 3.10. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 42143f5 commit 3a59bdd

File tree

3 files changed

+47
-39
lines changed

3 files changed

+47
-39
lines changed

benchmarks/timeseries/resampling.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ def _benchmark_resampling_helper(resamples: int, samples: int) -> None:
2525
resampling_period_s=1.0,
2626
max_data_age_in_periods=3.0,
2727
resampling_function=nop,
28+
initial_buffer_len=samples * 3,
2829
)
2930
)
3031
now = datetime.now(timezone.utc)

src/frequenz/sdk/timeseries/_base_types.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@
33

44
"""Timeseries basic types."""
55

6-
from dataclasses import dataclass
6+
from dataclasses import dataclass, field
77
from datetime import datetime
88
from typing import Optional
99

1010

11-
@dataclass(frozen=True)
11+
# Ordering by timestamp is a bit arbitrary, and it is not always what might be
12+
# wanted. We are using this order now because usually we need to do binary
13+
# searches on sequences of samples, and the Python `bisect` module doesn't
14+
# support providing a key until Python 3.10.
15+
@dataclass(frozen=True, order=True)
1216
class Sample:
1317
"""A measurement taken at a particular point in time.
1418
@@ -17,5 +21,8 @@ class Sample:
1721
coherent view on a group of component metrics for a particular timestamp.
1822
"""
1923

20-
timestamp: datetime
21-
value: Optional[float] = None
24+
timestamp: datetime = field(compare=True)
25+
"""The time when this sample was generated."""
26+
27+
value: Optional[float] = field(compare=False, default=None)
28+
"""The value of this sample."""

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
from __future__ import annotations
77

88
import asyncio
9+
import itertools
910
import logging
1011
import math
12+
from bisect import bisect
1113
from collections import deque
1214
from dataclasses import dataclass
1315
from datetime import datetime, timedelta
@@ -21,6 +23,15 @@
2123
_logger = logging.Logger(__name__)
2224

2325

26+
DEFAULT_BUFFER_LEN_INIT = 16
27+
"""Default initial buffer length.
28+
29+
Buffers will be created initially with this length, but they could grow or
30+
shrink depending on the source characteristics, like sampling rate, to make
31+
sure all the requested past sampling periods can be stored.
32+
"""
33+
34+
2435
Source = AsyncIterator[Sample]
2536
"""A source for a timeseries.
2637
@@ -113,6 +124,14 @@ class ResamplerConfig:
113124
value.
114125
"""
115126

127+
initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
128+
"""The initial length of the resampling buffer.
129+
130+
The buffer could grow or shrink depending on the source characteristics,
131+
like sampling rate, to make sure all the requested past sampling periods
132+
can be stored.
133+
"""
134+
116135

117136
class SourceStoppedError(RuntimeError):
118137
"""A timeseries stopped producing samples."""
@@ -306,7 +325,7 @@ def __init__(self, config: ResamplerConfig) -> None:
306325
config: The configuration for the resampler.
307326
"""
308327
self._config = config
309-
self._buffer: deque[Sample] = deque()
328+
self._buffer: deque[Sample] = deque(maxlen=config.initial_buffer_len)
310329

311330
def add_sample(self, sample: Sample) -> None:
312331
"""Add a new sample to the internal buffer.
@@ -316,30 +335,6 @@ def add_sample(self, sample: Sample) -> None:
316335
"""
317336
self._buffer.append(sample)
318337

319-
def _remove_outdated_samples(self, threshold: datetime) -> None:
320-
"""Remove samples that are older than the provided time threshold.
321-
322-
It is assumed that items in the buffer are in a sorted order (ascending order
323-
by timestamp).
324-
325-
The removal works by traversing the buffer starting from the oldest sample
326-
(smallest timestamp) and comparing sample's timestamp with the threshold.
327-
If the sample's threshold is smaller than `threshold`, it means that the
328-
sample is outdated and it is removed from the buffer. This continues until
329-
the first sample that is with timestamp greater or equal to `threshold` is
330-
encountered, then buffer is considered up to date.
331-
332-
Args:
333-
threshold: samples whose timestamp is older than the threshold are
334-
considered outdated and should be remove from the buffer
335-
"""
336-
while self._buffer:
337-
sample: Sample = self._buffer[0]
338-
if sample.timestamp > threshold:
339-
return
340-
341-
self._buffer.popleft()
342-
343338
def resample(self, timestamp: datetime) -> Sample:
344339
"""Generate a new sample based on all the current *relevant* samples.
345340
@@ -352,18 +347,23 @@ def resample(self, timestamp: datetime) -> Sample:
352347
If there are no *relevant* samples, then the new sample will
353348
have `None` as `value`.
354349
"""
355-
threshold = timestamp - timedelta(
356-
seconds=self._config.max_data_age_in_periods
357-
* self._config.resampling_period_s
350+
conf = self._config
351+
minimum_relevant_timestamp = timestamp - timedelta(
352+
seconds=conf.resampling_period_s * conf.max_data_age_in_periods
358353
)
359-
self._remove_outdated_samples(threshold=threshold)
360-
354+
# We need to pass a dummy Sample to bisect because it only supports
355+
# specifying a key extraction function in Python 3.10, so we need to
356+
# compare samples at the moment.
357+
cut_index = bisect(self._buffer, Sample(minimum_relevant_timestamp, None))
358+
# pylint: disable=fixme
359+
# FIXME: This is far from efficient, but we don't want to start a new
360+
# ring buffer implementation here that uses a list to overcome the
361+
# deque limitation of not being able to get slices
362+
relevant_samples = list(itertools.islice(self._buffer, cut_index, None))
361363
value = (
362-
None
363-
if not self._buffer
364-
else self._config.resampling_function(
365-
self._buffer, self._config.resampling_period_s
366-
)
364+
conf.resampling_function(relevant_samples, conf.resampling_period_s)
365+
if relevant_samples
366+
else None
367367
)
368368
return Sample(timestamp, value)
369369

0 commit comments

Comments
 (0)