Skip to content

Commit 0adba10

Browse files
Add tests for the PeriodicFeatureExtractor
Signed-off-by: Matthias Wende <[email protected]>
1 parent 8220643 commit 0adba10

File tree

1 file changed

+144
-0
lines changed

1 file changed

+144
-0
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the timeseries averager."""
5+
6+
from datetime import datetime, timedelta, timezone
7+
from typing import List
8+
9+
import numpy as np
10+
from frequenz.channels import Broadcast
11+
12+
from frequenz.sdk.timeseries import (
13+
UNIX_EPOCH,
14+
MovingWindow,
15+
PeriodicFeatureExtractor,
16+
Sample,
17+
)
18+
from tests.timeseries.test_moving_window import (
19+
init_moving_window,
20+
push_logical_meter_data,
21+
)
22+
23+
24+
async def init_feature_extractor(
25+
data: List[float], period: timedelta
26+
) -> PeriodicFeatureExtractor:
27+
"""
28+
Initialize a PeriodicFeatureExtractor with a `MovingWindow` that contains the data.
29+
30+
Args:
31+
data: The data that is pushed into the moving window.
32+
period: The distance between two successive windows.
33+
34+
Returns:
35+
PeriodicFeatureExtractor
36+
"""
37+
window, sender = init_moving_window(timedelta(seconds=len(data)))
38+
await push_logical_meter_data(sender, data)
39+
40+
return PeriodicFeatureExtractor(moving_window=window, period=period)
41+
42+
43+
async def init_feature_extractor_no_data(period: int) -> PeriodicFeatureExtractor:
44+
"""
45+
Initialize a PeriodicFeatureExtractor with a `MovingWindow` that contains no data.
46+
47+
Args:
48+
period: The distance between two successive windows.
49+
50+
Returns:
51+
PeriodicFeatureExtractor
52+
"""
53+
# We only need the moving window to initialize the PeriodicFeatureExtractor class.
54+
lm_chan = Broadcast[Sample]("lm_net_power")
55+
moving_window = MovingWindow(
56+
timedelta(seconds=1), lm_chan.new_receiver(), timedelta(seconds=1)
57+
)
58+
59+
await lm_chan.new_sender().send(Sample(datetime.now(tz=timezone.utc), 0))
60+
61+
# Initialize the PeriodicFeatureExtractor class with a period of period seconds.
62+
# This works since the sampling period is set to 1 second.
63+
return PeriodicFeatureExtractor(moving_window, timedelta(seconds=period))
64+
65+
66+
async def test_interval_shifting() -> None:
67+
"""
68+
Test if a interval is properly shifted into a moving window
69+
"""
70+
feature_extractor = await init_feature_extractor(
71+
[1, 2, 2, 1, 1, 1, 2, 2, 1, 1], timedelta(seconds=5)
72+
)
73+
74+
# Test if the timestamp is not shifted
75+
timestamp = datetime(2023, 1, 1, 0, 0, 1, tzinfo=timezone.utc)
76+
index_not_shifted = (
77+
feature_extractor._timestamp_to_rel_index( # pylint: disable=protected-access
78+
timestamp
79+
)
80+
% feature_extractor._period # pylint: disable=protected-access
81+
)
82+
assert index_not_shifted == 1
83+
84+
# Test if a timestamp in the window is shifted to the first appearance of the window
85+
timestamp = datetime(2023, 1, 1, 0, 0, 6, tzinfo=timezone.utc)
86+
index_shifted = (
87+
feature_extractor._timestamp_to_rel_index( # pylint: disable=protected-access
88+
timestamp
89+
)
90+
% feature_extractor._period # pylint: disable=protected-access
91+
)
92+
assert index_shifted == 1
93+
94+
# Test if a timestamp outside the window is shifted
95+
timestamp = datetime(2023, 1, 1, 0, 0, 11, tzinfo=timezone.utc)
96+
index_shifted = (
97+
feature_extractor._timestamp_to_rel_index( # pylint: disable=protected-access
98+
timestamp
99+
)
100+
% feature_extractor._period # pylint: disable=protected-access
101+
)
102+
assert index_shifted == 1
103+
104+
105+
async def test_feature_extractor() -> None:
106+
"""Test the feature extractor with a moving window that contains data."""
107+
start = UNIX_EPOCH + timedelta(seconds=1)
108+
end = start + timedelta(seconds=2)
109+
110+
data: List[float] = [1, 2, 2.5, 1, 1, 1, 2, 2, 1, 1, 2, 2]
111+
112+
feature_extractor = await init_feature_extractor(data, timedelta(seconds=3))
113+
assert np.allclose(feature_extractor.avg(start, end), [5 / 3, 4 / 3])
114+
115+
feature_extractor = await init_feature_extractor(data, timedelta(seconds=4))
116+
assert np.allclose(feature_extractor.avg(start, end), [1, 2])
117+
118+
data: List[float] = [1, 2, 2.5, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1] # type: ignore[no-redef]
119+
120+
feature_extractor = await init_feature_extractor(data, timedelta(seconds=5))
121+
assert np.allclose(feature_extractor.avg(start, end), [1.5, 1.5])
122+
123+
124+
async def test_profiler_calculate_np() -> None:
125+
"""
126+
Test the calculation of the average using a numpy array and compare it
127+
against the pure python method with the same functionality.
128+
"""
129+
data = np.array([2, 2.5, 1, 1, 1, 2])
130+
feature_extractor = await init_feature_extractor_no_data(4)
131+
window_size = 2
132+
reshaped = feature_extractor._reshape_np_array( # pylint: disable=protected-access
133+
data, window_size
134+
)
135+
result = np.average(reshaped[:, :window_size], axis=0)
136+
assert np.allclose(result, np.array([1.5, 2.25]))
137+
138+
data = np.array([2, 2, 1, 1, 2])
139+
feature_extractor = await init_feature_extractor_no_data(5)
140+
reshaped = feature_extractor._reshape_np_array( # pylint: disable=protected-access
141+
data, window_size
142+
)
143+
result = np.average(reshaped[:, :window_size], axis=0)
144+
assert np.allclose(result, np.array([2, 2]))

0 commit comments

Comments
 (0)