Skip to content

Commit 2000da0

Browse files
Serializing Ringbuffer (#167)
based on #139
2 parents b4e1efe + 9ea5468 commit 2000da0

File tree

7 files changed

+350
-28
lines changed

7 files changed

+350
-28
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* A `MovingWindow` class has been added that consumes a data stream from a logical meter and updates an `OrderedRingBuffer`.
1212
* Add EVChargerPool implementation. It has only streaming state changes for ev chargers, now.
1313
* Add 3-phase current formulas: `3-phase grid_current` and `3-phase ev_charger_current` to the LogicalMeter.
14-
14+
* A new class `SerializableRingbuffer` is now available, extending the `OrderedRingBuffer` class with the ability to load & dump the data to disk.
1515

1616
## Bug Fixes
1717

benchmarks/timeseries/benchmark_ringbuffer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def test_slices(days: int, buffer: OrderedRingBuffer[Any], median: bool) -> None
7070
def test_29_days_list(num_runs: int) -> Dict[str, float]:
7171
"""Run the 29 day test on the list backend."""
7272
days = 29
73-
buffer = OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, timedelta(minutes=1))
73+
buffer = OrderedRingBuffer([0.0] * MINUTES_IN_29_DAYS, timedelta(minutes=1))
7474

7575
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer)).timeit(number=1)
7676
test_time = timeit.Timer(lambda: test_days(days, buffer)).timeit(number=num_runs)
@@ -95,7 +95,7 @@ def test_29_days_array(num_runs: int) -> Dict[str, float]:
9595
def test_29_days_slicing_list(num_runs: int) -> Dict[str, float]:
9696
"""Run slicing tests on list backend."""
9797
days = 29
98-
buffer = OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, timedelta(minutes=1))
98+
buffer = OrderedRingBuffer([0.0] * MINUTES_IN_29_DAYS, timedelta(minutes=1))
9999

100100
fill_time = timeit.Timer(lambda: fill_buffer(days, buffer)).timeit(number=1)
101101
median_test_time = timeit.Timer(
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Benchmarks the `SerializableRingbuffer` class."""
5+
6+
from __future__ import annotations
7+
8+
import fnmatch
9+
import os
10+
import time
11+
from datetime import datetime, timedelta
12+
from typing import Any
13+
14+
import numpy as np
15+
16+
from frequenz.sdk.timeseries import Sample
17+
from frequenz.sdk.timeseries._serializable_ringbuffer import SerializableRingBuffer
18+
19+
FILE_NAME = "ringbuffer.pkl"
20+
FIVE_MINUTES = timedelta(minutes=5)
21+
22+
# Size of the ringbuffer to dump/load
23+
SIZE = 4000_000
24+
# Number of iterations to run the benchmark
25+
ITERATIONS = 100
26+
27+
28+
def delete_files_with_prefix(prefix: str) -> None:
29+
"""Delete all files starting with the given prefix.
30+
31+
Args:
32+
prefix: Prefix of the files to delete
33+
"""
34+
for file in os.listdir():
35+
if fnmatch.fnmatch(file, prefix + "*"):
36+
os.remove(file)
37+
38+
39+
def benchmark_serialization(
40+
ringbuffer: SerializableRingBuffer[Any], iterations: int
41+
) -> float:
42+
"""Benchmark the given buffer `iteration` times.
43+
44+
Args:
45+
ringbuffer: Ringbuffer to benchmark to serialize.
46+
iterations: amount of iterations to run.
47+
"""
48+
total = 0.0
49+
for _ in range(iterations):
50+
start = time.time()
51+
ringbuffer.dump()
52+
SerializableRingBuffer.load(FILE_NAME)
53+
end = time.time()
54+
total += end - start
55+
delete_files_with_prefix(FILE_NAME)
56+
57+
return total / iterations
58+
59+
60+
def main() -> None:
61+
"""Run Benchmark."""
62+
ringbuffer = SerializableRingBuffer(
63+
np.arange(0, SIZE, dtype=np.float64), timedelta(minutes=5), FILE_NAME
64+
)
65+
66+
print("size:", SIZE)
67+
print("iterations:", ITERATIONS)
68+
69+
for i in range(0, SIZE, 10000):
70+
ringbuffer.update(
71+
Sample(datetime.fromtimestamp(200 + i * FIVE_MINUTES.total_seconds()), i)
72+
)
73+
74+
print(
75+
"Avg time for Pickle dump/load: "
76+
f"{benchmark_serialization(ringbuffer, ITERATIONS)}s"
77+
)
78+
79+
80+
if __name__ == "__main__":
81+
main()

src/frequenz/sdk/timeseries/_ringbuffer.py

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,18 @@
55

66
from __future__ import annotations
77

8+
from collections.abc import Iterable
89
from copy import deepcopy
910
from dataclasses import dataclass
1011
from datetime import datetime, timedelta
11-
from typing import Any, Generic, List, Sequence, TypeVar
12+
from typing import Any, Generic, List, TypeVar, overload
1213

1314
import numpy as np
14-
from numpy.lib import math
15+
import numpy.typing as npt
1516

16-
from frequenz.sdk.timeseries import Sample
17+
from . import Sample
1718

18-
Container = TypeVar("Container", list, np.ndarray)
19+
FloatArray = TypeVar("FloatArray", List[float], npt.NDArray[np.float64])
1920

2021

2122
@dataclass
@@ -42,12 +43,12 @@ def contains(self, timestamp: datetime):
4243
return False
4344

4445

45-
class OrderedRingBuffer(Generic[Container]):
46+
class OrderedRingBuffer(Generic[FloatArray]):
4647
"""Time aware ringbuffer that keeps its entries sorted by time."""
4748

4849
def __init__(
4950
self,
50-
buffer: Container,
51+
buffer: FloatArray,
5152
sampling_period: timedelta,
5253
time_index_alignment: datetime = datetime(1, 1, 1),
5354
) -> None:
@@ -129,7 +130,7 @@ def update(self, sample: Sample) -> None:
129130
# Don't add outdated entries
130131
if timestamp < self._datetime_oldest and self._datetime_oldest != datetime.max:
131132
raise IndexError(
132-
f"Timestamp too old (cut-off is at {self._datetime_oldest})."
133+
f"Timestamp {timestamp} too old (cut-off is at {self._datetime_oldest})."
133134
)
134135

135136
# Update timestamps
@@ -138,7 +139,7 @@ def update(self, sample: Sample) -> None:
138139
self._datetime_oldest = self._datetime_newest - self._time_range
139140

140141
# Update data
141-
value: float = math.nan if sample.value is None else sample.value
142+
value: float = np.nan if sample.value is None else sample.value
142143
self._buffer[self.datetime_to_index(timestamp)] = value
143144

144145
self._update_gaps(timestamp, prev_newest, sample.value is None)
@@ -179,7 +180,7 @@ def datetime_to_index(
179180

180181
def window(
181182
self, start: datetime, end: datetime, force_copy: bool = False
182-
) -> Container:
183+
) -> FloatArray:
183184
"""Request a view on the data between start timestamp and end timestamp.
184185
185186
If the data is not used immediately it could be overwritten.
@@ -204,7 +205,7 @@ def window(
204205
copy of the data.
205206
206207
Raises:
207-
IndexError: when requesting a window with invalid timestamps.
208+
IndexError: When requesting a window with invalid timestamps.
208209
209210
Returns:
210211
The requested window
@@ -228,14 +229,13 @@ def window(
228229
window = np.concatenate((window, self._buffer[0:end_index]))
229230
return window
230231

231-
def in_gap(gap) -> bool:
232-
return gap.contains(start) or gap.contains(end)
233-
234232
# Return a copy if there are none-values in the data
235-
if force_copy or any(map(in_gap, self._gaps)):
236-
return deepcopy(self._buffer[start_index:end_index])
233+
if force_copy or any(
234+
map(lambda gap: gap.contains(start) or gap.contains(end), self._gaps)
235+
):
236+
return deepcopy(self[start_index:end_index])
237237

238-
return self._buffer[start_index:end_index]
238+
return self[start_index:end_index]
239239

240240
def is_missing(self, timestamp: datetime) -> bool:
241241
"""Check if the given timestamp falls within a gap.
@@ -386,7 +386,33 @@ def _wrap(self, index: int) -> int:
386386
"""
387387
return index % self.maxlen
388388

389-
def __setitem__(self, index_or_slice: int | slice, value: float) -> None:
389+
@overload
390+
def __setitem__(self, index_or_slice: slice, value: Iterable[float]) -> None:
391+
"""Set values at the request slice positions.
392+
393+
No wrapping of the index will be done.
394+
Create a feature request if you require this function.
395+
396+
Args:
397+
index_or_slice: Slice specification of the requested data.
398+
value: Sequence of value to set at the given range.
399+
"""
400+
401+
@overload
402+
def __setitem__(self, index_or_slice: int, value: float) -> None:
403+
"""Set value at requested index.
404+
405+
No wrapping of the index will be done.
406+
Create a feature request if you require this function.
407+
408+
Args:
409+
index_or_slice: Index of the data.
410+
value: Value to set at the given position.
411+
"""
412+
413+
def __setitem__(
414+
self, index_or_slice: int | slice, value: float | Iterable[float]
415+
) -> None:
390416
"""Set item or slice at requested position.
391417
392418
No wrapping of the index will be done.
@@ -398,7 +424,29 @@ def __setitem__(self, index_or_slice: int | slice, value: float) -> None:
398424
"""
399425
self._buffer.__setitem__(index_or_slice, value)
400426

401-
def __getitem__(self, index_or_slice: int | slice) -> float | Sequence[float]:
427+
@overload
428+
def __getitem__(self, index_or_slice: int) -> float:
429+
"""Get item at requested position.
430+
431+
No wrapping of the index will be done.
432+
Create a feature request if you require this function.
433+
434+
Args:
435+
index_or_slice: Index of the requested data.
436+
"""
437+
438+
@overload
439+
def __getitem__(self, index_or_slice: slice) -> FloatArray:
440+
"""Get the data described by the given slice.
441+
442+
No wrapping of the index will be done.
443+
Create a feature request if you require this function.
444+
445+
Args:
446+
index_or_slice: Slice specification of where the requested data is.
447+
"""
448+
449+
def __getitem__(self, index_or_slice: int | slice) -> float | FloatArray:
402450
"""Get item or slice at requested position.
403451
404452
No wrapping of the index will be done.
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Ringbuffer implementation with serialization support."""
5+
6+
# For use of the class type hint inside the class itself.
7+
from __future__ import annotations
8+
9+
import pickle
10+
from datetime import datetime, timedelta
11+
from os.path import exists
12+
13+
from ._ringbuffer import FloatArray, OrderedRingBuffer
14+
15+
# Version of the file dumping/loading format
16+
FILE_FORMAT_VERSION: int = 1
17+
18+
19+
class SerializableRingBuffer(OrderedRingBuffer[FloatArray]):
20+
"""Sorted ringbuffer with serialization support."""
21+
22+
# pylint: disable=too-many-arguments
23+
def __init__(
24+
self,
25+
buffer: FloatArray,
26+
sampling_period: timedelta,
27+
path: str,
28+
time_index_alignment: datetime = datetime(1, 1, 1),
29+
) -> None:
30+
"""Initialize the time aware ringbuffer.
31+
32+
Args:
33+
buffer: Instance of a buffer container to use internally.
34+
sampling_period: Timedelta of the desired resampling period.
35+
path: Path to where the data should be saved to and loaded from.
36+
time_index_alignment: Arbitrary point in time used to align
37+
timestamped data with the index position in the buffer.
38+
Used to make the data stored in the buffer align with the
39+
beginning and end of the buffer borders.
40+
For example, if the `time_index_alignment` is set to
41+
"0001-01-01 12:00:00", and the `sampling_period` is set to
42+
1 hour and the length of the buffer is 24, then the data
43+
stored in the buffer could correspond to the time range from
44+
"2022-01-01 12:00:00" to "2022-01-02 12:00:00" (date chosen
45+
arbitrarily here).
46+
"""
47+
super().__init__(buffer, sampling_period, time_index_alignment)
48+
self._path = path
49+
self._file_format_version = FILE_FORMAT_VERSION
50+
51+
def dump(self) -> None:
52+
"""Dump data to disk.
53+
54+
Raises:
55+
I/O related exceptions when the file cannot be written.
56+
"""
57+
with open(self._path, mode="wb+") as fileobj:
58+
pickle.dump(self, fileobj)
59+
60+
@classmethod
61+
def load(cls, path: str) -> SerializableRingBuffer[FloatArray] | None:
62+
"""Load data from disk.
63+
64+
Args:
65+
path: Path to the file where the data is stored.
66+
67+
Raises:
68+
I/O related exceptions when file exists but isn't accessable for any
69+
reason.
70+
71+
Returns:
72+
`None` when the file doesn't exist, otherwise an instance of the
73+
`SerializableRingBuffer` class, loaded from disk.
74+
"""
75+
if not exists(path):
76+
return None
77+
78+
with open(path, mode="rb") as fileobj:
79+
instance: SerializableRingBuffer = pickle.load(fileobj)
80+
instance._path = path # pylint: disable=protected-access
81+
# Set latest file format version for next time it dumps.
82+
# pylint: disable=protected-access
83+
instance._file_format_version = FILE_FORMAT_VERSION
84+
85+
return instance

0 commit comments

Comments
 (0)