Commit bf1b22c

Fixes on copy behavior in ring buffer window method (#638)
This PR introduces several copy-related updates to the ring buffer window functionality.

Changes:

* Default Copy Behavior: Changed the default behavior to copy data in the ring buffer window. This prioritizes data integrity over performance, which is a minor concern for most expected use cases. Made `force_copy` a keyword-only argument.
* Moved the logic for extracting wrapped buffers into a separate method to fix a bug and allow for better testing and future maintainability.
* Handling None Values: Fixed an issue where windows containing None values were forcibly copied in the ring buffer window. The decision to enforce a copy is now left to the user, even when the data contains None values.
* Test Coverage: Added tests for the window method in the ring buffer, including tests for expected copy behavior on missing values.
* Fixed the ring buffer window to return an empty array when start equals end.
* Exposed a restricted version of the ring buffer window through the moving window.

Part of #214
2 parents c32d84c + 4d8de2a commit bf1b22c
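
The trade-off behind the new default can be illustrated with plain NumPy, independent of the SDK. A basic slice is a view that shares memory with the underlying array, so later writes to the buffer show up in it, whereas a copy is isolated. This is a minimal sketch; the array `buffer` and the other names are illustrative and not taken from the PR.

```python
import numpy as np

# Stand-in for the ring buffer's underlying storage (illustrative only).
buffer = np.array([0.0, 1.0, 2.0, 3.0, 4.0])

view = buffer[1:4]         # basic slicing returns a view, no data is copied
copy = buffer[1:4].copy()  # explicit copy, isolated from later writes

# Simulate the ring buffer overwriting an old sample with a new one.
buffer[2] = 99.0

print(view.tolist())  # the view reflects the overwrite
print(copy.tolist())  # the copy keeps the values seen at request time
print(np.shares_memory(view, buffer), np.shares_memory(copy, buffer))
```

A caller that holds on to a view while the buffer keeps receiving samples can therefore see its data change underneath it, which is the integrity concern the commit message refers to.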

5 files changed: +202 -23 lines changed


RELEASE_NOTES.md

Lines changed: 8 additions & 1 deletion
@@ -26,6 +26,10 @@
 - NaN values are treated as missing when gaps are determined in the `OrderedRingBuffer`.
 - Provide access to `capacity` (maximum number of elements) in `MovingWindow`.
 - Methods to retrieve oldest and newest timestamp of valid samples are added to both.
+- `MovingWindow` exposes the underlying buffer's `window` method.
+- `OrderedRingBuffer.window`:
+  - By default returns a copy.
+  - Can also return a view if the window contains `None` values and if `force_copy` is set to `False`.
 
 - Now when printing `FormulaEngine` for debugging purposes the formula will be shown in infix notation, which should be easier to read.

@@ -35,4 +39,7 @@
 
 ## Bug Fixes
 
-<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
+- `OrderedRingBuffer.window`:
+  - Fixed `force_copy` option for specific case.
+  - Removed buggy enforcement of copies when the queried window contains `None` values.
+  - Fixed behavior for the case where start equals end.

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 28 additions & 0 deletions
@@ -241,6 +241,34 @@ def capacity(self) -> int:
         """
         return self._buffer.maxlen
 
+    def window(
+        self,
+        start: datetime,
+        end: datetime,
+        *,
+        force_copy: bool = True,
+    ) -> ArrayLike:
+        """
+        Return an array containing the samples in the given time interval.
+
+        Args:
+            start: The start of the time interval. Only datetime objects are supported.
+            end: The end of the time interval. Only datetime objects are supported.
+            force_copy: If `True`, the returned array is a copy of the underlying
+                data. Otherwise, if possible, a view of the underlying data is
+                returned.
+
+        Returns:
+            An array containing the samples in the given time interval.
+
+        Raises:
+            IndexError: if `start` or `end` are not datetime objects.
+        """
+        if not isinstance(start, datetime) or not isinstance(end, datetime):
+            raise IndexError("Only datetime objects are supported as start and end.")
+
+        return self._buffer.window(start, end, force_copy=force_copy)
+
     async def _run_impl(self) -> None:
         """Awaits samples from the receiver and updates the underlying ring buffer.
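
The bare `*` in the new signature makes `force_copy` keyword-only, so every call site has to spell out the copy behavior it wants. A minimal standalone sketch of that effect follows. Here `window` is a hypothetical stand-in function that mirrors only the signature and the type check, not the SDK method itself.

```python
from datetime import datetime, timezone


def window(start: datetime, end: datetime, *, force_copy: bool = True) -> str:
    """Hypothetical stand-in mirroring only the signature and the type check."""
    if not isinstance(start, datetime) or not isinstance(end, datetime):
        raise IndexError("Only datetime objects are supported as start and end.")
    return "copy" if force_copy else "view if possible"


t0 = datetime.fromtimestamp(0, tz=timezone.utc)
t1 = datetime.fromtimestamp(3, tz=timezone.utc)

print(window(t0, t1))                    # default: a copy
print(window(t0, t1, force_copy=False))  # explicit opt-in to a view

try:
    window(t0, t1, False)  # type: ignore[misc]  # positional use is rejected
except TypeError as err:
    print(f"TypeError: {err}")

try:
    window(0, 3)  # type: ignore[arg-type]  # integer bounds are rejected
except IndexError as err:
    print(f"IndexError: {err}")
```

Requiring the keyword keeps a stray positional `False` from silently switching a caller to view semantics.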

src/frequenz/sdk/timeseries/_ringbuffer/buffer.py

Lines changed: 47 additions & 21 deletions
@@ -257,7 +257,7 @@ def datetime_to_index(
         )
 
     def window(
-        self, start: datetime, end: datetime, force_copy: bool = False
+        self, start: datetime, end: datetime, *, force_copy: bool = True
     ) -> FloatArray:
         """Request a view on the data between start timestamp and end timestamp.
 
@@ -266,8 +266,7 @@ def window(
 
         Will return a copy in the following cases:
         * The requested time period is crossing the start/end of the buffer.
-        * The requested time period contains missing entries.
-        * The force_copy parameter was set to True (default False).
+        * The force_copy parameter was set to True (default True).
 
         The first case can be avoided by using the appropriate
         `align_to` value in the constructor so that the data lines up
@@ -279,7 +278,7 @@ def window(
         Args:
             start: start time of the window.
             end: end time of the window.
-            force_copy: optional, default False. If True, will always create a
+            force_copy: optional, default True. If True, will always create a
                 copy of the data.
 
         Raises:
@@ -293,28 +292,55 @@ def window(
                 f"end parameter {end} has to predate start parameter {start}"
             )
 
+        if start == end:
+            return np.array([]) if isinstance(self._buffer, np.ndarray) else []
+
         start_index = self.datetime_to_index(start)
         end_index = self.datetime_to_index(end)
 
+        return self._wrapped_buffer_window(
+            self._buffer, start_index, end_index, force_copy
+        )
+
+    @staticmethod
+    def _wrapped_buffer_window(
+        buffer: FloatArray,
+        start_pos: int,
+        end_pos: int,
+        force_copy: bool = True,
+    ) -> FloatArray:
+        """Get a wrapped window from the given buffer.
+
+        If start_pos == end_pos, the full wrapped buffer is returned starting at start_pos.
+
+        Copies can only be avoided for numpy arrays and when the window is not wrapped.
+        Lists of floats are always copies.
+
+        Args:
+            buffer: The buffer to get the window from.
+            start_pos: The start position of the window in the buffer.
+            end_pos: The end position of the window in the buffer (exclusive).
+            force_copy: If True, will always create a copy of the data.
+
+        Returns:
+            The requested window.
+        """
         # Requested window wraps around the ends
-        if start_index >= end_index:
-            if end_index > 0:
-                if isinstance(self._buffer, list):
-                    return self._buffer[start_index:] + self._buffer[0:end_index]
-                if isinstance(self._buffer, np.ndarray):
-                    return np.concatenate(
-                        (self._buffer[start_index:], self._buffer[0:end_index])
-                    )
-                assert False, f"Unknown _buffer type: {type(self._buffer)}"
-            return self._buffer[start_index:]
-
-        # Return a copy if there are none-values in the data
-        if force_copy or any(
-            map(lambda gap: gap.contains(start) or gap.contains(end), self._gaps)
-        ):
-            return deepcopy(self[start_index:end_index])
+        if start_pos >= end_pos:
+            if isinstance(buffer, list):
+                return buffer[start_pos:] + buffer[0:end_pos]
+            assert isinstance(
+                buffer, np.ndarray
+            ), f"Unsupported buffer type: {type(buffer)}"
+            if end_pos > 0:
+                return np.concatenate((buffer[start_pos:], buffer[0:end_pos]))
+            arr = buffer[start_pos:]
+        else:
+            arr = buffer[start_pos:end_pos]
 
-        return self[start_index:end_index]
+        if force_copy:
+            return deepcopy(arr)
+        return arr
 
     def is_missing(self, timestamp: datetime) -> bool:
         """Check if the given timestamp falls within a gap.

tests/timeseries/test_moving_window.py

Lines changed: 23 additions & 1 deletion
@@ -66,6 +66,18 @@ def init_moving_window(
     return window, lm_tx
 
 
+def dt(i: int) -> datetime:  # pylint: disable=invalid-name
+    """Create datetime objects from indices.
+
+    Args:
+        i: Index to create datetime from.
+
+    Returns:
+        Datetime object.
+    """
+    return datetime.fromtimestamp(i, tz=timezone.utc)
+
+
 async def test_access_window_by_index() -> None:
     """Test indexing a window by integer index."""
     window, sender = init_moving_window(timedelta(seconds=1))
@@ -92,7 +104,8 @@ async def test_access_window_by_int_slice() -> None:
     async with window:
         await push_logical_meter_data(sender, range(0, 5))
         assert np.array_equal(window[3:5], np.array([3.0, 4.0]))
-
+        with pytest.raises(IndexError):
+            window.window(3, 5)  # type: ignore
         data = [1, 2, 2.5, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 1]
         await push_logical_meter_data(sender, data)
         assert np.array_equal(window[5:14], np.array(data[5:14]))
@@ -106,6 +119,15 @@ async def test_access_window_by_ts_slice() -> None:
         time_start = UNIX_EPOCH + timedelta(seconds=3)
         time_end = time_start + timedelta(seconds=2)
         assert np.array_equal(window[time_start:time_end], np.array([3.0, 4.0]))  # type: ignore
+        assert np.array_equal(window.window(dt(3), dt(5)), np.array([3.0, 4.0]))
+        assert np.array_equal(window.window(dt(3), dt(3)), np.array([]))
+        # Window only supports slicing with ascending indices within allowed range
+        with pytest.raises(IndexError):
+            window.window(dt(3), dt(1))
+        with pytest.raises(IndexError):
+            window.window(dt(3), dt(6))
+        with pytest.raises(IndexError):
+            window.window(dt(-1), dt(5))
 
 
 async def test_access_empty_window() -> None:
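
The timestamp assertions in this test imply a half-open interval: `window(dt(3), dt(5))` yields the samples at seconds 3 and 4, equal bounds yield an empty result, and reversed or out-of-range bounds raise `IndexError`. The sketch below models that rule over one-second samples stored in a plain list. `select`, `PERIOD`, and the bounds check are assumptions modeled on the test expectations, not the SDK implementation.

```python
from datetime import datetime, timedelta, timezone

PERIOD = timedelta(seconds=1)  # assumed sampling period


def dt(i: int) -> datetime:
    """Create a UTC datetime from seconds since the epoch."""
    return datetime.fromtimestamp(i, tz=timezone.utc)


def select(
    values: list[float], first: datetime, start: datetime, end: datetime
) -> list[float]:
    """Hypothetical helper: values whose timestamps fall in [start, end)."""
    last_exclusive = first + len(values) * PERIOD
    if start > end or start < first or end > last_exclusive:
        raise IndexError("window bounds are reversed or out of range")
    lo = (start - first) // PERIOD  # timedelta // timedelta gives an int
    hi = (end - first) // PERIOD
    return values[lo:hi]


values = [0.0, 1.0, 2.0, 3.0, 4.0]  # samples at seconds 0..4

print(select(values, dt(0), dt(3), dt(5)))  # seconds 3 and 4
print(select(values, dt(0), dt(3), dt(3)))  # empty: start equals end

try:
    select(values, dt(0), dt(3), dt(6))  # end lies past the newest sample
except IndexError as err:
    print(f"IndexError: {err}")
```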

tests/timeseries/test_ringbuffer.py

Lines changed: 96 additions & 0 deletions
@@ -15,6 +15,7 @@
 from frequenz.sdk.timeseries import Sample
 from frequenz.sdk.timeseries._quantities import Quantity
 from frequenz.sdk.timeseries._ringbuffer import Gap, OrderedRingBuffer
+from frequenz.sdk.timeseries._ringbuffer.buffer import FloatArray
 
 FIVE_MINUTES = timedelta(minutes=5)
 ONE_MINUTE = timedelta(minutes=1)
@@ -494,3 +495,98 @@ def test_delete_oudated_gap() -> None:
     buffer.update(Sample(datetime.fromtimestamp(202, tz=timezone.utc), Quantity(2)))
 
     assert len(buffer.gaps) == 0
+
+
+def get_orb(data: FloatArray) -> OrderedRingBuffer[FloatArray]:
+    """Get OrderedRingBuffer with data.
+
+    Args:
+        data: Data to fill the buffer with.
+
+    Returns:
+        OrderedRingBuffer with data.
+    """
+    buffer = OrderedRingBuffer(data, ONE_SECOND)
+    for i, d in enumerate(data):  # pylint: disable=invalid-name
+        buffer.update(Sample(dt(i), Quantity(d) if d is not None else None))
+    return buffer
+
+
+def test_window() -> None:
+    """Test the window function."""
+    buffer = get_orb(np.array([0, None, 2, 3, 4]))
+    win = buffer.window(dt(0), dt(3), force_copy=False)
+    assert [0, np.nan, 2] == list(win)
+    buffer._buffer[1] = 1  # pylint: disable=protected-access
+    # Test whether the window is a view or a copy
+    assert [0, 1, 2] == list(win)
+    win = buffer.window(dt(0), dt(3), force_copy=False)
+    assert [0, 1, 2] == list(win)
+    # Empty array
+    assert 0 == buffer.window(dt(1), dt(1)).size
+
+    buffer = get_orb([0.0, 1.0, 2.0, 3.0, 4.0])  # type: ignore
+    assert [0, 1, 2] == buffer.window(dt(0), dt(3))
+    assert [] == buffer.window(dt(0), dt(0))
+    assert [] == buffer.window(dt(1), dt(1))
+
+
+def test_wrapped_buffer_window() -> None:
+    """Test the wrapped buffer window function."""
+    wbw = OrderedRingBuffer._wrapped_buffer_window  # pylint: disable=protected-access
+
+    #
+    # Tests for list buffer
+    #
+    buffer = [0.0, 1.0, 2.0, 3.0, 4.0]
+    # start = end
+    assert [0, 1, 2, 3, 4] == wbw(buffer, 0, 0, force_copy=False)
+    assert [4, 0, 1, 2, 3] == wbw(buffer, 4, 4, force_copy=False)
+    # start < end
+    assert [0] == wbw(buffer, 0, 1, force_copy=False)
+    assert [0, 1, 2, 3, 4] == wbw(buffer, 0, 5, force_copy=False)
+    # start > end, end = 0
+    assert [4] == wbw(buffer, 4, 0, force_copy=False)
+    # start > end, end > 0
+    assert [4, 0, 1] == wbw(buffer, 4, 2, force_copy=False)
+
+    # Lists are always shallow copies
+    res_copy = wbw(buffer, 0, 5, force_copy=False)
+    assert [0, 1, 2, 3, 4] == res_copy
+    buffer[0] = 9
+    assert [0, 1, 2, 3, 4] == res_copy
+
+    #
+    # Tests for array buffer
+    #
+    buffer = np.array([0, 1, 2, 3, 4])  # type: ignore
+    # start = end
+    assert [0, 1, 2, 3, 4] == list(wbw(buffer, 0, 0, force_copy=False))
+    assert [4, 0, 1, 2, 3] == list(wbw(buffer, 4, 4, force_copy=False))
+    # start < end
+    assert [0] == list(wbw(buffer, 0, 1, force_copy=False))
+    assert [0, 1, 2, 3, 4] == list(wbw(buffer, 0, 5, force_copy=False))
+    # start > end, end = 0
+    assert [4] == list(wbw(buffer, 4, 0, force_copy=False))
+    # start > end, end > 0
+    assert [4, 0, 1] == list(wbw(buffer, 4, 2, force_copy=False))
+
+    # Get a view and a copy before modifying the buffer
+    res1_view = wbw(buffer, 3, 5, force_copy=False)
+    res1_copy = wbw(buffer, 3, 5, force_copy=True)
+    res2_view = wbw(buffer, 3, 0, force_copy=False)
+    res2_copy = wbw(buffer, 3, 0, force_copy=True)
+    res3_copy = wbw(buffer, 4, 1, force_copy=False)
+    assert [3, 4] == list(res1_view)
+    assert [3, 4] == list(res1_copy)
+    assert [3, 4] == list(res2_view)
+    assert [3, 4] == list(res2_copy)
+    assert [4, 0] == list(res3_copy)
+
+    # Modify the buffer and check that only the view is updated
+    buffer[4] = 9
+    assert [3, 9] == list(res1_view)
+    assert [3, 4] == list(res1_copy)
+    assert [3, 9] == list(res2_view)
+    assert [3, 4] == list(res2_copy)
+    assert [4, 0] == list(res3_copy)
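
The wrap-around cases exercised by `test_wrapped_buffer_window` can be reproduced outside the SDK. The following sketch re-implements the slicing rule for NumPy arrays only. The function name `wrapped_window` is hypothetical, list buffers are left out, and `.copy()` stands in for `deepcopy` as a simplification.

```python
import numpy as np


def wrapped_window(
    buffer: np.ndarray, start_pos: int, end_pos: int, *, force_copy: bool = True
) -> np.ndarray:
    """Return the window from start_pos to end_pos (exclusive), wrapping if needed."""
    if start_pos >= end_pos:
        if end_pos > 0:
            # Two disjoint pieces: concatenation always allocates a new array.
            return np.concatenate((buffer[start_pos:], buffer[:end_pos]))
        window = buffer[start_pos:]  # runs to the end, still one contiguous slice
    else:
        window = buffer[start_pos:end_pos]
    return window.copy() if force_copy else window


buf = np.array([0.0, 1.0, 2.0, 3.0, 4.0])

wrapped = wrapped_window(buf, 4, 2, force_copy=False)  # crosses the buffer end
view = wrapped_window(buf, 3, 5, force_copy=False)     # contiguous, no copy
copy = wrapped_window(buf, 3, 5)                       # default: copy

buf[4] = 9.0  # overwrite a sample after the windows were taken

print(wrapped.tolist())  # unaffected: a wrapped window is always a copy
print(view.tolist())     # the view sees the new value
print(copy.tolist())     # the copy does not
```

As in the test, a window that crosses the end of the buffer is a copy even with `force_copy=False`, because the two pieces are not contiguous in memory.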
