Skip to content

Commit a3feb15

Browse files
Update MovingWindow to add resampler (#272)
2 parents fcaff7d + 5d37033 commit a3feb15

File tree

10 files changed

+250
-98
lines changed

10 files changed

+250
-98
lines changed

RELEASE_NOTES.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
* The `Success.used_batteries` property was renamed to `succeeded_batteries`.
1313
* The `PartialFailure.success_batteries` property was renamed to `succeeded_batteries`.
1414
* The `succeed_power` property was renamed to `succeeded_power` for both `Success` and `PartialFailure`.
15+
* Update `MovingWindow` to accept the `size` parameter as a `timedelta` instead of an `int` (#269).
16+
This change allows users to define the time span of the moving window more intuitively, representing the duration over which samples will be stored.
17+
* Add a resampler to the `MovingWindow` to control the granularity of the samples to be stored in the underlying buffer (#269).
18+
Note that the parameter `sampling_period` has been renamed to `input_sampling_period`
19+
to better distinguish it from the sampling period parameter in the resampler.
1520

1621
## New Features
1722

@@ -26,6 +31,7 @@
2631
)
2732
grid_power = microgrid.logical_meter().grid_power()
2833
```
34+
2935
* The `Result` class (and subclasses) for the `PowerDistributingActor` are now dataclasses, so logging them will produce a more detailed output.
3036

3137
## Bug Fixes

benchmarks/timeseries/benchmark_ringbuffer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import random
77
import timeit
8-
from datetime import datetime, timedelta
8+
from datetime import datetime, timedelta, timezone
99
from typing import Any, Dict, TypeVar
1010

1111
import numpy as np
@@ -23,7 +23,7 @@
2323
def fill_buffer(days: int, buffer: OrderedRingBuffer[Any]) -> None:
2424
"""Fill the given buffer up to the given amount of days, one sample per minute."""
2525
random.seed(0)
26-
basetime = datetime(2022, 1, 1)
26+
basetime = datetime(2022, 1, 1, tzinfo=timezone.utc)
2727
print("..filling", end="", flush=True)
2828

2929
for day in range(days):
@@ -36,7 +36,7 @@ def fill_buffer(days: int, buffer: OrderedRingBuffer[Any]) -> None:
3636

3737
def test_days(days: int, buffer: OrderedRingBuffer[Any]) -> None:
3838
"""Gets the data for each of the 29 days."""
39-
basetime = datetime(2022, 1, 1)
39+
basetime = datetime(2022, 1, 1, tzinfo=timezone.utc)
4040

4141
for day in range(days):
4242
# pylint: disable=unused-variable
@@ -51,7 +51,7 @@ def test_slices(days: int, buffer: OrderedRingBuffer[Any], median: bool) -> None
5151
Takes a buffer, fills it up and then excessively gets
5252
the data for each day to calculate the average/median.
5353
"""
54-
basetime = datetime(2022, 1, 1)
54+
basetime = datetime(2022, 1, 1, tzinfo=timezone.utc)
5555

5656
total = 0.0
5757

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 98 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,18 @@
77

88
import asyncio
99
import logging
10+
import math
1011
from collections.abc import Sequence
11-
from datetime import datetime, timedelta
12+
from datetime import datetime, timedelta, timezone
1213
from typing import SupportsIndex, overload
1314

1415
import numpy as np
15-
from frequenz.channels import Receiver
16+
from frequenz.channels import Broadcast, Receiver, Sender
1617
from numpy.typing import ArrayLike
1718

1819
from .._internal.asyncio import cancel_and_await
1920
from . import Sample
21+
from ._resampling import Resampler, ResamplerConfig
2022
from ._ringbuffer import OrderedRingBuffer
2123

2224
log = logging.getLogger(__name__)
@@ -27,7 +29,7 @@ class MovingWindow:
2729
A data window that moves with the latest datapoints of a data stream.
2830
2931
After initialization the `MovingWindow` can be accessed by an integer
30-
index or a timestamp. A sub window can be accessed by using a slice of integers
32+
index or a timestamp. A sub window can be accessed by using a slice of
3133
integers or timestamps.
3234
3335
Note that a numpy ndarray is returned and thus users can use
@@ -39,19 +41,31 @@ class MovingWindow:
3941
the point in time that defines the alignment can be outside of the time window.
4042
Modulo arithmetic is used to move the `window_alignment` timestamp into the
4143
latest window.
42-
If for example the `window_alignment` parameter is set to `datetime(1, 1, 1)`
43-
and the window size is bigger than one day then the first element will always
44-
be aligned to the midnight. For further information see also the
44+
If for example the `window_alignment` parameter is set to
45+
`datetime(1, 1, 1, tzinfo=timezone.utc)` and the window size is bigger than
46+
one day then the first element will always be aligned to midnight.
47+
For further information see also the
4548
[`OrderedRingBuffer`][frequenz.sdk.timeseries._ringbuffer.OrderedRingBuffer]
4649
documentation.
4750
51+
Resampling might be required to reduce the number of samples to store, and
52+
it can be set by specifying the resampler config parameter so that the user
53+
can control the granularity of the samples to be stored in the underlying
54+
buffer.
55+
56+
If resampling is not required, the resampler config parameter can be
57+
set to None in which case the MovingWindow will not perform any resampling.
4858
4959
**Example1** (calculating the mean of a time interval):
5060
5161
```
52-
window = MovingWindow(size=100, resampled_data_recv=resampled_data_recv)
62+
window = MovingWindow(
63+
size=timedelta(minutes=5),
64+
resampled_data_recv=resampled_data_recv,
65+
input_sampling_period=timedelta(seconds=1),
66+
)
5367
54-
time_start = datetime.now()
68+
time_start = datetime.now(tz=timezone.utc)
5569
time_end = time_start + timedelta(minutes=5)
5670
5771
# ... wait for 5 minutes until the buffer is filled
@@ -70,24 +84,29 @@ class MovingWindow:
7084
7185
# create a window that stores two days of data
7286
# starting at 1.1.23 with samplerate=1
73-
window = MovingWindow(size = (60 * 60 * 24 * 2), sample_receiver)
87+
window = MovingWindow(
88+
size=timedelta(days=2),
89+
resampled_data_recv=sample_receiver,
90+
input_sampling_period=timedelta(seconds=1),
91+
)
7492
7593
# wait for one full day until the buffer is filled
7694
await asyncio.sleep(60*60*24)
7795
7896
# create a polars series with one full day of data
79-
time_start = datetime(2023, 1, 1)
80-
time_end = datetime(2023, 1, 2)
97+
time_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
98+
time_end = datetime(2023, 1, 2, tzinfo=timezone.utc)
8199
s = pl.Series("Jan_1", window[time_start:time_end])
82100
```
83101
"""
84102

85-
def __init__(
103+
def __init__( # pylint: disable=too-many-arguments
86104
self,
87-
size: int,
105+
size: timedelta,
88106
resampled_data_recv: Receiver[Sample],
89-
sampling_period: timedelta,
90-
window_alignment: datetime = datetime(1, 1, 1),
107+
input_sampling_period: timedelta,
108+
resampler_config: ResamplerConfig | None = None,
109+
window_alignment: datetime = datetime(1, 1, 1, tzinfo=timezone.utc),
91110
) -> None:
92111
"""
93112
Initialize the MovingWindow.
@@ -97,45 +116,97 @@ def __init__(
97116
The task stops running only if the channel receiver is closed.
98117
99118
Args:
100-
size: The number of elements that are stored.
119+
size: The time span of the moving window over which samples will be stored.
101120
resampled_data_recv: A receiver that delivers samples with a
102121
given sampling period.
103-
sampling_period: The sampling period.
122+
input_sampling_period: The time interval between consecutive input samples.
123+
resampler_config: The resampler configuration in case resampling is required.
104124
window_alignment: A datetime object that defines a point in time to which
105125
the window is aligned modulo window size.
106-
(default is midnight 01.01.01)
126+
(default is 0001-01-01T00:00:00+00:00)
107127
For further information, consult the class level documentation.
108128
109129
Raises:
110130
asyncio.CancelledError: when the task gets cancelled.
111131
"""
132+
assert (
133+
input_sampling_period.total_seconds() > 0
134+
), "The input sampling period should be greater than zero."
135+
assert (
136+
input_sampling_period <= size
137+
), "The input sampling period should be equal to or lower than the window size."
138+
139+
sampling = input_sampling_period
140+
self._resampler: Resampler | None = None
141+
self._resampler_sender: Sender[Sample] | None = None
142+
self._resampler_task: asyncio.Task[None] | None = None
143+
144+
if resampler_config:
145+
resampling_period = timedelta(seconds=resampler_config.resampling_period_s)
146+
assert (
147+
resampling_period <= size
148+
), "The resampling period should be equal to or lower than the window size."
149+
150+
self._resampler = Resampler(resampler_config)
151+
sampling = resampling_period
152+
153+
# Sampling period might not fit perfectly into the window size.
154+
num_samples = math.ceil(size.total_seconds() / sampling.total_seconds())
155+
112156
self._resampled_data_recv = resampled_data_recv
113157
self._buffer = OrderedRingBuffer(
114-
np.empty(shape=size, dtype=float),
115-
sampling_period=sampling_period,
158+
np.empty(shape=num_samples, dtype=float),
159+
sampling_period=sampling,
116160
time_index_alignment=window_alignment,
117161
)
118-
self._copy_buffer = False
162+
163+
if self._resampler:
164+
self._configure_resampler()
165+
119166
self._update_window_task: asyncio.Task[None] = asyncio.create_task(
120167
self._run_impl()
121168
)
122-
log.debug("Cancelling MovingWindow task: %s", __name__)
123169

124170
async def _run_impl(self) -> None:
125-
"""Awaits samples from the receiver and updates the underlying ringbuffer."""
171+
"""Awaits samples from the receiver and updates the underlying ringbuffer.
172+
173+
Raises:
174+
asyncio.CancelledError: if the MovingWindow task is cancelled.
175+
"""
126176
try:
127177
async for sample in self._resampled_data_recv:
128178
log.debug("Received new sample: %s", sample)
129-
self._buffer.update(sample)
179+
if self._resampler and self._resampler_sender:
180+
await self._resampler_sender.send(sample)
181+
else:
182+
self._buffer.update(sample)
183+
130184
except asyncio.CancelledError:
131185
log.info("MovingWindow task has been cancelled.")
132-
return
186+
raise
133187

134188
log.error("Channel has been closed")
135189

136190
async def stop(self) -> None:
137-
"""Cancel the running task and stop the MovingWindow."""
191+
"""Cancel the running tasks and stop the MovingWindow."""
138192
await cancel_and_await(self._update_window_task)
193+
if self._resampler_task:
194+
await cancel_and_await(self._resampler_task)
195+
196+
def _configure_resampler(self) -> None:
197+
"""Configure the components needed to run the resampler."""
198+
assert self._resampler is not None
199+
200+
async def sink_buffer(sample: Sample) -> None:
201+
if sample.value is not None:
202+
self._buffer.update(sample)
203+
204+
resampler_channel = Broadcast[Sample]("average")
205+
self._resampler_sender = resampler_channel.new_sender()
206+
self._resampler.add_timeseries(
207+
"avg", resampler_channel.new_receiver(), sink_buffer
208+
)
209+
self._resampler_task = asyncio.create_task(self._resampler.resample())
139210

140211
def __len__(self) -> int:
141212
"""
@@ -198,7 +269,7 @@ def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | ArrayLik
198269
# we are doing runtime typechecks since there is no abstract slice type yet
199270
# see also (https://peps.python.org/pep-0696)
200271
if isinstance(key.start, datetime) and isinstance(key.stop, datetime):
201-
return self._buffer.window(key.start, key.stop, self._copy_buffer)
272+
return self._buffer.window(key.start, key.stop)
202273
if isinstance(key.start, int) and isinstance(key.stop, int):
203274
return self._buffer[key]
204275
elif isinstance(key, datetime):

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,12 @@ class ResamplerConfig:
146146
Example:
147147
If `resampling_period_s` is 3, the input sampling period is
148148
1 and `max_data_age_in_periods` is 2, then data older than 3*2
149-
= 6 secods will be discarded when creating a new sample and never
149+
= 6 seconds will be discarded when creating a new sample and never
150150
passed to the resampling function.
151151
152152
If `resampling_period_s` is 3, the input sampling period is
153153
5 and `max_data_age_in_periods` is 2, then data older than 5*2
154-
= 10 secods will be discarded when creating a new sample and never
154+
= 10 seconds will be discarded when creating a new sample and never
155155
passed to the resampling function.
156156
"""
157157

@@ -177,7 +177,7 @@ class ResamplerConfig:
177177
"""The minimum length of the resampling buffer that will emit a warning.
178178
179179
If a buffer grows bigger than this value, it will emit a warning in the
180-
logs, so buffers don't grow too big inadvertly.
180+
logs, so buffers don't grow too big inadvertently.
181181
182182
It must be at least 1 and at most `max_buffer_len`.
183183
"""
@@ -242,12 +242,11 @@ def __init__(self, source: Source) -> None:
242242
"""Create an instance.
243243
244244
Args:
245-
source: The source of the timeseries that stopped producting
246-
samples.
245+
source: The source of the timeseries that stopped producing samples.
247246
"""
248247
super().__init__(f"Timeseries stopped producing samples, source: {source}")
249248
self.source = source
250-
"""The source of the timeseries that stopped producting samples."""
249+
"""The source of the timeseries that stopped producing samples."""
251250

252251
def __repr__(self) -> str:
253252
"""Return the representation of the instance.
@@ -672,7 +671,7 @@ def resample(self, timestamp: datetime) -> Sample:
672671
min_index = bisect(self._buffer, Sample(minimum_relevant_timestamp, None))
673672
max_index = bisect(self._buffer, Sample(timestamp, None))
674673
# Using itertools for slicing doesn't look very efficient, but
675-
# experiements with a custom (ring) buffer that can slice showed that
674+
# experiments with a custom (ring) buffer that can slice showed that
676675
# it is not that bad. See:
677676
# https://github.com/frequenz-floss/frequenz-sdk-python/pull/130
678677
# So if we need more performance beyond this point, we probably need to

src/frequenz/sdk/timeseries/_ringbuffer.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from collections.abc import Iterable
99
from copy import deepcopy
1010
from dataclasses import dataclass
11-
from datetime import datetime, timedelta
11+
from datetime import datetime, timedelta, timezone
1212
from typing import Generic, List, SupportsFloat, SupportsIndex, TypeVar, overload
1313

1414
import numpy as np
@@ -46,11 +46,14 @@ def contains(self, timestamp: datetime) -> bool:
4646
class OrderedRingBuffer(Generic[FloatArray]):
4747
"""Time aware ringbuffer that keeps its entries sorted by time."""
4848

49+
_DATETIME_MIN = datetime.min.replace(tzinfo=timezone.utc)
50+
_DATETIME_MAX = datetime.max.replace(tzinfo=timezone.utc)
51+
4952
def __init__(
5053
self,
5154
buffer: FloatArray,
5255
sampling_period: timedelta,
53-
time_index_alignment: datetime = datetime(1, 1, 1),
56+
time_index_alignment: datetime = datetime(1, 1, 1, tzinfo=timezone.utc),
5457
) -> None:
5558
"""Initialize the time aware ringbuffer.
5659
@@ -76,8 +79,8 @@ def __init__(
7679
self._time_index_alignment: datetime = time_index_alignment
7780

7881
self._gaps: list[Gap] = []
79-
self._datetime_newest: datetime = datetime.min
80-
self._datetime_oldest: datetime = datetime.max
82+
self._datetime_newest: datetime = self._DATETIME_MIN
83+
self._datetime_oldest: datetime = self._DATETIME_MAX
8184
self._time_range: timedelta = (len(self._buffer) - 1) * sampling_period
8285

8386
@property
@@ -130,7 +133,10 @@ def update(self, sample: Sample) -> None:
130133
timestamp = self._normalize_timestamp(sample.timestamp)
131134

132135
# Don't add outdated entries
133-
if timestamp < self._datetime_oldest and self._datetime_oldest != datetime.max:
136+
if (
137+
timestamp < self._datetime_oldest
138+
and self._datetime_oldest != self._DATETIME_MAX
139+
):
134140
raise IndexError(
135141
f"Timestamp {timestamp} too old (cut-off is at {self._datetime_oldest})."
136142
)
@@ -485,7 +491,7 @@ def __len__(self) -> int:
485491
Returns:
486492
The length.
487493
"""
488-
if self._datetime_newest == datetime.min:
494+
if self._datetime_newest == self._DATETIME_MIN:
489495
return 0
490496

491497
start_index = self.datetime_to_index(self._datetime_oldest)

0 commit comments

Comments
 (0)