Skip to content

Commit 179e763

Browse files
Add tests for MovingWindow
Signed-off-by: Matthias Wende <[email protected]>
1 parent cb24d3e commit 179e763

File tree

1 file changed

+77
-0
lines changed

1 file changed

+77
-0
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the moving window."""
5+
6+
import asyncio
7+
from datetime import datetime, timedelta
8+
from typing import Sequence, Tuple
9+
10+
import numpy as np
11+
from frequenz.channels import Broadcast, Sender
12+
13+
from frequenz.sdk.timeseries import Sample
14+
from frequenz.sdk.timeseries._moving_window import MovingWindow
15+
16+
17+
async def push_lm_data(sender: Sender[Sample], test_seq: Sequence[float]) -> None:
18+
"""
19+
Push data in the passed sender to mock `LogicalMeter` behaviour.
20+
Starting with the First of January 2023.
21+
22+
Args:
23+
sender: Sender for pushing resampled samples to the `MovingWindow`.
24+
test_seq: The Sequence that is pushed into the `MovingWindow`.
25+
"""
26+
start_ts: datetime = datetime(2023, 1, 1)
27+
for i, j in zip(test_seq, range(0, len(test_seq))):
28+
timestamp = start_ts + timedelta(seconds=j)
29+
await sender.send(Sample(timestamp, float(i)))
30+
31+
await asyncio.sleep(0.0)
32+
33+
34+
def init_moving_window(shape: int) -> Tuple[MovingWindow, Sender[Sample]]:
35+
"""
36+
Initialize the moving window with given shape
37+
38+
Args:
39+
shape: The size of the `MovingWindow`
40+
41+
Returns:
42+
tuple[MovingWindow, Sender[Sample]]: A pair of sender and `MovingWindow`.
43+
"""
44+
lm_chan = Broadcast[Sample]("lm_net_power")
45+
lm_tx = lm_chan.new_sender()
46+
window = MovingWindow(shape, lm_chan.new_receiver(), timedelta(seconds=1))
47+
return window, lm_tx
48+
49+
50+
async def test_access_window_by_index() -> None:
51+
"""Test indexing a window by integer index"""
52+
window, sender = init_moving_window(1)
53+
await push_lm_data(sender, [1])
54+
assert np.array_equal(window[0], 1.0)
55+
56+
57+
async def test_access_window_by_timestamp() -> None:
58+
"""Test indexing a window by timestamp"""
59+
window, sender = init_moving_window(1)
60+
await push_lm_data(sender, [1])
61+
assert np.array_equal(window[datetime(2023, 1, 1)], 1.0)
62+
63+
64+
async def test_access_window_by_int_slice() -> None:
65+
"""Test accessing a subwindow with an integer slice"""
66+
window, sender = init_moving_window(5)
67+
await push_lm_data(sender, range(0, 5))
68+
assert np.array_equal(window[3:5], np.array([3.0, 4.0]))
69+
70+
71+
async def test_access_window_by_ts_slice() -> None:
72+
"""Test accessing a subwindow with a timestamp slice"""
73+
window, sender = init_moving_window(5)
74+
await push_lm_data(sender, range(0, 5))
75+
time_start = datetime(2023, 1, 1) + timedelta(seconds=3)
76+
time_end = time_start + timedelta(seconds=2)
77+
assert np.array_equal(window[time_start:time_end], np.array([3.0, 4.0])) # type: ignore

0 commit comments

Comments
 (0)