Skip to content

Commit 30e3a73

Browse files
Use Sample type with the RingBuffer (#186)
fixes #165
2 parents 352e19c + 9702d2a commit 30e3a73

File tree

3 files changed

+150
-108
lines changed

3 files changed

+150
-108
lines changed

benchmarks/timeseries/benchmark_ringbuffer.py

Lines changed: 112 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
import random
77
import timeit
88
from datetime import datetime, timedelta
9-
from typing import Any, TypeVar
9+
from typing import Any, Dict, TypeVar
1010

1111
import numpy as np
1212

13+
from frequenz.sdk.timeseries import Sample
1314
from frequenz.sdk.timeseries._ringbuffer import OrderedRingBuffer
1415

1516
MINUTES_IN_A_DAY = 24 * 60
@@ -19,28 +20,22 @@
1920
T = TypeVar("T")
2021

2122

22-
def fill_buffer(
23-
days: int, buffer: OrderedRingBuffer[T, Any], element_type: type
24-
) -> None:
23+
def fill_buffer(days: int, buffer: OrderedRingBuffer[Any]) -> None:
2524
"""Fill the given buffer up to the given amount of days, one sample per minute."""
2625
random.seed(0)
2726
basetime = datetime(2022, 1, 1)
27+
print("..filling", end="", flush=True)
2828

2929
for day in range(days):
3030
# Push in random order
3131
for i in random.sample(range(MINUTES_IN_A_DAY), MINUTES_IN_A_DAY):
3232
buffer.update(
33-
basetime + timedelta(days=day, minutes=i, seconds=i % 3),
34-
element_type(i),
33+
Sample(basetime + timedelta(days=day, minutes=i, seconds=i % 3))
3534
)
3635

3736

38-
def test_days(days: int, buffer: OrderedRingBuffer[int, Any]) -> None:
39-
"""Fills a buffer completely up and then gets the data for each of the 29 days."""
40-
print(".", end="", flush=True)
41-
42-
fill_buffer(days, buffer, int)
43-
37+
def test_days(days: int, buffer: OrderedRingBuffer[Any]) -> None:
38+
"""Get the data for each of the given number of days from an already filled buffer."""
4439
basetime = datetime(2022, 1, 1)
4540

4641
for day in range(days):
@@ -50,119 +45,166 @@ def test_days(days: int, buffer: OrderedRingBuffer[int, Any]) -> None:
5045
)
5146

5247

53-
def test_slices(days: int, buffer: OrderedRingBuffer[T, Any]) -> None:
48+
def test_slices(days: int, buffer: OrderedRingBuffer[Any], median: bool) -> None:
5449
"""Benchmark slicing.
5550
5651
Takes an already filled buffer and then repeatedly gets
5752
the data for each day to calculate the average/median.
5853
"""
59-
print(".", end="", flush=True)
60-
fill_buffer(days, buffer, float)
61-
62-
# Choose an uneven starting point so that for the first/last window data has to
63-
# be copied
64-
basetime = datetime(2022, 1, 1, 0, 5, 13, 88)
54+
basetime = datetime(2022, 1, 1)
6555

66-
total_avg = 0.0
67-
total_median = 0.0
56+
total = 0.0
6857

69-
for _ in range(5):
58+
for _ in range(3):
7059
for day in range(days):
7160
minutes = buffer.window(
7261
basetime + timedelta(days=day), basetime + timedelta(days=day + 1)
7362
)
7463

75-
total_avg += float(np.average(minutes))
76-
total_median += float(np.median(minutes))
64+
if median:
65+
total += float(np.median(minutes))
66+
else:
67+
total += float(np.average(minutes))
7768

7869

79-
def test_29_days_list() -> None:
70+
def test_29_days_list(num_runs: int) -> Dict[str, float]:
8071
"""Run the 29 day test on the list backend."""
81-
test_days(29, OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, timedelta(minutes=1)))
72+
days = 29
73+
buffer = OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, timedelta(minutes=1))
74+
75+
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer)).timeit(number=1)
76+
test_time = timeit.Timer(lambda: test_days(days, buffer)).timeit(number=num_runs)
77+
return {"fill": fill_time, "test": test_time}
8278

8379

84-
def test_29_days_array() -> None:
80+
def test_29_days_array(num_runs: int) -> Dict[str, float]:
8581
"""Run the 29 day test on the array backend."""
86-
test_days(
87-
29,
88-
OrderedRingBuffer(
89-
np.empty(
90-
shape=MINUTES_IN_29_DAYS,
91-
),
92-
timedelta(minutes=1),
82+
days = 29
83+
buffer = OrderedRingBuffer(
84+
np.empty(
85+
shape=MINUTES_IN_29_DAYS,
9386
),
87+
timedelta(minutes=1),
9488
)
9589

90+
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer)).timeit(number=1)
91+
test_time = timeit.Timer(lambda: test_days(days, buffer)).timeit(number=num_runs)
92+
return {"fill": fill_time, "test": test_time}
9693

97-
def test_29_days_slicing_list() -> None:
94+
95+
def test_29_days_slicing_list(num_runs: int) -> Dict[str, float]:
9896
"""Run slicing tests on list backend."""
99-
test_slices(29, OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, timedelta(minutes=1)))
97+
days = 29
98+
buffer = OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, timedelta(minutes=1))
99+
100+
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer)).timeit(number=1)
101+
median_test_time = timeit.Timer(
102+
lambda: test_slices(days, buffer, median=True)
103+
).timeit(number=num_runs)
104+
avg_test_time = timeit.Timer(
105+
lambda: test_slices(days, buffer, median=False)
106+
).timeit(number=num_runs)
100107

108+
return {"fill": fill_time, "median": median_test_time, "avg": avg_test_time}
101109

102-
def test_29_days_slicing_array() -> None:
110+
111+
def test_29_days_slicing_array(num_runs: int) -> Dict[str, float]:
103112
"""Run slicing tests on array backend."""
104-
test_slices(
105-
29,
106-
OrderedRingBuffer(
107-
np.empty(
108-
shape=MINUTES_IN_29_DAYS,
109-
),
110-
timedelta(minutes=1),
113+
days = 29
114+
buffer = OrderedRingBuffer(
115+
np.empty(
116+
shape=MINUTES_IN_29_DAYS,
111117
),
118+
timedelta(minutes=1),
112119
)
113120

121+
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer)).timeit(number=1)
122+
median_test_time = timeit.Timer(
123+
lambda: test_slices(days, buffer, median=True)
124+
).timeit(number=num_runs)
125+
avg_test_time = timeit.Timer(
126+
lambda: test_slices(days, buffer, median=False)
127+
).timeit(number=num_runs)
128+
129+
return {"fill": fill_time, "median": median_test_time, "avg": avg_test_time}
130+
114131

115132
def main() -> None:
116133
"""Run benchmark.
117134
118135
Result of previous run:
119136
120-
Date: Do 22. Dez 15:03:05 CET 2022
137+
Date: Mi 1. Feb 17:19:51 CET 2023
121138
Result:
122139
123-
=========================================
124-
Array: ........................................
125-
List: ........................................
140+
=====================
141+
Array: ..filling
142+
List: ..filling
126143
Time to fill 29 days with data:
127-
Array: 0.09411649959984061 seconds
128-
List: 0.0906366748000437 seconds
129-
Diff: 0.0034798247997969156
130-
=========================================
131-
Array: ........................................
132-
List: ........................................
133-
Filling 29 days and running average & mean on every day:
134-
Array: 0.09842290654996759 seconds
135-
List: 0.1316629376997298 seconds
136-
Diff: -0.03324003114976222
144+
Array: 7.214875044999644 seconds
145+
List: 7.174421657982748 seconds
146+
Diff: 0.04045338701689616
147+
Day-Slices into 29 days with data:
148+
Array: 0.0001304591482039541 seconds
149+
List: 0.00019963659869972616 seconds
150+
Diff: -6.917745049577205e-05
151+
=====================
152+
Array: ..filling
153+
List: ..filling
154+
Avg of windows of 29 days and running average & mean on every day:
155+
Array: 0.0007829780981410295 seconds
156+
List: 0.0042931242496706545 seconds
157+
Diff: -0.0035101461515296252
158+
Median of windows of 29 days and running average & mean on every day:
159+
Array: 0.0021195551002165304 seconds
160+
List: 0.00501448459981475 seconds
161+
Diff: -0.00289492949959822
137162
"""
138-
num_runs = 40
163+
num_runs = 20
139164

140165
print(f" {''.join(['='] * (num_runs + 1))}")
141166
print("Array: ", end="")
142-
duration_array = timeit.Timer(test_29_days_array).timeit(number=num_runs)
167+
array_times = test_29_days_array(num_runs)
168+
143169
print("\nList: ", end="")
144-
duration_list = timeit.Timer(test_29_days_list).timeit(number=num_runs)
170+
171+
list_times = test_29_days_list(num_runs)
145172
print("")
146173

147174
print(
148175
"Time to fill 29 days with data:\n\t"
149-
+ f"Array: {duration_array/num_runs} seconds\n\t"
150-
+ f"List: {duration_list/num_runs} seconds\n\t"
151-
+ f"Diff: {duration_array/num_runs - duration_list/num_runs}"
176+
+ f"Array: {array_times['fill']} seconds\n\t"
177+
+ f"List: {list_times['fill']} seconds\n\t"
178+
+ f"Diff: {array_times['fill'] - list_times['fill']}"
179+
)
180+
181+
print(
182+
"Day-Slices into 29 days with data:\n\t"
183+
+ f"Array: {array_times['test']/num_runs} seconds\n\t"
184+
+ f"List: {list_times['test']/num_runs} seconds\n\t"
185+
+ f"Diff: {array_times['test']/num_runs - list_times['test']/num_runs}"
152186
)
153187

154188
print(f" {''.join(['='] * (num_runs + 1))}")
155189
print("Array: ", end="")
156-
duration_array = timeit.Timer(test_29_days_slicing_array).timeit(number=num_runs)
190+
slicing_array_times = test_29_days_slicing_array(num_runs)
157191
print("\nList: ", end="")
158-
duration_list = timeit.Timer(test_29_days_slicing_list).timeit(number=num_runs)
192+
slicing_list_times = test_29_days_slicing_list(num_runs)
159193
print("")
160194

161195
print(
162-
"Filling 29 days and running average & mean on every day:\n\t"
163-
+ f"Array: {duration_array/num_runs} seconds\n\t"
164-
+ f"List: {duration_list/num_runs} seconds\n\t"
165-
+ f"Diff: {duration_array/num_runs - duration_list/num_runs}"
196+
"Avg of windows of 29 days and running average & mean on every day:\n\t"
197+
+ f"Array: {slicing_array_times['avg']/num_runs} seconds\n\t"
198+
+ f"List: {slicing_list_times['avg']/num_runs} seconds\n\t"
199+
+ f"Diff: {slicing_array_times['avg']/num_runs - slicing_list_times['avg']/num_runs}"
200+
)
201+
202+
print(
203+
"Median of windows of 29 days and running average & mean on every day:\n\t"
204+
+ f"Array: {slicing_array_times['median']/num_runs} seconds\n\t"
205+
+ f"List: {slicing_list_times['median']/num_runs} seconds\n\t"
206+
+ "Diff: "
207+
+ f"{slicing_array_times['median']/num_runs - slicing_list_times['median']/num_runs}"
166208
)
167209

168210

src/frequenz/sdk/timeseries/_ringbuffer.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
from typing import Any, Generic, List, Sequence, TypeVar
1212

1313
import numpy as np
14+
from numpy.lib import math
1415

15-
T = TypeVar("T")
16+
from frequenz.sdk.timeseries import Sample
1617

1718
Container = TypeVar("Container", list, np.ndarray)
1819

@@ -41,7 +42,7 @@ def contains(self, timestamp: datetime):
4142
return False
4243

4344

44-
class OrderedRingBuffer(Generic[T, Container]):
45+
class OrderedRingBuffer(Generic[Container]):
4546
"""Time aware ringbuffer that keeps its entries sorted by time."""
4647

4748
def __init__(
@@ -105,28 +106,25 @@ def maxlen(self) -> int:
105106
"""
106107
return len(self._buffer)
107108

108-
def update(self, timestamp: datetime, value: T, missing: bool = False) -> None:
109+
def update(self, sample: Sample) -> None:
109110
"""Update the buffer with a new value for the given timestamp.
110111
112+
Missing values are written as NaN. Be advised that when
113+
`update()` is called with samples newer than the current time
114+
+ `sampling_period` (as is the case for loading historical data
115+
after an app restart), a gap of missing data exists. This gap
116+
does not contain NaN values but simply the old invalid values.
117+
The list of gaps returned by `gaps()` will reflect this and
118+
should be used as the only source of truth for unwritten data.
119+
111120
Args:
112-
timestamp: Timestamp of the new value.
113-
value: value to add.
114-
missing: if true, the given timestamp will be recorded as missing.
115-
The value will still be written.
116-
117-
Note: When relying on your own values to mark missing data (e.g.
118-
`math.nan`), be advised that when `update()` is called with
119-
timestamps newer than the current time + `sampling_period`
120-
(as is the case for loading historical data after an app
121-
restart), a gap of missing data exists.
122-
This gap does not contain NaN values but simply the old invalid values.
123-
The list of gaps returned by `gaps()` will reflect this.
121+
sample: Sample to add to the ringbuffer.
124122
125123
Raises:
126124
IndexError: When the timestamp to be added is too old.
127125
"""
128126
# adjust timestamp to be exactly on the sample period time point
129-
timestamp = self._normalize_timestamp(timestamp)
127+
timestamp = self._normalize_timestamp(sample.timestamp)
130128

131129
# Don't add outdated entries
132130
if timestamp < self._datetime_oldest and self._datetime_oldest != datetime.max:
@@ -140,9 +138,10 @@ def update(self, timestamp: datetime, value: T, missing: bool = False) -> None:
140138
self._datetime_oldest = self._datetime_newest - self._time_range
141139

142140
# Update data
141+
value: float = math.nan if sample.value is None else sample.value
143142
self._buffer[self.datetime_to_index(timestamp)] = value
144143

145-
self._update_gaps(timestamp, prev_newest, missing)
144+
self._update_gaps(timestamp, prev_newest, sample.value is None)
146145

147146
def datetime_to_index(
148147
self, timestamp: datetime, allow_outside_range: bool = False
@@ -387,7 +386,7 @@ def _wrap(self, index: int) -> int:
387386
"""
388387
return index % self.maxlen
389388

390-
def __setitem__(self, index_or_slice: int | slice, value: T) -> None:
389+
def __setitem__(self, index_or_slice: int | slice, value: float) -> None:
391390
"""Set item or slice at requested position.
392391
393392
No wrapping of the index will be done.
@@ -399,7 +398,7 @@ def __setitem__(self, index_or_slice: int | slice, value: T) -> None:
399398
"""
400399
self._buffer.__setitem__(index_or_slice, value)
401400

402-
def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
401+
def __getitem__(self, index_or_slice: int | slice) -> float | Sequence[float]:
403402
"""Get item or slice at requested position.
404403
405404
No wrapping of the index will be done.

0 commit comments

Comments
 (0)