Skip to content

Commit 9702d2a

Browse files
committed
Use Sample type for ringbuffer
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent db847b4 commit 9702d2a

File tree

3 files changed

+67
-71
lines changed

3 files changed

+67
-71
lines changed

benchmarks/timeseries/benchmark_ringbuffer.py

Lines changed: 29 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
import random
77
import timeit
88
from datetime import datetime, timedelta
9-
from typing import Any, TypeVar
9+
from typing import Any, Dict, TypeVar
1010

1111
import numpy as np
1212

13+
from frequenz.sdk.timeseries import Sample
1314
from frequenz.sdk.timeseries._ringbuffer import OrderedRingBuffer
1415

1516
MINUTES_IN_A_DAY = 24 * 60
@@ -19,9 +20,7 @@
1920
T = TypeVar("T")
2021

2122

22-
def fill_buffer(
23-
days: int, buffer: OrderedRingBuffer[T, Any], element_type: type
24-
) -> None:
23+
def fill_buffer(days: int, buffer: OrderedRingBuffer[Any]) -> None:
2524
"""Fill the given buffer up to the given amount of days, one sample per minute."""
2625
random.seed(0)
2726
basetime = datetime(2022, 1, 1)
@@ -31,14 +30,12 @@ def fill_buffer(
3130
# Push in random order
3231
for i in random.sample(range(MINUTES_IN_A_DAY), MINUTES_IN_A_DAY):
3332
buffer.update(
34-
basetime + timedelta(days=day, minutes=i, seconds=i % 3),
35-
element_type(i),
33+
Sample(basetime + timedelta(days=day, minutes=i, seconds=i % 3))
3634
)
3735

3836

39-
def test_days(days: int, buffer: OrderedRingBuffer[T, Any]) -> None:
37+
def test_days(days: int, buffer: OrderedRingBuffer[Any]) -> None:
4038
"""Gets the data for each of the 29 days."""
41-
4239
basetime = datetime(2022, 1, 1)
4340

4441
for day in range(days):
@@ -48,13 +45,12 @@ def test_days(days: int, buffer: OrderedRingBuffer[T, Any]) -> None:
4845
)
4946

5047

51-
def test_slices(days: int, buffer: OrderedRingBuffer[T, Any], median: bool) -> None:
48+
def test_slices(days: int, buffer: OrderedRingBuffer[Any], median: bool) -> None:
5249
"""Benchmark slicing.
5350
5451
Takes a buffer, fills it up and then excessively gets
5552
the data for each day to calculate the average/median.
5653
"""
57-
5854
basetime = datetime(2022, 1, 1)
5955

6056
total = 0.0
@@ -71,18 +67,17 @@ def test_slices(days: int, buffer: OrderedRingBuffer[T, Any], median: bool) -> N
7167
total += float(np.average(minutes))
7268

7369

74-
def test_29_days_list(num_runs: int) -> dict:
70+
def test_29_days_list(num_runs: int) -> Dict[str, float]:
7571
"""Run the 29 day test on the list backend."""
76-
7772
days = 29
7873
buffer = OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, timedelta(minutes=1))
7974

80-
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer, int)).timeit(number=1)
75+
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer)).timeit(number=1)
8176
test_time = timeit.Timer(lambda: test_days(days, buffer)).timeit(number=num_runs)
8277
return {"fill": fill_time, "test": test_time}
8378

8479

85-
def test_29_days_array(num_runs: int) -> dict:
80+
def test_29_days_array(num_runs: int) -> Dict[str, float]:
8681
"""Run the 29 day test on the array backend."""
8782
days = 29
8883
buffer = OrderedRingBuffer(
@@ -92,17 +87,17 @@ def test_29_days_array(num_runs: int) -> dict:
9287
timedelta(minutes=1),
9388
)
9489

95-
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer, int)).timeit(number=1)
90+
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer)).timeit(number=1)
9691
test_time = timeit.Timer(lambda: test_days(days, buffer)).timeit(number=num_runs)
9792
return {"fill": fill_time, "test": test_time}
9893

9994

100-
def test_29_days_slicing_list(num_runs: int) -> dict:
95+
def test_29_days_slicing_list(num_runs: int) -> Dict[str, float]:
10196
"""Run slicing tests on list backend."""
10297
days = 29
10398
buffer = OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, timedelta(minutes=1))
10499

105-
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer, int)).timeit(number=1)
100+
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer)).timeit(number=1)
106101
median_test_time = timeit.Timer(
107102
lambda: test_slices(days, buffer, median=True)
108103
).timeit(number=num_runs)
@@ -113,7 +108,7 @@ def test_29_days_slicing_list(num_runs: int) -> dict:
113108
return {"fill": fill_time, "median": median_test_time, "avg": avg_test_time}
114109

115110

116-
def test_29_days_slicing_array(num_runs: int) -> dict:
111+
def test_29_days_slicing_array(num_runs: int) -> Dict[str, float]:
117112
"""Run slicing tests on array backend."""
118113
days = 29
119114
buffer = OrderedRingBuffer(
@@ -123,7 +118,7 @@ def test_29_days_slicing_array(num_runs: int) -> dict:
123118
timedelta(minutes=1),
124119
)
125120

126-
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer, int)).timeit(number=1)
121+
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer)).timeit(number=1)
127122
median_test_time = timeit.Timer(
128123
lambda: test_slices(days, buffer, median=True)
129124
).timeit(number=num_runs)
@@ -139,31 +134,31 @@ def main() -> None:
139134
140135
Result of previous run:
141136
142-
Date: Mi 1. Feb 17:15:02 CET 2023
137+
Date: Mi 1. Feb 17:19:51 CET 2023
143138
Result:
144139
145140
=====================
146141
Array: ..filling
147142
List: ..filling
148143
Time to fill 29 days with data:
149-
Array: 7.190492740017362 seconds
150-
List: 7.209744154009968 seconds
151-
Diff: -0.019251413992606103
144+
Array: 7.214875044999644 seconds
145+
List: 7.174421657982748 seconds
146+
Diff: 0.04045338701689616
152147
Day-Slices into 29 days with data:
153-
Array: 0.0001254317001439631 seconds
154-
List: 0.00017958255193661898 seconds
155-
Diff: -5.4150851792655874e-05
148+
Array: 0.0001304591482039541 seconds
149+
List: 0.00019963659869972616 seconds
150+
Diff: -6.917745049577205e-05
156151
=====================
157152
Array: ..filling
158153
List: ..filling
159154
Avg of windows of 29 days and running average & mean on every day:
160-
Array: 0.0007975498505402356 seconds
161-
List: 0.0042349924508016555 seconds
162-
Diff: -0.0034374426002614198
155+
Array: 0.0007829780981410295 seconds
156+
List: 0.0042931242496706545 seconds
157+
Diff: -0.0035101461515296252
163158
Median of windows of 29 days and running average & mean on every day:
164-
Array: 0.0021774103515781462 seconds
165-
List: 0.004992740901070647 seconds
166-
Diff: -0.0028153305494925005
159+
Array: 0.0021195551002165304 seconds
160+
List: 0.00501448459981475 seconds
161+
Diff: -0.00289492949959822
167162
"""
168163
num_runs = 20
169164

@@ -208,7 +203,8 @@ def main() -> None:
208203
"Median of windows of 29 days and running average & mean on every day:\n\t"
209204
+ f"Array: {slicing_array_times['median']/num_runs} seconds\n\t"
210205
+ f"List: {slicing_list_times['median']/num_runs} seconds\n\t"
211-
+ f"Diff: {slicing_array_times['median']/num_runs - slicing_list_times['median']/num_runs}"
206+
+ "Diff: "
207+
+ f"{slicing_array_times['median']/num_runs - slicing_list_times['median']/num_runs}"
212208
)
213209

214210

src/frequenz/sdk/timeseries/_ringbuffer.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
from typing import Any, Generic, List, Sequence, TypeVar
1212

1313
import numpy as np
14+
from numpy.lib import math
1415

15-
T = TypeVar("T")
16+
from frequenz.sdk.timeseries import Sample
1617

1718
Container = TypeVar("Container", list, np.ndarray)
1819

@@ -41,7 +42,7 @@ def contains(self, timestamp: datetime):
4142
return False
4243

4344

44-
class OrderedRingBuffer(Generic[T, Container]):
45+
class OrderedRingBuffer(Generic[Container]):
4546
"""Time aware ringbuffer that keeps its entries sorted by time."""
4647

4748
def __init__(
@@ -105,28 +106,25 @@ def maxlen(self) -> int:
105106
"""
106107
return len(self._buffer)
107108

108-
def update(self, timestamp: datetime, value: T, missing: bool = False) -> None:
109+
def update(self, sample: Sample) -> None:
109110
"""Update the buffer with a new value for the given timestamp.
110111
112+
Missing values are written as NaN. Be advised that when
113+
`update()` is called with samples newer than the current time
114+
+ `sampling_period` (as is the case for loading historical data
115+
after an app restart), a gap of missing data exists. This gap
116+
does not contain NaN values but simply the old invalid values.
117+
The list of gaps returned by `gaps()` will reflect this and
118+
should be used as the only source of truth for unwritten data.
119+
111120
Args:
112-
timestamp: Timestamp of the new value.
113-
value: value to add.
114-
missing: if true, the given timestamp will be recorded as missing.
115-
The value will still be written.
116-
117-
Note: When relying on your own values to mark missing data (e.g.
118-
`math.nan`), be advised that when `update()` is called with
119-
timestamps newer than the current time + `sampling_period`
120-
(as is the case for loading historical data after an app
121-
restart), a gap of missing data exists.
122-
This gap does not contain NaN values but simply the old invalid values.
123-
The list of gaps returned by `gaps()` will reflect this.
121+
sample: Sample to add to the ringbuffer
124122
125123
Raises:
126124
IndexError: When the timestamp to be added is too old.
127125
"""
128126
# adjust timestamp to be exactly on the sample period time point
129-
timestamp = self._normalize_timestamp(timestamp)
127+
timestamp = self._normalize_timestamp(sample.timestamp)
130128

131129
# Don't add outdated entries
132130
if timestamp < self._datetime_oldest and self._datetime_oldest != datetime.max:
@@ -140,9 +138,10 @@ def update(self, timestamp: datetime, value: T, missing: bool = False) -> None:
140138
self._datetime_oldest = self._datetime_newest - self._time_range
141139

142140
# Update data
141+
value: float = math.nan if sample.value is None else sample.value
143142
self._buffer[self.datetime_to_index(timestamp)] = value
144143

145-
self._update_gaps(timestamp, prev_newest, missing)
144+
self._update_gaps(timestamp, prev_newest, sample.value is None)
146145

147146
def datetime_to_index(
148147
self, timestamp: datetime, allow_outside_range: bool = False
@@ -387,7 +386,7 @@ def _wrap(self, index: int) -> int:
387386
"""
388387
return index % self.maxlen
389388

390-
def __setitem__(self, index_or_slice: int | slice, value: T) -> None:
389+
def __setitem__(self, index_or_slice: int | slice, value: float) -> None:
391390
"""Set item or slice at requested position.
392391
393392
No wrapping of the index will be done.
@@ -399,7 +398,7 @@ def __setitem__(self, index_or_slice: int | slice, value: T) -> None:
399398
"""
400399
self._buffer.__setitem__(index_or_slice, value)
401400

402-
def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
401+
def __getitem__(self, index_or_slice: int | slice) -> float | Sequence[float]:
403402
"""Get item or slice at requested position.
404403
405404
No wrapping of the index will be done.

0 commit comments

Comments
 (0)