Skip to content

Commit b147ad7

Browse files
committed
Add a warning if the resampling timer delay is too long
The resampling timer will always be fired after a resampling period has passed (unless we are unrealistically lucky). If we detect the timer fired much later than the resampling period (10%), we now emit a warning as there is probably something doing too much processing in the main thread, and we probably want to detect that issue early. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent c29139b commit b147ad7

File tree

2 files changed

+127
-5
lines changed

2 files changed

+127
-5
lines changed

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -447,12 +447,12 @@ async def resample(self, *, one_shot: bool = False) -> None:
447447
timeseries from the resampler before calling this method
448448
again).
449449
"""
450-
async for _ in self._timer:
450+
async for timer_timestamp in self._timer:
451451
results = await asyncio.gather(
452452
*[r.resample(self._window_end) for r in self._resamplers.values()],
453453
return_exceptions=True,
454454
)
455-
self._update_window_end()
455+
self._update_window_end(timer_timestamp)
456456
exceptions = {
457457
source: results[i]
458458
for i, source in enumerate(self._resamplers)
@@ -465,7 +465,23 @@ async def resample(self, *, one_shot: bool = False) -> None:
465465
if one_shot:
466466
break
467467

468-
def _update_window_end(self) -> None:
468+
def _update_window_end(self, timer_timestamp: datetime) -> None:
469+
# We use abs() here to account for errors in the timer where the timer
470+
# fires before its time even when in theory it shouldn't be possible,
471+
# but we want to be resilient to timer implementation changes that
472+
# could end up in this case, either because there were some time jump
473+
# or some rounding error.
474+
timer_error_s = abs((timer_timestamp - self._window_end).total_seconds())
475+
if timer_error_s > (self._config.resampling_period_s / 10.0):
476+
_logger.warning(
477+
"The resampling timer fired too late. It should have fired at "
478+
"%s, but it fired at %s (%s seconds difference; resampling "
479+
"period is %s seconds)",
480+
self._window_end,
481+
timer_timestamp,
482+
timer_error_s,
483+
self._config.resampling_period_s,
484+
)
469485
self._window_end = self._window_end + timedelta(
470486
seconds=self._config.resampling_period_s
471487
)

tests/timeseries/test_resampling.py

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,105 @@ async def test_resampling_window_size_is_constant(
256256
resampling_fun_mock.reset_mock()
257257

258258

259+
async def test_timer_errors_are_logged(
260+
fake_time: time_machine.Coordinates,
261+
source_chan: Broadcast[Sample],
262+
caplog: pytest.LogCaptureFixture,
263+
) -> None:
264+
"""Test that big differences between the expected window end and the fired timer are logged."""
265+
timestamp = datetime.now(timezone.utc)
266+
267+
resampling_period_s = 2
268+
expected_resampled_value = 42.0
269+
270+
resampling_fun_mock = MagicMock(
271+
spec=ResamplingFunction, return_value=expected_resampled_value
272+
)
273+
config = ResamplerConfig(
274+
resampling_period_s=resampling_period_s,
275+
max_data_age_in_periods=2.0,
276+
resampling_function=resampling_fun_mock,
277+
initial_buffer_len=4,
278+
)
279+
resampler = Resampler(config)
280+
281+
source_receiver = source_chan.new_receiver()
282+
source_sender = source_chan.new_sender()
283+
284+
sink_mock = AsyncMock(spec=Sink, return_value=True)
285+
286+
resampler.add_timeseries("test", source_receiver, sink_mock)
287+
source_props = resampler.get_source_properties(source_receiver)
288+
289+
# Test timeline
290+
#
291+
# t(s) 0 1 2 2.5 3 4
292+
# |----------|----------R----|-----|----------R-----> (no more samples)
293+
# value 5.0 12.0 2.0 4.0 5.0
294+
#
295+
# R = resampling is done
296+
297+
# Send a few samples and run a resample tick, advancing the fake time by one period
298+
sample0s = Sample(timestamp, value=5.0)
299+
sample1s = Sample(timestamp + timedelta(seconds=1.0), value=12.0)
300+
await source_sender.send(sample0s)
301+
await source_sender.send(sample1s)
302+
fake_time.shift(resampling_period_s * 1.0999) # Timer is delayed 9.99%
303+
await resampler.resample(one_shot=True)
304+
305+
assert datetime.now(timezone.utc).timestamp() == pytest.approx(2.1998)
306+
sink_mock.assert_called_once_with(
307+
Sample(
308+
timestamp + timedelta(seconds=resampling_period_s), expected_resampled_value
309+
)
310+
)
311+
resampling_fun_mock.assert_called_once_with(
312+
a_sequence(sample0s, sample1s), config, source_props
313+
)
314+
assert not [
315+
*_filter_logs(
316+
caplog.record_tuples,
317+
logger_level=logging.WARNING,
318+
)
319+
]
320+
sink_mock.reset_mock()
321+
resampling_fun_mock.reset_mock()
322+
323+
# Second resampling run, now with 10% delay
324+
sample2_5s = Sample(timestamp + timedelta(seconds=2.5), value=2.0)
325+
sample3s = Sample(timestamp + timedelta(seconds=3), value=4.0)
326+
sample4s = Sample(timestamp + timedelta(seconds=4), value=5.0)
327+
await source_sender.send(sample2_5s)
328+
await source_sender.send(sample3s)
329+
await source_sender.send(sample4s)
330+
fake_time.shift(resampling_period_s * 1.10) # Timer delayed 10%
331+
await resampler.resample(one_shot=True)
332+
333+
assert datetime.now(timezone.utc).timestamp() == pytest.approx(2.1998 + 2.2)
334+
sink_mock.assert_called_once_with(
335+
Sample(
336+
# But the sample still gets 4s as timestamp, because we are keeping
337+
# the window size constant, not dependent on when the timer fired
338+
timestamp + timedelta(seconds=resampling_period_s * 2),
339+
expected_resampled_value,
340+
)
341+
)
342+
resampling_fun_mock.assert_called_once_with(
343+
a_sequence(sample1s, sample2_5s, sample3s, sample4s),
344+
config,
345+
source_props,
346+
)
347+
assert (
348+
"frequenz.sdk.timeseries._resampling",
349+
logging.WARNING,
350+
"The resampling timer fired too late. It should have fired at 1970-01-01 00:00:04+00:00, "
351+
"but it fired at 1970-01-01 00:00:04.399800+00:00 (0.3998 seconds difference; resampling "
352+
"period is 2 seconds)",
353+
) in _filter_logs(caplog.record_tuples, logger_level=logging.WARNING)
354+
sink_mock.reset_mock()
355+
resampling_fun_mock.reset_mock()
356+
357+
259358
async def test_resampling_with_one_window(
260359
fake_time: time_machine.Coordinates, source_chan: Broadcast[Sample]
261360
) -> None:
@@ -789,6 +888,13 @@ def _get_buffer_len(resampler: Resampler, source_recvr: Source) -> int:
789888

790889

791890
def _filter_logs(
792-
record_tuples: list[tuple[str, int, str]], *, logger_name: str
891+
record_tuples: list[tuple[str, int, str]],
892+
*,
893+
logger_name: str,
894+
logger_level: int | None = None,
793895
) -> list[tuple[str, int, str]]:
794-
return [t for t in record_tuples if t[0] == logger_name]
896+
return [
897+
t
898+
for t in record_tuples
899+
if t[0] == logger_name and (logger_level is None or logger_level == t[1])
900+
]

0 commit comments

Comments
 (0)