Skip to content

Commit eb12a3b

Browse files
committed
Add a maximum resampling buffer size
This is mainly because in a near future the resampling buffer size will dynamically change based on the input sampling rate, so we need to make sure it doesn't get out of hand in the event of some unlikely high input sampling rate combined with a very low frequency resampling. We also add a warning size, for which we just emit a warning in the logs this is to also try to minimize the presence of very big buffers without affecting the running apps (unless again, it gets out of hand). When this happens users should either try to use a different resampling period to keep buffer smaller, or change the (default) warning buffer length if they know it is still manageable. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 3d80455 commit eb12a3b

File tree

3 files changed

+120
-0
lines changed

3 files changed

+120
-0
lines changed

benchmarks/timeseries/resampling.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ def _benchmark_resampling_helper(resamples: int, samples: int) -> None:
2626
max_data_age_in_periods=3.0,
2727
resampling_function=nop,
2828
initial_buffer_len=samples * 3,
29+
max_buffer_len=samples * 3,
30+
warn_buffer_len=samples * 3,
2931
)
3032
)
3133
now = datetime.now(timezone.utc)

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,21 @@
3232
"""
3333

3434

35+
DEFAULT_BUFFER_LEN_MAX = 1024
36+
"""Default maximum allowed buffer length.
37+
38+
If a buffer length would get bigger than this, it will be truncated to this
39+
length.
40+
"""
41+
42+
43+
DEFAULT_BUFFER_LEN_WARN = 128
44+
"""Default minimum buffer length that will produce a warning.
45+
46+
If a buffer length would get bigger than this, a warning will be logged.
47+
"""
48+
49+
3550
Source = AsyncIterator[Sample]
3651
"""A source for a timeseries.
3752
@@ -132,6 +147,51 @@ class ResamplerConfig:
132147
can be stored.
133148
"""
134149

150+
warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
151+
"""The minimum length of the resampling buffer that will emit a warning.
152+
153+
If a buffer grows bigger than this value, it will emit a warning in the
154+
logs, so buffers don't grow too big inadvertly.
155+
"""
156+
157+
max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
158+
"""The maximum length of the resampling buffer.
159+
160+
Buffers won't be allowed to grow beyond this point even if it would be
161+
needed to keep all the requested past sampling periods. An error will be
162+
emitted in the logs if the buffer length needs to be truncated to this
163+
value.
164+
"""
165+
166+
def __post_init__(self) -> None:
167+
"""Check config values are valid.
168+
169+
Raises:
170+
ValueError: if the initial buffer length is too small (less than 2)
171+
or too big (more than `max_buffer_len`).
172+
"""
173+
if self.initial_buffer_len < 2:
174+
raise ValueError(
175+
f"initial_buffer_len must be at least 2, got {self.initial_buffer_len}"
176+
)
177+
if self.initial_buffer_len > self.max_buffer_len:
178+
raise ValueError(
179+
f"initial_buffer_len be smaller than {self.max_buffer_len}, "
180+
"got {self.initial_buffer_len}"
181+
)
182+
if self.initial_buffer_len > self.warn_buffer_len:
183+
_logger.warning(
184+
"initial_buffer_len (%s) is bigger than %s",
185+
self.initial_buffer_len,
186+
self.warn_buffer_len,
187+
)
188+
if self.initial_buffer_len > self.warn_buffer_len:
189+
_logger.warning(
190+
"initial_buffer_len (%s) is bigger than warn_buffer_len (%s)",
191+
self.initial_buffer_len,
192+
self.warn_buffer_len,
193+
)
194+
135195

136196
class SourceStoppedError(RuntimeError):
137197
"""A timeseries stopped producing samples."""

tests/timeseries/test_resampling.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
from frequenz.sdk.timeseries import Sample
1818
from frequenz.sdk.timeseries._resampling import (
19+
DEFAULT_BUFFER_LEN_MAX,
20+
DEFAULT_BUFFER_LEN_WARN,
1921
Resampler,
2022
ResamplerConfig,
2123
ResamplingError,
@@ -78,6 +80,62 @@ async def _assert_no_more_samples( # pylint: disable=too-many-arguments
7880
resampling_fun_mock.reset_mock()
7981

8082

83+
@pytest.mark.parametrize("init_len", list(range(2, DEFAULT_BUFFER_LEN_WARN + 1, 16)))
84+
async def test_resampler_config_len_ok(
85+
init_len: int,
86+
caplog: pytest.LogCaptureFixture,
87+
) -> None:
88+
"""Test checks on the resampling buffer."""
89+
config = ResamplerConfig(
90+
resampling_period_s=1.0,
91+
initial_buffer_len=init_len,
92+
)
93+
assert config.initial_buffer_len == init_len
94+
assert caplog.records == []
95+
96+
97+
@pytest.mark.parametrize(
98+
"init_len",
99+
range(DEFAULT_BUFFER_LEN_WARN + 1, DEFAULT_BUFFER_LEN_MAX + 1, 64),
100+
)
101+
async def test_resampler_config_len_warn(
102+
init_len: int, caplog: pytest.LogCaptureFixture
103+
) -> None:
104+
"""Test checks on the resampling buffer."""
105+
config = ResamplerConfig(
106+
resampling_period_s=1.0,
107+
initial_buffer_len=init_len,
108+
)
109+
assert config.initial_buffer_len == init_len
110+
for record in caplog.records:
111+
assert record.levelname == "WARNING"
112+
assert caplog.text.startswith("")
113+
assert (
114+
caplog.text
115+
== f"initial_buffer_len ({init_len}) is bigger than {DEFAULT_BUFFER_LEN_WARN}"
116+
assert caplog.record_tuples == [
117+
(
118+
"frequenz.sdk.timeseries._resampling",
119+
logging.WARNING,
120+
f"initial_buffer_len ({init_len}) is bigger than "
121+
f"warn_buffer_len ({DEFAULT_BUFFER_LEN_WARN})",
122+
)
123+
]
124+
125+
126+
@pytest.mark.parametrize(
127+
"init_len",
128+
list(range(-2, 2)) + [DEFAULT_BUFFER_LEN_MAX + 1, DEFAULT_BUFFER_LEN_MAX + 2],
129+
)
130+
async def test_resampler_config_len_error(init_len: int) -> None:
131+
"""Test checks on the resampling buffer."""
132+
with pytest.raises(ValueError):
133+
_ = ResamplerConfig(
134+
resampling_period_s=1.0,
135+
initial_buffer_len=init_len,
136+
)
137+
138+
81139
async def test_resampling_with_one_window(
82140
fake_time: time_machine.Coordinates, source_chan: Broadcast[Sample]
83141
) -> None:

0 commit comments

Comments
 (0)