Skip to content

Commit 11e1aaa

Browse files
authored
Add an option to align resampling windows (#335)
The `ResamplerConfig` now has a new option `align_to` to align the resampling windows to some arbitrary time. This means that the timestamps of the produced samples will be a multiple of the resampling period beginning at the `align_to` time. By default `align_to` is the Unix epoch, and if it is `None` it will have the current behaviour of aligning timestamps to the time when the resampler is started. Fixes #328.
2 parents 837d482 + 4ada9e9 commit 11e1aaa

File tree

6 files changed

+202
-28
lines changed

6 files changed

+202
-28
lines changed

RELEASE_NOTES.md

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,10 @@
1212
battery_power_receiver = microgrid.battery_pool().power.new_receiver()
1313
```
1414

15-
+ Formulas composition has changed (#327) -
16-
- receivers from formulas are no longer composable.
17-
- formula composition is now done by composing FormulaEngine instances.
18-
- Automatic formulas from the logical meter and *pools, are now
19-
properties, and return `FormulaEngine` instances, which can be
20-
composed further, or can provide a receiver to fetch values.
15+
* Formulas composition has changed (#327)
16+
* Receivers from formulas are no longer composable.
17+
* Formula composition is now done by composing FormulaEngine instances.
18+
* Automatic formulas from the logical meter and \*pools, are now properties, and return `FormulaEngine` instances, which can be composed further, or can provide a receiver to fetch values.
2119

2220
``` python
2321
grid_power_receiver = microgrid.logical_meter().grid_power.new_receiver()
@@ -30,26 +28,47 @@
3028
inverter_power_receiver = self._inverter_power.new_receiver()
3129
```
3230

33-
* Update BatteryStatus to mark battery with unknown capacity as not working (#263)
31+
* Update `BatteryStatus` to mark battery with unknown capacity as not working (#263)
32+
3433
* The channels dependency was updated to v0.14.0 (#292)
34+
3535
* Some properties for `PowerDistributingActor` results were renamed to be more consistent between `Success` and `PartialFailure`:
36+
3637
* The `Success.used_batteries` property was renamed to `succeeded_batteries`.
3738
* The `PartialFailure.success_batteries` property was renamed to `succeeded_batteries`.
3839
* The `succeed_power` property was renamed to `succeeded_power` for both `Success` and `PartialFailure`.
39-
* Update MovingWindow to accept size parameter as timedelta instead of int (#269).
40-
This change allows users to define the time span of the moving window more intuitively, representing the duration over which samples will be stored.
41-
* Add a resampler in the MovingWindow to control the granularity of the samples to be stored in the underlying buffer (#269).
42-
Notice that the parameter `sampling_period` has been renamed to `input_sampling_period`
43-
to better distinguish it from the sampling period parameter in the resampler.
40+
4441
* The serialization feature for the ringbuffer was made more flexible. The `dump` and `load` methods can now work directly with a ringbuffer instance.
45-
* The `ResamplerConfig` now takes the resampling period as a `timedelta`. The configuration was renamed from `resampling_period_s` to `resampling_period` accordingly.
46-
* The `SourceProperties` of the resampler now uses a `timedelta` for the input sampling period. The attribute was renamed from `sampling_period_s` to `sampling_period` accordingly.
42+
43+
* `MovingWindow`
44+
45+
* Accept the `size` parameter as `timedelta` instead of `int` (#269).
46+
47+
This change allows users to define the time span of the moving window more intuitively, representing the duration over which samples will be stored.
48+
49+
* The input data will be resampled if a `resampler_config` is passed (#269).
50+
51+
This allows controlling the granularity of the samples to be stored in the underlying buffer.
52+
53+
Note that the parameter `sampling_period` has been renamed to `input_sampling_period` to better distinguish it from the sampling period parameter in the `resampler_config`.
54+
55+
* `Resampler`
56+
57+
* The `ResamplerConfig` now takes the resampling period as a `timedelta`. The configuration was renamed from `resampling_period_s` to `resampling_period` accordingly.
58+
59+
* The `SourceProperties` of the resampler now uses a `timedelta` for the input sampling period. The attribute was renamed from `sampling_period_s` to `sampling_period` accordingly.
60+
61+
* The periods are now aligned to the `UNIX_EPOCH` by default.
62+
63+
To use the old behaviour (aligning to the time the resampler was created), pass `align_to=None` to the `ResamplerConfig`.
4764

4865
## New Features
4966

50-
* Automatic creation of core data-pipeline actors, to eliminate a lot
51-
of boiler plate code. This makes it much simpler to deploy apps
52-
(#270). For example:
67+
* The core data-pipeline actors are now created automatically (#270).
68+
69+
This eliminates a lot of boiler plate code and makes it much simpler to deploy apps.
70+
71+
For example:
5372

5473
``` python
5574
async def run():
@@ -59,9 +78,14 @@
5978
grid_power = microgrid.logical_meter().grid_power()
6079
```
6180

62-
* The `Result` class (and subclasses) for the `PowerDistributingActor` are now dataclasses, so logging them will produce a more detailed output.
81+
* The `Result` class (and subclasses) for the `PowerDistributingActor` are now `dataclass`es, so logging them will produce a more detailed output.
82+
83+
* The `Resampler` can now can align the resampling period to an arbitrary `datetime`.
84+
85+
This can be configured via the new `align_to` option in the `ResamplerConfig`. By default the resampling period is aligned to the `UNIX_EPOCH`.
6386

6487
## Bug Fixes
6588

66-
* Change PowerDistributor to use all batteries if none is working (#258)
67-
* Update the ordered ring buffer to fix the len() function so that it returns a value equal to or greater than zero, as expected (#274)
89+
* Change `PowerDistributor` to use all batteries when none are working (#258)
90+
91+
* Update the ordered ring buffer to fix the `len()` function so that it returns a value equal to or greater than zero, as expected (#274)

src/frequenz/sdk/timeseries/__init__.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,35 @@
66
77
A timeseries is a stream (normally an async iterator) of
88
[`Sample`][frequenz.sdk.timeseries.Sample]s.
9+
10+
# Periodicity and alignment
11+
12+
All the data produced by this package is always periodic and aligned to the
13+
`UNIX_EPOCH` (by default).
14+
15+
Classes normally take a (re)sampling period as and argument and, optionally, an
16+
`align_to` argument.
17+
18+
This means timestamps are always separated exaclty by a period, and that this
19+
timestamp falls always at multiples of the period, starting at the `align_to`.
20+
21+
This ensures that the data is predictable and consistent among restarts.
22+
23+
Example:
24+
If we have a period of 10 seconds, and are aligning to the UNIX
25+
epoch. Assuming the following timeline starts in 1970-01-01 00:00:00
26+
UTC and our current `now` is 1970-01-01 00:00:32 UTC, then the next
27+
timestamp will be at 1970-01-01 00:00:40 UTC:
28+
29+
```
30+
align_to = 1970-01-01 00:00:00 next event = 1970-01-01 00:00:40
31+
| |
32+
|---------|---------|---------|-|-------|---------|---------|---------|
33+
0 10 20 30 | 40 50 60 70
34+
now = 1970-01-01 00:00:32
35+
```
936
"""
1037

11-
from ._base_types import Sample, Sample3Phase
38+
from ._base_types import UNIX_EPOCH, Sample, Sample3Phase
1239

13-
__all__ = ["Sample", "Sample3Phase"]
40+
__all__ = ["Sample", "Sample3Phase", "UNIX_EPOCH"]

src/frequenz/sdk/timeseries/_base_types.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77

88
import functools
99
from dataclasses import dataclass, field
10-
from datetime import datetime
10+
from datetime import datetime, timezone
1111
from typing import Callable, Iterator, Optional, overload
1212

13+
UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)
14+
"""The UNIX epoch (in UTC)."""
15+
1316

1417
# Ordering by timestamp is a bit arbitrary, and it is not always what might be
1518
# wanted. We are using this order now because usually we need to do binary

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from .._internal.asyncio import cancel_and_await
1919
from . import Sample
20+
from ._base_types import UNIX_EPOCH
2021

2122
_logger = logging.getLogger(__name__)
2223

@@ -191,6 +192,17 @@ class ResamplerConfig:
191192
It must be at bigger than `warn_buffer_len`.
192193
"""
193194

195+
align_to: datetime | None = UNIX_EPOCH
196+
"""The time to align the resampling period to.
197+
198+
The resampling period will be aligned to this time, so the first resampled
199+
sample will be at the first multiple of `resampling_period` starting from
200+
`align_to`. It must be an aware datetime and can be in the future too.
201+
202+
If `align_to` is `None`, the resampling period will be aligned to the
203+
time the resampler is created.
204+
"""
205+
194206
def __post_init__(self) -> None:
195207
"""Check that config values are valid.
196208
@@ -231,6 +243,10 @@ def __post_init__(self) -> None:
231243
self.initial_buffer_len,
232244
self.warn_buffer_len,
233245
)
246+
if self.align_to is not None and self.align_to.tzinfo is None:
247+
raise ValueError(
248+
f"align_to ({self.align_to}) should be a timezone aware datetime"
249+
)
234250

235251

236252
class SourceStoppedError(RuntimeError):
@@ -348,16 +364,17 @@ def __init__(self, config: ResamplerConfig) -> None:
348364
self._resamplers: dict[Source, _StreamingHelper] = {}
349365
"""A mapping between sources and the streaming helper handling that source."""
350366

351-
self._window_end: datetime = (
352-
datetime.now(timezone.utc) + self._config.resampling_period
353-
)
367+
self._window_end: datetime = self._calculate_window_end()
354368
"""The time in which the current window ends.
355369
356370
This is used to make sure every resampling window is generated at
357371
precise times. We can't rely on the timer timestamp because timers will
358372
never fire at the exact requested time, so if we don't use a precise
359373
time for the end of the window, the resampling windows we produce will
360374
have different sizes.
375+
376+
The window end will also be aligned to the `config.align_to` time, so
377+
the window end is deterministic.
361378
"""
362379

363380
@property
@@ -491,6 +508,29 @@ async def _wait_for_next_resampling_period(self) -> None:
491508
self._config.resampling_period,
492509
)
493510

511+
def _calculate_window_end(self) -> datetime:
512+
"""Calculate the end of the current resampling window.
513+
514+
The calculated resampling window end is a multiple of
515+
`self._config.resampling_period` starting at `self._config.align_to`.
516+
517+
if `self._config.align_to` is `None`, the current time is used.
518+
519+
Returns:
520+
The end of the current resampling window aligned to
521+
`self._config.align_to`.
522+
"""
523+
now = datetime.now(timezone.utc)
524+
period = self._config.resampling_period
525+
align_to = self._config.align_to
526+
527+
if align_to is None:
528+
return now + period
529+
530+
elapsed = (now - align_to) % period
531+
532+
return now + period - elapsed
533+
494534

495535
class _ResamplingHelper:
496536
"""Keeps track of *relevant* samples to pass them to the resampling function.

tests/timeseries/mock_microgrid.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,14 @@ async def start(self, mocker: MockerFixture) -> None:
119119

120120
# pylint: disable=protected-access
121121
_data_pipeline._DATA_PIPELINE = _data_pipeline._DataPipeline(
122-
ResamplerConfig(resampling_period=timedelta(seconds=self._sample_rate_s))
122+
ResamplerConfig(
123+
resampling_period=timedelta(seconds=self._sample_rate_s),
124+
# Align to the time the resampler is created to avoid flakiness
125+
# in the tests, it seems test using the mock microgrid assume
126+
# that the resampling window is aligned to the start of the
127+
# test.
128+
align_to=None,
129+
)
123130
)
124131
# pylint: enable=protected-access
125132

tests/timeseries/test_resampling.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434

3535
from ..utils import a_sequence
3636

37-
# pylint: disable=too-many-locals,redefined-outer-name
37+
# We relax some pylint checks as for tests they don't make a lot of sense.
38+
# pylint: disable=too-many-lines,disable=too-many-locals,redefined-outer-name
3839

3940

4041
# Setting 'autouse' has no effect as this method replaces the event loop for all tests in the file.
@@ -168,6 +169,78 @@ async def test_helper_buffer_too_big(
168169
assert helper._buffer.maxlen == DEFAULT_BUFFER_LEN_MAX
169170

170171

172+
@pytest.mark.parametrize(
173+
"resampling_period_s,now,align_to,result",
174+
(
175+
(
176+
1.0,
177+
datetime(2020, 1, 1, 2, 3, 5, 300000, tzinfo=timezone.utc),
178+
datetime(2020, 1, 1, tzinfo=timezone.utc),
179+
datetime(2020, 1, 1, 2, 3, 6, tzinfo=timezone.utc),
180+
),
181+
(
182+
3.0,
183+
datetime(2020, 1, 1, 2, 3, 5, 300000, tzinfo=timezone.utc),
184+
datetime(2020, 1, 1, 0, 0, 5, tzinfo=timezone.utc),
185+
datetime(2020, 1, 1, 2, 3, 8, tzinfo=timezone.utc),
186+
),
187+
(
188+
10.0,
189+
datetime(2020, 1, 1, 2, 3, 5, 300000, tzinfo=timezone.utc),
190+
datetime(2020, 1, 1, 0, 0, 5, tzinfo=timezone.utc),
191+
datetime(2020, 1, 1, 2, 3, 15, tzinfo=timezone.utc),
192+
),
193+
# Future align_to
194+
(
195+
10.0,
196+
datetime(2020, 1, 1, 2, 3, 5, 300000, tzinfo=timezone.utc),
197+
datetime(2020, 1, 1, 2, 3, 18, tzinfo=timezone.utc),
198+
datetime(2020, 1, 1, 2, 3, 8, tzinfo=timezone.utc),
199+
),
200+
),
201+
)
202+
def test_calculate_window_end_trivial_cases(
203+
fake_time: time_machine.Coordinates,
204+
resampling_period_s: float,
205+
now: datetime,
206+
align_to: datetime,
207+
result: datetime,
208+
) -> None:
209+
"""Test the calculation of the resampling window end for simple cases."""
210+
resampling_period = timedelta(seconds=resampling_period_s)
211+
resampler = Resampler(
212+
ResamplerConfig(
213+
resampling_period=resampling_period,
214+
align_to=align_to,
215+
)
216+
)
217+
fake_time.move_to(now)
218+
# pylint: disable=protected-access
219+
assert resampler._calculate_window_end() == result
220+
221+
# Repeat the test with align_to=None, so the result should be align to now
222+
# instead
223+
resampler_now = Resampler(
224+
ResamplerConfig(
225+
resampling_period=resampling_period,
226+
align_to=now,
227+
)
228+
)
229+
resampler_none = Resampler(
230+
ResamplerConfig(
231+
resampling_period=resampling_period,
232+
align_to=None,
233+
)
234+
)
235+
fake_time.move_to(now)
236+
# pylint: disable=protected-access
237+
assert (
238+
resampler_now._calculate_window_end() == resampler_none._calculate_window_end()
239+
)
240+
# pylint: disable=protected-access
241+
assert resampler_none._calculate_window_end() == now + resampling_period
242+
243+
171244
async def test_resampling_window_size_is_constant(
172245
fake_time: time_machine.Coordinates, source_chan: Broadcast[Sample]
173246
) -> None:

0 commit comments

Comments
 (0)