Skip to content

Commit 9ea5468

Browse files
committed
Add serializing ringbuffer
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent bd03d35 commit 9ea5468

File tree

4 files changed

+275
-1
lines changed

4 files changed

+275
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* A `MovingWindow` class has been added that consumes a data stream from a logical meter and updates an `OrderedRingBuffer`.
1212
* Add EVChargerPool implementation. For now, it only streams state changes for EV chargers.
1313
* Add 3-phase current formulas: `3-phase grid_current` and `3-phase ev_charger_current` to the LogicalMeter.
14-
14+
* A new class `SerializableRingBuffer` is now available, extending the `OrderedRingBuffer` class with the ability to load & dump the data to disk.
1515

1616
## Bug Fixes
1717

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Benchmarks the `SerializableRingbuffer` class."""
5+
6+
from __future__ import annotations
7+
8+
import fnmatch
9+
import os
10+
import time
11+
from datetime import datetime, timedelta
12+
from typing import Any
13+
14+
import numpy as np
15+
16+
from frequenz.sdk.timeseries import Sample
17+
from frequenz.sdk.timeseries._serializable_ringbuffer import SerializableRingBuffer
18+
19+
FILE_NAME = "ringbuffer.pkl"
20+
FIVE_MINUTES = timedelta(minutes=5)
21+
22+
# Size of the ringbuffer to dump/load
23+
SIZE = 4000_000
24+
# Number of iterations to run the benchmark
25+
ITERATIONS = 100
26+
27+
28+
def delete_files_with_prefix(prefix: str) -> None:
    """Delete all files in the current directory starting with the given prefix.

    The prefix is compared literally, so characters that are special in glob
    patterns (`*`, `?`, `[`) only match themselves.

    Args:
        prefix: Prefix of the files to delete.
    """
    for file in os.listdir():
        # Literal comparison on purpose: a glob match would interpret
        # metacharacters contained in `prefix` and could remove unrelated
        # files or miss the intended ones.
        if file.startswith(prefix):
            os.remove(file)
37+
38+
39+
def benchmark_serialization(
    ringbuffer: SerializableRingBuffer[Any], iterations: int
) -> float:
    """Measure the average duration of one dump/load round trip.

    Args:
        ringbuffer: Ringbuffer to serialize.
        iterations: Number of dump/load round trips to run, must be positive.

    Returns:
        Average time in seconds taken by one dump followed by one load.

    Raises:
        ValueError: If `iterations` is not positive.
    """
    if iterations <= 0:
        raise ValueError(f"iterations must be positive, got {iterations}")

    total = 0.0
    for _ in range(iterations):
        # `perf_counter` is monotonic, wall-clock time can jump when the
        # system clock is adjusted and would distort the measurement.
        start = time.perf_counter()
        ringbuffer.dump()
        # NOTE(review): loads from `FILE_NAME`, so `ringbuffer` is expected
        # to have been created with that path.
        SerializableRingBuffer.load(FILE_NAME)
        total += time.perf_counter() - start
        # Cleaning up is not part of the measured time.
        delete_files_with_prefix(FILE_NAME)

    return total / iterations
58+
59+
60+
def main() -> None:
    """Fill a ringbuffer with samples and print the average dump/load time."""
    storage = np.arange(0, SIZE, dtype=np.float64)
    ringbuffer = SerializableRingBuffer(storage, timedelta(minutes=5), FILE_NAME)

    print("size:", SIZE)
    print("iterations:", ITERATIONS)

    # Only every 10000th slot gets a real update, spaced five minutes apart.
    step_seconds = FIVE_MINUTES.total_seconds()
    for value in range(0, SIZE, 10000):
        timestamp = datetime.fromtimestamp(200 + value * step_seconds)
        ringbuffer.update(Sample(timestamp, value))

    average = benchmark_serialization(ringbuffer, ITERATIONS)
    print("Avg time for Pickle dump/load: " f"{average}s")
78+
79+
80+
if __name__ == "__main__":
81+
main()
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Ringbuffer implementation with serialization support."""
5+
6+
# For use of the class type hint inside the class itself.
7+
from __future__ import annotations
8+
9+
import pickle
10+
from datetime import datetime, timedelta
11+
from os.path import exists
12+
13+
from ._ringbuffer import FloatArray, OrderedRingBuffer
14+
15+
# Version of the file dumping/loading format
16+
FILE_FORMAT_VERSION: int = 1
17+
18+
19+
class SerializableRingBuffer(OrderedRingBuffer[FloatArray]):
    """Sorted ringbuffer with serialization support.

    The complete instance is pickled to a file by `dump()` and restored from a
    file by `load()`.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        buffer: FloatArray,
        sampling_period: timedelta,
        path: str,
        time_index_alignment: datetime = datetime(1, 1, 1),
    ) -> None:
        """Initialize the time aware ringbuffer.

        Args:
            buffer: Instance of a buffer container to use internally.
            sampling_period: Timedelta of the desired resampling period.
            path: Path to where the data should be saved to and loaded from.
            time_index_alignment: Arbitrary point in time used to align
                timestamped data with the index position in the buffer.
                Used to make the data stored in the buffer align with the
                beginning and end of the buffer borders.
                For example, if the `time_index_alignment` is set to
                "0001-01-01 12:00:00", and the `sampling_period` is set to
                1 hour and the length of the buffer is 24, then the data
                stored in the buffer could correspond to the time range from
                "2022-01-01 12:00:00" to "2022-01-02 12:00:00" (date chosen
                arbitrarily here).
        """
        super().__init__(buffer, sampling_period, time_index_alignment)
        self._path = path
        self._file_format_version = FILE_FORMAT_VERSION

    def dump(self) -> None:
        """Dump data to disk.

        The whole instance is pickled to the path it was created with (or
        loaded from), overwriting an already existing file.

        Raises:
            OSError: When the file cannot be written.
        """
        # The file is only written, never read back here, so plain "wb"
        # (which truncates just like "wb+") is sufficient.
        with open(self._path, mode="wb") as fileobj:
            pickle.dump(self, fileobj)

    @classmethod
    def load(cls, path: str) -> SerializableRingBuffer[FloatArray] | None:
        """Load data from disk.

        Args:
            path: Path to the file where the data is stored.

        Raises:
            OSError: When the file exists but isn't accessible for any reason.

        Returns:
            `None` when the file doesn't exist, otherwise an instance of the
                `SerializableRingBuffer` class, loaded from disk.
        """
        # Open first and handle the missing file afterwards: checking for
        # existence up front would still fail with `FileNotFoundError` when
        # the file disappears between the check and the `open()` call.
        try:
            fileobj = open(path, mode="rb")  # pylint: disable=consider-using-with
        except FileNotFoundError:
            return None

        with fileobj:
            # NOTE(security): unpickling can execute arbitrary code, only load
            # files that were written by a trusted `dump()` call.
            instance: SerializableRingBuffer[FloatArray] = pickle.load(fileobj)

        # The file may have been moved since it was dumped, keep using the
        # location it was actually loaded from.
        instance._path = path  # pylint: disable=protected-access
        # Set latest file format version for next time it dumps.
        # pylint: disable=protected-access
        instance._file_format_version = FILE_FORMAT_VERSION

        return instance
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the `SerializableRingBuffer` class."""
5+
6+
from __future__ import annotations
7+
8+
import random
9+
from datetime import datetime, timedelta
10+
from typing import Any
11+
12+
import numpy as np
13+
import pytest
14+
15+
from frequenz.sdk.timeseries import Sample
16+
from frequenz.sdk.timeseries._serializable_ringbuffer import SerializableRingBuffer
17+
18+
FIVE_MINUTES = timedelta(minutes=5)
19+
_29_DAYS = 60 * 24 * 29
20+
ONE_MINUTE = timedelta(minutes=1)
21+
22+
23+
def load_dump_test(dumped: SerializableRingBuffer[Any]) -> None:
    """Dump the given buffer, load it again and compare both instances.

    Args:
        dumped: Buffer that is filled with data and written to its file.
    """
    # The comparison below has to look at the internals of both buffers.
    # pylint: disable=protected-access
    capacity = dumped.maxlen

    random.seed(0)

    # Write the values directly into the buffer to have something to compare,
    # calling .update() for each of them takes very long for 40k entries.
    for index in range(capacity):
        dumped[index] = index

    # A few real updates are still required so the timestamps and gaps
    # are initialized.
    step_seconds = FIVE_MINUTES.total_seconds()
    for index in range(0, capacity, 100):
        timestamp = datetime.fromtimestamp(200 + index * step_seconds)
        dumped.update(Sample(timestamp, index))

    dumped.dump()

    # Read back what was just written.
    loaded = SerializableRingBuffer.load(dumped._path)
    assert loaded is not None

    np.testing.assert_equal(dumped[:], loaded[:])

    for attribute in ("_datetime_oldest", "_datetime_newest"):
        assert getattr(dumped, attribute) == getattr(loaded, attribute)

    assert len(dumped._gaps) == len(loaded._gaps)

    for attribute in (
        "_gaps",
        "_time_range",
        "_sampling_period",
        "_time_index_alignment",
    ):
        assert getattr(dumped, attribute) == getattr(loaded, attribute)
63+
64+
65+
def test_load_dump_short(tmp_path_factory: pytest.TempPathFactory) -> None:
    """Run the load/dump check on small list and array backed buffers."""
    tmpdir = tmp_path_factory.mktemp("load_dump")
    alignment = datetime(2, 2, 2)
    length = 24 * int(FIVE_MINUTES.total_seconds())

    list_backed = SerializableRingBuffer(
        [0.0] * length,
        FIVE_MINUTES,
        f"{tmpdir}/test_list.bin",
        alignment,
    )
    load_dump_test(list_backed)

    array_backed = SerializableRingBuffer(
        np.empty(shape=(length,), dtype=np.float64),
        FIVE_MINUTES,
        f"{tmpdir}/test_array.bin",
        alignment,
    )
    load_dump_test(array_backed)
86+
87+
88+
def test_load_dump(tmp_path_factory: pytest.TempPathFactory) -> None:
    """Run the load/dump check on buffers holding 29 days of 1-minute samples."""
    tmpdir = tmp_path_factory.mktemp("load_dump")
    alignment = datetime(2, 2, 2)

    list_backed = SerializableRingBuffer(
        [0.0] * _29_DAYS,
        ONE_MINUTE,
        f"{tmpdir}/test_list_29.bin",
        alignment,
    )
    load_dump_test(list_backed)

    array_backed = SerializableRingBuffer(
        np.empty(shape=(_29_DAYS,), dtype=np.float64),
        ONE_MINUTE,
        f"{tmpdir}/test_array_29.bin",
        alignment,
    )
    load_dump_test(array_backed)

0 commit comments

Comments
 (0)