Skip to content

Commit 5f11abb

Browse files
Add PeriodicFeatureExtractor (frequenz-floss#299)
A class for creating a profile from periodically occurring windows in a buffer of historical data that can be used for creating features for ml models. closes frequenz-floss#120
2 parents ef3906f + 17642aa commit 5f11abb

File tree

9 files changed

+883
-28
lines changed

9 files changed

+883
-28
lines changed

RELEASE_NOTES.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@ This release drops support for Python versions older than 3.11.
2828

2929
## New Features
3030

31-
<!-- Here goes the main new features and examples or instructions on how to use them -->
31+
* The `MovingWindow` and the `OrderedRingBuffer` have new public methods that return the oldest and newest timestamps of all stored samples.
32+
33+
* The `PeriodicFeatureExtractor` has been added.
34+
35+
This is a tool to create profiles out of periodically recurring windows inside a `MovingWindow`.
36+
37+
As an example, one can create a daily profile of specific weekdays, which will be returned as numpy arrays.
3238

3339
## Bug Fixes
3440

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""
5+
Benchmarks for the PeriodicFeatureExtractor class.
6+
7+
This module contains benchmarks that compare
8+
the performance of a numpy implementation with a python
9+
implementation.
10+
"""
11+
12+
from __future__ import annotations
13+
14+
import asyncio
15+
import logging
16+
from datetime import datetime, timedelta, timezone
17+
from functools import partial
18+
from timeit import timeit
19+
from typing import List
20+
21+
import numpy as np
22+
from frequenz.channels import Broadcast
23+
from numpy.random import default_rng
24+
from numpy.typing import NDArray
25+
26+
from frequenz.sdk.timeseries import MovingWindow, PeriodicFeatureExtractor, Sample
27+
28+
29+
async def init_feature_extractor(period: int) -> PeriodicFeatureExtractor:
    """
    Create a PeriodicFeatureExtractor for the benchmarks.

    The extractor only needs a MovingWindow to be constructed, so a minimal
    one is created on a fresh channel and a single sample is sent to it.

    Args:
        period: The period of the feature extractor in seconds.

    Returns:
        A PeriodicFeatureExtractor using the given period.
    """
    one_second = timedelta(seconds=1)

    # The moving window exists solely so that the extractor can be built.
    channel = Broadcast[Sample]("lm_net_power")
    receiver = channel.new_receiver()
    window = MovingWindow(one_second, receiver, one_second)

    sender = channel.new_sender()
    await sender.send(Sample(datetime.now(tz=timezone.utc), 0))

    # With a sampling period of one second, a period of `period` seconds
    # corresponds to `period` samples.
    extractor_period = timedelta(seconds=period)
    return PeriodicFeatureExtractor(window, extractor_period)
42+
43+
44+
def _calculate_avg_window(
    feature_extractor: PeriodicFeatureExtractor,
    window: NDArray[np.float_],
    window_size: int,
) -> NDArray[np.float_]:
    """
    Average the periodically occurring windows of an array using numpy.

    The array is reshaped by the feature extractor, the first `window_size`
    columns of the reshaped array are kept, and these are averaged
    along the first axis.

    Args:
        feature_extractor: The instance of the PeriodicFeatureExtractor to use.
        window: The array holding the data to average over.
        window_size: The number of samples in the resulting averaged window.

    Returns:
        The averaged window.
    """
    # NOTE(review): presumably one row per period is returned here - confirm
    # against PeriodicFeatureExtractor._reshape_np_array.
    stacked = feature_extractor._reshape_np_array(  # pylint: disable=protected-access
        window, window_size
    )
    leading_columns = stacked[:, :window_size]
    # np.average is typed as returning Any, hence the ignore.
    return np.average(leading_columns, axis=0)  # type: ignore[no-any-return]
70+
71+
72+
def _calculate_avg_window_py(
    feature_extractor: PeriodicFeatureExtractor,
    window: NDArray[np.float_],
    window_size: int,
    weights: List[float] | None = None,
) -> NDArray[np.float_]:
    """
    Plain python version of the average calculator.

    This method avoids copying in any case but is 15 to 600 times slower than
    the numpy version.

    This method is only used in these benchmarks.

    Args:
        feature_extractor: The instance of the PeriodicFeatureExtractor to use.
            Only its `_period` attribute is read.
        window: The window to calculate the average over.
        window_size: The size of the window to calculate the average over.
        weights: The weights to use for the average calculation, one per
            window. If `None`, all windows are weighted equally.

    Returns:
        The averaged window.
    """

    def _num_windows(
        window: NDArray[np.float_] | MovingWindow, window_size: int, period: int
    ) -> int:
        """
        Get the number of windows that are fully contained in the MovingWindow.

        This method calculates how often a given window, defined by its size,
        is fully contained in the MovingWindow at its current state or any
        numpy ndarray given the period between two window neighbors.

        Args:
            window: The buffer that is used for the average calculation.
            window_size: The size of the window in samples.
            period: The distance in samples between the starts of two
                neighboring windows.

        Returns:
            The number of windows that are fully contained in the MovingWindow.
        """
        num_windows = len(window) // period
        # The remainder after the last full period may still hold one window.
        if len(window) - num_windows * period >= window_size:
            num_windows += 1

        return num_windows

    period = feature_extractor._period  # pylint: disable=protected-access

    num_windows = _num_windows(
        window,
        window_size,
        period,
    )

    # Sanity check; it does not depend on the loop variables, so it is
    # evaluated once instead of once per sample.
    assert num_windows * period - len(window) <= period

    # Compare against None explicitly (instead of relying on truthiness) so
    # that the same condition selects both the summation and the divisor, and
    # so that weights passed as a numpy array do not raise.
    divisor = num_windows if weights is None else np.sum(weights)

    res = np.empty(window_size)

    for i in range(window_size):
        summe = 0.0
        for j in range(num_windows):
            value = window[i + (j * period)]
            summe += value if weights is None else weights[j] * value

        res[i] = summe / divisor

    return res
144+
145+
146+
def run_benchmark(
    array: NDArray[np.float_],
    window_size: int,
    feature_extractor: PeriodicFeatureExtractor,
) -> None:
    """
    Time the numpy and the plain python average calculation and print both.

    Each implementation is executed 10 times through `timeit`; the two total
    durations and their ratio are printed to stdout.

    Args:
        array: The array containing all data.
        window_size: The size of the window.
        feature_extractor: An instance of the PeriodicFeatureExtractor.
    """
    repetitions = 10

    # timeit discards whatever the callables return, only the runtime counts.
    time_np = timeit(
        lambda: _calculate_avg_window(feature_extractor, array, window_size),
        number=repetitions,
    )
    time_py = timeit(
        lambda: _calculate_avg_window_py(feature_extractor, array, window_size),
        number=repetitions,
    )

    print(time_np)
    print(time_py)
    print(f"Numpy is {time_py / time_np} times faster!")
196+
197+
198+
DAY_S = 24 * 60 * 60
199+
200+
201+
async def main() -> None:
    """
    Run the benchmarks.

    Every benchmark times the numpy implementation against the plain python
    implementation on freshly generated random data.
    """
    rng = default_rng()

    # 4 sample window with a 10 second period on exactly 29 days of data.
    samples_29_days = 29 * DAY_S
    feature_extractor = await init_feature_extractor(10)
    run_benchmark(rng.standard_normal(samples_29_days), 4, feature_extractor)

    # Same extractor, but with 3 additional samples at the end of the data.
    run_benchmark(rng.standard_normal(samples_29_days + 3), 4, feature_extractor)

    # 29 days + 5 seconds of data for the benchmarks with a weekly period.
    weekly_data = rng.standard_normal(29 * DAY_S + 5)
    feature_extractor = await init_feature_extractor(7 * DAY_S)

    # One day window and 6 days distance on the full data. COPY (Case 3)
    run_benchmark(weekly_data, DAY_S, feature_extractor)
    # One day window and 6 days distance on the first 28 days. NO COPY (Case 1)
    run_benchmark(weekly_data[: 28 * DAY_S], DAY_S, feature_extractor)
229+
230+
231+
# Only run the benchmarks when executed as a script, so that importing this
# module (e.g. by tooling) does not configure logging or start a long run.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    asyncio.run(main())

src/frequenz/sdk/timeseries/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,12 @@
3737

3838
from ._base_types import UNIX_EPOCH, Sample, Sample3Phase
3939
from ._moving_window import MovingWindow
40+
from ._periodic_feature_extractor import PeriodicFeatureExtractor
4041
from ._resampling import ResamplerConfig
4142

4243
__all__ = [
4344
"MovingWindow",
45+
"PeriodicFeatureExtractor",
4446
"ResamplerConfig",
4547
"Sample",
4648
"Sample3Phase",

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ def __init__( # pylint: disable=too-many-arguments
164164
input_sampling_period <= size
165165
), "The input sampling period should be equal to or lower than the window size."
166166

167-
sampling = input_sampling_period
167+
self._sampling_period = input_sampling_period
168+
168169
self._resampler: Resampler | None = None
169170
self._resampler_sender: Sender[Sample] | None = None
170171
self._resampler_task: asyncio.Task[None] | None = None
@@ -175,15 +176,17 @@ def __init__( # pylint: disable=too-many-arguments
175176
), "The resampling period should be equal to or lower than the window size."
176177

177178
self._resampler = Resampler(resampler_config)
178-
sampling = resampler_config.resampling_period
179+
self._sampling_period = resampler_config.resampling_period
179180

180181
# Sampling period might not fit perfectly into the window size.
181-
num_samples = math.ceil(size.total_seconds() / sampling.total_seconds())
182+
num_samples = math.ceil(
183+
size.total_seconds() / self._sampling_period.total_seconds()
184+
)
182185

183186
self._resampled_data_recv = resampled_data_recv
184187
self._buffer = OrderedRingBuffer(
185188
np.empty(shape=num_samples, dtype=float),
186-
sampling_period=sampling,
189+
sampling_period=self._sampling_period,
187190
align_to=align_to,
188191
)
189192

@@ -194,6 +197,16 @@ def __init__( # pylint: disable=too-many-arguments
194197
self._run_impl()
195198
)
196199

200+
@property
def sampling_period(self) -> timedelta:
    """
    Get the sampling period used by this MovingWindow.

    This is the input sampling period, or the resampling period when a
    resampler config was given at construction.

    Returns:
        The time between two consecutive samples of the MovingWindow.
    """
    return self._sampling_period
209+
197210
async def _run_impl(self) -> None:
198211
"""Awaits samples from the receiver and updates the underlying ring buffer.
199212
@@ -292,7 +305,17 @@ def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | ArrayLik
292305
an numpy array if the key is a slice.
293306
"""
294307
if isinstance(key, slice):
308+
if isinstance(key.start, int) or isinstance(key.stop, int):
309+
if key.start is None or key.stop is None:
310+
key = slice(slice(key.start, key.stop).indices(self.__len__()))
311+
elif isinstance(key.start, datetime) or isinstance(key.stop, datetime):
312+
if key.start is None:
313+
key = slice(self._buffer.time_bound_oldest, key.stop)
314+
if key.stop is None:
315+
key = slice(key.start, self._buffer.time_bound_newest)
316+
295317
_logger.debug("Returning slice for [%s:%s].", key.start, key.stop)
318+
296319
# we are doing runtime typechecks since there is no abstract slice type yet
297320
# see also (https://peps.python.org/pep-0696)
298321
if isinstance(key.start, datetime) and isinstance(key.stop, datetime):

0 commit comments

Comments
 (0)