Skip to content

Commit 4ada9e9

Browse files
committed
Add an option to align resampling windows
The `ResamplerConfig` now has a new option `align_to` to align the resampling windows to some arbitrary time. This means that the timestamps of the produced samples will be a multiple of the resampling period beginning at the `align_to` time. By default `align_to` is the Unix epoch, and if it is `None` it will have the current behaviour of aligning timestamps to the time when the resampler is started. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent c89d357 commit 4ada9e9

File tree

6 files changed

+166
-8
lines changed

6 files changed

+166
-8
lines changed

RELEASE_NOTES.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@
5858

5959
* The `SourceProperties` of the resampler now uses a `timedelta` for the input sampling period. The attribute was renamed from `sampling_period_s` to `sampling_period` accordingly.
6060

61+
* The periods are now aligned to the `UNIX_EPOCH` by default.
62+
63+
To use the old behaviour (aligning to the time the resampler was created), pass `align_to=None` to the `ResamplerConfig`.
64+
6165
## New Features
6266

6367
* The core data-pipeline actors are now created automatically (#270).
@@ -76,6 +80,10 @@
7680

7781
* The `Result` class (and subclasses) for the `PowerDistributingActor` are now `dataclass`es, so logging them will produce a more detailed output.
7882

83+
* The `Resampler` can now can align the resampling period to an arbitrary `datetime`.
84+
85+
This can be configured via the new `align_to` option in the `ResamplerConfig`. By default the resampling period is aligned to the `UNIX_EPOCH`.
86+
7987
## Bug Fixes
8088

8189
* Change `PowerDistributor` to use all batteries when none are working (#258)

src/frequenz/sdk/timeseries/__init__.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,35 @@
66
77
A timeseries is a stream (normally an async iterator) of
88
[`Sample`][frequenz.sdk.timeseries.Sample]s.
9+
10+
# Periodicity and alignment
11+
12+
All the data produced by this package is always periodic and aligned to the
13+
`UNIX_EPOCH` (by default).
14+
15+
Classes normally take a (re)sampling period as and argument and, optionally, an
16+
`align_to` argument.
17+
18+
This means timestamps are always separated exaclty by a period, and that this
19+
timestamp falls always at multiples of the period, starting at the `align_to`.
20+
21+
This ensures that the data is predictable and consistent among restarts.
22+
23+
Example:
24+
If we have a period of 10 seconds, and are aligning to the UNIX
25+
epoch. Assuming the following timeline starts in 1970-01-01 00:00:00
26+
UTC and our current `now` is 1970-01-01 00:00:32 UTC, then the next
27+
timestamp will be at 1970-01-01 00:00:40 UTC:
28+
29+
```
30+
align_to = 1970-01-01 00:00:00 next event = 1970-01-01 00:00:40
31+
| |
32+
|---------|---------|---------|-|-------|---------|---------|---------|
33+
0 10 20 30 | 40 50 60 70
34+
now = 1970-01-01 00:00:32
35+
```
936
"""
1037

11-
from ._base_types import Sample, Sample3Phase
38+
from ._base_types import UNIX_EPOCH, Sample, Sample3Phase
1239

13-
__all__ = ["Sample", "Sample3Phase"]
40+
__all__ = ["Sample", "Sample3Phase", "UNIX_EPOCH"]

src/frequenz/sdk/timeseries/_base_types.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77

88
import functools
99
from dataclasses import dataclass, field
10-
from datetime import datetime
10+
from datetime import datetime, timezone
1111
from typing import Callable, Iterator, Optional, overload
1212

13+
UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)
14+
"""The UNIX epoch (in UTC)."""
15+
1316

1417
# Ordering by timestamp is a bit arbitrary, and it is not always what might be
1518
# wanted. We are using this order now because usually we need to do binary

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from .._internal.asyncio import cancel_and_await
1919
from . import Sample
20+
from ._base_types import UNIX_EPOCH
2021

2122
_logger = logging.getLogger(__name__)
2223

@@ -191,6 +192,17 @@ class ResamplerConfig:
191192
It must be at bigger than `warn_buffer_len`.
192193
"""
193194

195+
align_to: datetime | None = UNIX_EPOCH
196+
"""The time to align the resampling period to.
197+
198+
The resampling period will be aligned to this time, so the first resampled
199+
sample will be at the first multiple of `resampling_period` starting from
200+
`align_to`. It must be an aware datetime and can be in the future too.
201+
202+
If `align_to` is `None`, the resampling period will be aligned to the
203+
time the resampler is created.
204+
"""
205+
194206
def __post_init__(self) -> None:
195207
"""Check that config values are valid.
196208
@@ -231,6 +243,10 @@ def __post_init__(self) -> None:
231243
self.initial_buffer_len,
232244
self.warn_buffer_len,
233245
)
246+
if self.align_to is not None and self.align_to.tzinfo is None:
247+
raise ValueError(
248+
f"align_to ({self.align_to}) should be a timezone aware datetime"
249+
)
234250

235251

236252
class SourceStoppedError(RuntimeError):
@@ -348,16 +364,17 @@ def __init__(self, config: ResamplerConfig) -> None:
348364
self._resamplers: dict[Source, _StreamingHelper] = {}
349365
"""A mapping between sources and the streaming helper handling that source."""
350366

351-
self._window_end: datetime = (
352-
datetime.now(timezone.utc) + self._config.resampling_period
353-
)
367+
self._window_end: datetime = self._calculate_window_end()
354368
"""The time in which the current window ends.
355369
356370
This is used to make sure every resampling window is generated at
357371
precise times. We can't rely on the timer timestamp because timers will
358372
never fire at the exact requested time, so if we don't use a precise
359373
time for the end of the window, the resampling windows we produce will
360374
have different sizes.
375+
376+
The window end will also be aligned to the `config.align_to` time, so
377+
the window end is deterministic.
361378
"""
362379

363380
@property
@@ -491,6 +508,29 @@ async def _wait_for_next_resampling_period(self) -> None:
491508
self._config.resampling_period,
492509
)
493510

511+
def _calculate_window_end(self) -> datetime:
512+
"""Calculate the end of the current resampling window.
513+
514+
The calculated resampling window end is a multiple of
515+
`self._config.resampling_period` starting at `self._config.align_to`.
516+
517+
if `self._config.align_to` is `None`, the current time is used.
518+
519+
Returns:
520+
The end of the current resampling window aligned to
521+
`self._config.align_to`.
522+
"""
523+
now = datetime.now(timezone.utc)
524+
period = self._config.resampling_period
525+
align_to = self._config.align_to
526+
527+
if align_to is None:
528+
return now + period
529+
530+
elapsed = (now - align_to) % period
531+
532+
return now + period - elapsed
533+
494534

495535
class _ResamplingHelper:
496536
"""Keeps track of *relevant* samples to pass them to the resampling function.

tests/timeseries/mock_microgrid.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,14 @@ async def start(self, mocker: MockerFixture) -> None:
119119

120120
# pylint: disable=protected-access
121121
_data_pipeline._DATA_PIPELINE = _data_pipeline._DataPipeline(
122-
ResamplerConfig(resampling_period=timedelta(seconds=self._sample_rate_s))
122+
ResamplerConfig(
123+
resampling_period=timedelta(seconds=self._sample_rate_s),
124+
# Align to the time the resampler is created to avoid flakiness
125+
# in the tests, it seems test using the mock microgrid assume
126+
# that the resampling window is aligned to the start of the
127+
# test.
128+
align_to=None,
129+
)
123130
)
124131
# pylint: enable=protected-access
125132

tests/timeseries/test_resampling.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434

3535
from ..utils import a_sequence
3636

37-
# pylint: disable=too-many-locals,redefined-outer-name
37+
# We relax some pylint checks as for tests they don't make a lot of sense.
38+
# pylint: disable=too-many-lines,disable=too-many-locals,redefined-outer-name
3839

3940

4041
# Setting 'autouse' has no effect as this method replaces the event loop for all tests in the file.
@@ -168,6 +169,78 @@ async def test_helper_buffer_too_big(
168169
assert helper._buffer.maxlen == DEFAULT_BUFFER_LEN_MAX
169170

170171

172+
@pytest.mark.parametrize(
173+
"resampling_period_s,now,align_to,result",
174+
(
175+
(
176+
1.0,
177+
datetime(2020, 1, 1, 2, 3, 5, 300000, tzinfo=timezone.utc),
178+
datetime(2020, 1, 1, tzinfo=timezone.utc),
179+
datetime(2020, 1, 1, 2, 3, 6, tzinfo=timezone.utc),
180+
),
181+
(
182+
3.0,
183+
datetime(2020, 1, 1, 2, 3, 5, 300000, tzinfo=timezone.utc),
184+
datetime(2020, 1, 1, 0, 0, 5, tzinfo=timezone.utc),
185+
datetime(2020, 1, 1, 2, 3, 8, tzinfo=timezone.utc),
186+
),
187+
(
188+
10.0,
189+
datetime(2020, 1, 1, 2, 3, 5, 300000, tzinfo=timezone.utc),
190+
datetime(2020, 1, 1, 0, 0, 5, tzinfo=timezone.utc),
191+
datetime(2020, 1, 1, 2, 3, 15, tzinfo=timezone.utc),
192+
),
193+
# Future align_to
194+
(
195+
10.0,
196+
datetime(2020, 1, 1, 2, 3, 5, 300000, tzinfo=timezone.utc),
197+
datetime(2020, 1, 1, 2, 3, 18, tzinfo=timezone.utc),
198+
datetime(2020, 1, 1, 2, 3, 8, tzinfo=timezone.utc),
199+
),
200+
),
201+
)
202+
def test_calculate_window_end_trivial_cases(
203+
fake_time: time_machine.Coordinates,
204+
resampling_period_s: float,
205+
now: datetime,
206+
align_to: datetime,
207+
result: datetime,
208+
) -> None:
209+
"""Test the calculation of the resampling window end for simple cases."""
210+
resampling_period = timedelta(seconds=resampling_period_s)
211+
resampler = Resampler(
212+
ResamplerConfig(
213+
resampling_period=resampling_period,
214+
align_to=align_to,
215+
)
216+
)
217+
fake_time.move_to(now)
218+
# pylint: disable=protected-access
219+
assert resampler._calculate_window_end() == result
220+
221+
# Repeat the test with align_to=None, so the result should be align to now
222+
# instead
223+
resampler_now = Resampler(
224+
ResamplerConfig(
225+
resampling_period=resampling_period,
226+
align_to=now,
227+
)
228+
)
229+
resampler_none = Resampler(
230+
ResamplerConfig(
231+
resampling_period=resampling_period,
232+
align_to=None,
233+
)
234+
)
235+
fake_time.move_to(now)
236+
# pylint: disable=protected-access
237+
assert (
238+
resampler_now._calculate_window_end() == resampler_none._calculate_window_end()
239+
)
240+
# pylint: disable=protected-access
241+
assert resampler_none._calculate_window_end() == now + resampling_period
242+
243+
171244
async def test_resampling_window_size_is_constant(
172245
fake_time: time_machine.Coordinates, source_chan: Broadcast[Sample]
173246
) -> None:

0 commit comments

Comments
 (0)