Skip to content

Commit 026544a

Browse files
llucaxjack-herrmann
authored andcommitted
Properly align the timer to align_to
By using the Timer.periodic, the alignment was lost. We now make the timer aligned to the align_to configuration by delaying its start. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent bbaf3d3 commit 026544a

File tree

2 files changed

+126
-20
lines changed

2 files changed

+126
-20
lines changed

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from typing import AsyncIterator, Callable, Coroutine, Optional, Sequence
1717

1818
from frequenz.channels.util import Timer
19+
from frequenz.channels.util._timer import _to_microseconds
1920

2021
from .._internal._asyncio import cancel_and_await
2122
from ._base_types import UNIX_EPOCH, Sample
@@ -369,7 +370,8 @@ def __init__(self, config: ResamplerConfig) -> None:
369370
self._resamplers: dict[Source, _StreamingHelper] = {}
370371
"""A mapping between sources and the streaming helper handling that source."""
371372

372-
self._window_end: datetime = self._calculate_window_end()
373+
window_end, start_delay_time = self._calculate_window_end()
374+
self._window_end: datetime = window_end
373375
"""The time in which the current window ends.
374376
375377
This is used to make sure every resampling window is generated at
@@ -385,6 +387,15 @@ def __init__(self, config: ResamplerConfig) -> None:
385387
self._timer: Timer = Timer.periodic(config.resampling_period)
386388
"""The timer used to trigger the resampling windows."""
387389

390+
# Hack to align the timer, this should be implemented in the Timer class
391+
# If there is no delay, then we need to add one period, because the timer
392+
# shouldn't fire right away
393+
if not start_delay_time:
394+
start_delay_time += config.resampling_period
395+
self._timer._next_tick_time = _to_microseconds(
396+
timedelta(seconds=asyncio.get_running_loop().time()) + start_delay_time
397+
) # pylint: disable=protected-access
398+
388399
@property
389400
def config(self) -> ResamplerConfig:
390401
"""Get the resampler configuration.
@@ -504,28 +515,36 @@ async def resample(self, *, one_shot: bool = False) -> None:
504515
if one_shot:
505516
break
506517

507-
def _calculate_window_end(self) -> datetime:
518+
def _calculate_window_end(self) -> tuple[datetime, timedelta]:
508519
"""Calculate the end of the current resampling window.
509520
510521
The calculated resampling window end is a multiple of
511522
`self._config.resampling_period` starting at `self._config.align_to`.
512523
513524
if `self._config.align_to` is `None`, the current time is used.
514525
526+
If the current time is not aligned to `self._config.resampling_period`, then
527+
the end of the current resampling window will be more than one period away, to
528+
make sure to have some time to collect samples if the misalignment is too big.
529+
515530
Returns:
516-
The end of the current resampling window aligned to
517-
`self._config.align_to`.
531+
A tuple with the end of the current resampling window aligned to
532+
`self._config.align_to` as the first item and the time we need to
533+
delay the timer start to make sure it is also aligned.
518534
"""
519535
now = datetime.now(timezone.utc)
520536
period = self._config.resampling_period
521537
align_to = self._config.align_to
522538

523539
if align_to is None:
524-
return now + period
540+
return (now + period, timedelta(0))
525541

526542
elapsed = (now - align_to) % period
527543

528-
return now + period - elapsed
544+
return (
545+
now + period - elapsed,
546+
period - elapsed if elapsed else timedelta(0),
547+
)
529548

530549

531550
class _ResamplingHelper:

tests/timeseries/test_resampling.py

Lines changed: 101 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -191,35 +191,47 @@ async def test_helper_buffer_too_big(
191191
1.0,
192192
datetime(2020, 1, 1, 2, 3, 5, 300000, tzinfo=timezone.utc),
193193
datetime(2020, 1, 1, tzinfo=timezone.utc),
194-
datetime(2020, 1, 1, 2, 3, 6, tzinfo=timezone.utc),
194+
(
195+
datetime(2020, 1, 1, 2, 3, 6, tzinfo=timezone.utc),
196+
timedelta(seconds=0.7),
197+
),
195198
),
196199
(
197200
3.0,
198201
datetime(2020, 1, 1, 2, 3, 5, 300000, tzinfo=timezone.utc),
199202
datetime(2020, 1, 1, 0, 0, 5, tzinfo=timezone.utc),
200-
datetime(2020, 1, 1, 2, 3, 8, tzinfo=timezone.utc),
203+
(
204+
datetime(2020, 1, 1, 2, 3, 8, tzinfo=timezone.utc),
205+
timedelta(seconds=2.7),
206+
),
201207
),
202208
(
203209
10.0,
204210
datetime(2020, 1, 1, 2, 3, 5, 300000, tzinfo=timezone.utc),
205211
datetime(2020, 1, 1, 0, 0, 5, tzinfo=timezone.utc),
206-
datetime(2020, 1, 1, 2, 3, 15, tzinfo=timezone.utc),
212+
(
213+
datetime(2020, 1, 1, 2, 3, 15, tzinfo=timezone.utc),
214+
timedelta(seconds=9.7),
215+
),
207216
),
208217
# Future align_to
209218
(
210219
10.0,
211220
datetime(2020, 1, 1, 2, 3, 5, 300000, tzinfo=timezone.utc),
212221
datetime(2020, 1, 1, 2, 3, 18, tzinfo=timezone.utc),
213-
datetime(2020, 1, 1, 2, 3, 8, tzinfo=timezone.utc),
222+
(
223+
datetime(2020, 1, 1, 2, 3, 8, tzinfo=timezone.utc),
224+
timedelta(seconds=2.7),
225+
),
214226
),
215227
),
216228
)
217-
def test_calculate_window_end_trivial_cases(
229+
async def test_calculate_window_end_trivial_cases(
218230
fake_time: time_machine.Coordinates,
219231
resampling_period_s: float,
220232
now: datetime,
221233
align_to: datetime,
222-
result: datetime,
234+
result: tuple[datetime, timedelta],
223235
) -> None:
224236
"""Test the calculation of the resampling window end for simple cases."""
225237
resampling_period = timedelta(seconds=resampling_period_s)
@@ -249,11 +261,10 @@ def test_calculate_window_end_trivial_cases(
249261
)
250262
fake_time.move_to(now)
251263
# pylint: disable=protected-access
252-
assert (
253-
resampler_now._calculate_window_end() == resampler_none._calculate_window_end()
254-
)
264+
none_result = resampler_none._calculate_window_end()
265+
assert resampler_now._calculate_window_end() == none_result
255266
# pylint: disable=protected-access
256-
assert resampler_none._calculate_window_end() == now + resampling_period
267+
assert none_result[0] == now + resampling_period
257268

258269

259270
async def test_resampling_window_size_is_constant(
@@ -303,6 +314,7 @@ async def test_resampling_window_size_is_constant(
303314
await resampler.resample(one_shot=True)
304315

305316
assert datetime.now(timezone.utc).timestamp() == 2
317+
assert asyncio.get_event_loop().time() == 2
306318
sink_mock.assert_called_once_with(
307319
Sample(
308320
timestamp + timedelta(seconds=resampling_period_s),
@@ -343,7 +355,7 @@ async def test_resampling_window_size_is_constant(
343355
resampling_fun_mock.reset_mock()
344356

345357

346-
async def test_timer_errors_are_logged(
358+
async def test_timer_errors_are_logged( # pylint: disable=too-many-statements
347359
fake_time: time_machine.Coordinates,
348360
source_chan: Broadcast[Sample[Quantity]],
349361
caplog: pytest.LogCaptureFixture,
@@ -384,15 +396,14 @@ async def test_timer_errors_are_logged(
384396
# T = timer tick
385397

386398
# Send a few samples and run a resample tick, advancing the fake time by one period
387-
# Important: this is needed because the resampling timer only starts after the first
388-
# resapmle() is called, so the first resampling will never have a shift.
399+
# No log message should be produced
389400
sample0s = Sample(timestamp, value=Quantity(5.0))
390401
sample1s = Sample(timestamp + timedelta(seconds=1.0), value=Quantity(12.0))
391402
await source_sender.send(sample0s)
392403
await source_sender.send(sample1s)
393404
# Here we need to advance only the wall clock because the resampler timer is not yet
394405
# started, otherwise the loop time will be advanced twice
395-
fake_time.shift(resampling_period_s)
406+
await _advance_time(fake_time, resampling_period_s)
396407
await resampler.resample(one_shot=True)
397408

398409
assert datetime.now(timezone.utc).timestamp() == pytest.approx(2)
@@ -1112,6 +1123,82 @@ async def make_fake_source() -> Source:
11121123
assert isinstance(timeseries_error, TestException)
11131124

11141125

1126+
async def test_timer_is_aligned(
1127+
fake_time: time_machine.Coordinates,
1128+
source_chan: Broadcast[Sample[Quantity]],
1129+
caplog: pytest.LogCaptureFixture,
1130+
) -> None:
1131+
"""Test that big differences between the expected window end and the fired timer are logged."""
1132+
timestamp = datetime.now(timezone.utc)
1133+
1134+
resampling_period_s = 2
1135+
expected_resampled_value = 42.0
1136+
1137+
resampling_fun_mock = MagicMock(
1138+
spec=ResamplingFunction, return_value=expected_resampled_value
1139+
)
1140+
config = ResamplerConfig(
1141+
resampling_period=timedelta(seconds=resampling_period_s),
1142+
max_data_age_in_periods=2.0,
1143+
resampling_function=resampling_fun_mock,
1144+
initial_buffer_len=4,
1145+
)
1146+
1147+
# Advance the time a bit so that the resampler is not aligned to the resampling
1148+
# period
1149+
await _advance_time(fake_time, resampling_period_s / 3)
1150+
1151+
resampler = Resampler(config)
1152+
1153+
source_receiver = source_chan.new_receiver()
1154+
source_sender = source_chan.new_sender()
1155+
1156+
sink_mock = AsyncMock(spec=Sink, return_value=True)
1157+
1158+
resampler.add_timeseries("test", source_receiver, sink_mock)
1159+
source_props = resampler.get_source_properties(source_receiver)
1160+
1161+
# Test timeline
1162+
# alignment
1163+
# ,-------------.
1164+
# start = 0.667
1165+
# t(s) 0 | 1 1.5 2
1166+
# |-------+--|-----|----R-----> (no more samples)
1167+
# value 5.0 12.0
1168+
#
1169+
# R = resampling is done
1170+
1171+
# Send samples and resample
1172+
sample1s = Sample(timestamp + timedelta(seconds=1.0), value=Quantity(5.0))
1173+
sample1_5s = Sample(timestamp + timedelta(seconds=1.5), value=Quantity(12.0))
1174+
await source_sender.send(sample1s)
1175+
await source_sender.send(sample1_5s)
1176+
await _advance_time(fake_time, resampling_period_s * 2 / 3)
1177+
await resampler.resample(one_shot=True)
1178+
1179+
assert datetime.now(timezone.utc).timestamp() == pytest.approx(2)
1180+
assert asyncio.get_running_loop().time() == pytest.approx(2)
1181+
sink_mock.assert_called_once_with(
1182+
Sample(
1183+
timestamp + timedelta(seconds=resampling_period_s),
1184+
Quantity(expected_resampled_value),
1185+
)
1186+
)
1187+
resampling_fun_mock.assert_called_once_with(
1188+
a_sequence(sample1s, sample1_5s),
1189+
config,
1190+
source_props,
1191+
)
1192+
assert not [
1193+
*_filter_logs(
1194+
caplog.record_tuples,
1195+
logger_level=logging.WARNING,
1196+
)
1197+
]
1198+
sink_mock.reset_mock()
1199+
resampling_fun_mock.reset_mock()
1200+
1201+
11151202
def _get_buffer_len(resampler: Resampler, source_recvr: Source) -> int:
11161203
# pylint: disable=protected-access
11171204
blen = resampler._resamplers[source_recvr]._helper._buffer.maxlen

0 commit comments

Comments
 (0)