Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
- NaN values are treated as missing when gaps are determined in the `OrderedRingBuffer`.
- Provide access to `capacity` (maximum number of elements) in `MovingWindow`.
- Methods to retrieve oldest and newest timestamp of valid samples are added to both.
- `MovingWindow` exposes the underlying buffer's `window` method.
- `OrderedRingBuffer.window`:
- By default returns a copy.
- Can also return a view, even if the window contains `None` values, when `force_copy` is set to `False`.

- Now when printing `FormulaEngine` for debugging purposes, the formula will be shown in infix notation, which should be easier to read.

Expand All @@ -35,4 +39,7 @@

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
- `OrderedRingBuffer.window`:
- Fixed `force_copy` option for specific case.
- Removed buggy enforcement of copies when there are `None` values in the queried window.
- Fixed behavior for start equals end case.
28 changes: 28 additions & 0 deletions src/frequenz/sdk/timeseries/_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,34 @@ def capacity(self) -> int:
"""
return self._buffer.maxlen

def window(
    self,
    start: datetime,
    end: datetime,
    *,
    force_copy: bool = True,
) -> ArrayLike:
    """
    Fetch the samples that fall inside the given time interval.

    The request is delegated to the `window` method of the underlying buffer.

    Args:
        start: Beginning of the time interval. Must be a datetime object.
        end: End of the time interval. Must be a datetime object.
        force_copy: When `True`, the returned array is a copy of the
            underlying data. When `False`, a view of the underlying data is
            returned if possible.

    Returns:
        An array holding the samples of the requested time interval.

    Raises:
        IndexError: if `start` or `end` is not a datetime object.
    """
    bounds_are_datetimes = isinstance(start, datetime) and isinstance(end, datetime)
    if not bounds_are_datetimes:
        raise IndexError("Only datetime objects are supported as start and end.")

    samples = self._buffer.window(start, end, force_copy=force_copy)
    return samples

async def _run_impl(self) -> None:
"""Awaits samples from the receiver and updates the underlying ring buffer.

Expand Down
68 changes: 47 additions & 21 deletions src/frequenz/sdk/timeseries/_ringbuffer/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def datetime_to_index(
)

def window(
self, start: datetime, end: datetime, force_copy: bool = False
self, start: datetime, end: datetime, *, force_copy: bool = True
) -> FloatArray:
"""Request a view on the data between start timestamp and end timestamp.

Expand All @@ -266,8 +266,7 @@ def window(

Will return a copy in the following cases:
* The requested time period is crossing the start/end of the buffer.
* The requested time period contains missing entries.
* The force_copy parameter was set to True (default False).
* The force_copy parameter was set to True (default True).

The first case can be avoided by using the appropriate
`align_to` value in the constructor so that the data lines up
Expand All @@ -279,7 +278,7 @@ def window(
Args:
start: start time of the window.
end: end time of the window.
force_copy: optional, default False. If True, will always create a
force_copy: optional, default True. If True, will always create a
copy of the data.

Raises:
Expand All @@ -293,28 +292,55 @@ def window(
f"end parameter {end} has to predate start parameter {start}"
)

if start == end:
return np.array([]) if isinstance(self._buffer, np.ndarray) else []

start_index = self.datetime_to_index(start)
end_index = self.datetime_to_index(end)

return self._wrapped_buffer_window(
self._buffer, start_index, end_index, force_copy
)

@staticmethod
def _wrapped_buffer_window(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this have to be a static method? I'd suggest making it a class method as long as it's not used outside the class context.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you think so, or what is the advantage of a classmethod? I made it static since it's an isolated algorithmic piece which does not need any internal state of the class. Only the buffer argument would change here.

The main motivation was testing purpose (actually contained multiple bugs before), i.e. separate wrapping algorithm from index logic (which will become more tricky in follow-ups). But it can also easily be stripped out when needed elsewhere.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One example for outside usage I could imagine is the periodic feature extractor, which I assume is using similar code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. It doesn't depend on the internal state. Let's keep it as is.

buffer: FloatArray,
start_pos: int,
end_pos: int,
force_copy: bool = True,
) -> FloatArray:
"""Get a wrapped window from the given buffer.

If start_pos == end_pos, the full wrapped buffer is returned starting at start_pos.

Copies can only be avoided for numpy arrays and when the window is not wrapped.
Lists of floats are always copies.

Args:
buffer: The buffer to get the window from.
start_pos: The start position of the window in the buffer.
end_pos: The end position of the window in the buffer (exclusive).
force_copy: If True, will always create a copy of the data.

Returns:
The requested window.
"""
# Requested window wraps around the ends
if start_index >= end_index:
if end_index > 0:
if isinstance(self._buffer, list):
return self._buffer[start_index:] + self._buffer[0:end_index]
if isinstance(self._buffer, np.ndarray):
return np.concatenate(
(self._buffer[start_index:], self._buffer[0:end_index])
)
assert False, f"Unknown _buffer type: {type(self._buffer)}"
return self._buffer[start_index:]

# Return a copy if there are none-values in the data
if force_copy or any(
map(lambda gap: gap.contains(start) or gap.contains(end), self._gaps)
):
return deepcopy(self[start_index:end_index])
if start_pos >= end_pos:
if isinstance(buffer, list):
return buffer[start_pos:] + buffer[0:end_pos]
assert isinstance(
buffer, np.ndarray
), f"Unsupported buffer type: {type(buffer)}"
if end_pos > 0:
return np.concatenate((buffer[start_pos:], buffer[0:end_pos]))
arr = buffer[start_pos:]
else:
arr = buffer[start_pos:end_pos]

return self[start_index:end_index]
if force_copy:
return deepcopy(arr)
return arr

def is_missing(self, timestamp: datetime) -> bool:
"""Check if the given timestamp falls within a gap.
Expand Down
24 changes: 23 additions & 1 deletion tests/timeseries/test_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ def init_moving_window(
return window, lm_tx


def dt(i: int) -> datetime:  # pylint: disable=invalid-name
    """Build a UTC datetime from an integer index.

    Args:
        i: Index, interpreted as a POSIX timestamp in seconds.

    Returns:
        Timezone-aware (UTC) datetime corresponding to the index.
    """
    utc = timezone.utc
    return datetime.fromtimestamp(i, tz=utc)


async def test_access_window_by_index() -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if that's the correct function to add the tests. This was supposed to access the window by integer index, but the tests are accessing by datetime object.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newly added datetime slice tests were added in test_access_window_by_ts_slice. Then a single failure test for int slices was added in test_access_window_by_int_slice.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I missed that. All good then ;).

"""Test indexing a window by integer index."""
window, sender = init_moving_window(timedelta(seconds=1))
Expand All @@ -92,7 +104,8 @@ async def test_access_window_by_int_slice() -> None:
async with window:
await push_logical_meter_data(sender, range(0, 5))
assert np.array_equal(window[3:5], np.array([3.0, 4.0]))

with pytest.raises(IndexError):
window.window(3, 5) # type: ignore
data = [1, 2, 2.5, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 1]
await push_logical_meter_data(sender, data)
assert np.array_equal(window[5:14], np.array(data[5:14]))
Expand All @@ -106,6 +119,15 @@ async def test_access_window_by_ts_slice() -> None:
time_start = UNIX_EPOCH + timedelta(seconds=3)
time_end = time_start + timedelta(seconds=2)
assert np.array_equal(window[time_start:time_end], np.array([3.0, 4.0])) # type: ignore
assert np.array_equal(window.window(dt(3), dt(5)), np.array([3.0, 4.0]))
assert np.array_equal(window.window(dt(3), dt(3)), np.array([]))
# Window only supports slicing with ascending indices within allowed range
with pytest.raises(IndexError):
window.window(dt(3), dt(1))
with pytest.raises(IndexError):
window.window(dt(3), dt(6))
with pytest.raises(IndexError):
window.window(dt(-1), dt(5))


async def test_access_empty_window() -> None:
Expand Down
96 changes: 96 additions & 0 deletions tests/timeseries/test_ringbuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from frequenz.sdk.timeseries import Sample
from frequenz.sdk.timeseries._quantities import Quantity
from frequenz.sdk.timeseries._ringbuffer import Gap, OrderedRingBuffer
from frequenz.sdk.timeseries._ringbuffer.buffer import FloatArray

FIVE_MINUTES = timedelta(minutes=5)
ONE_MINUTE = timedelta(minutes=1)
Expand Down Expand Up @@ -494,3 +495,98 @@ def test_delete_oudated_gap() -> None:
buffer.update(Sample(datetime.fromtimestamp(202, tz=timezone.utc), Quantity(2)))

assert len(buffer.gaps) == 0


def get_orb(data: FloatArray) -> OrderedRingBuffer[FloatArray]:
    """Create an OrderedRingBuffer and fill it with the given data.

    One sample is pushed per element of `data`; the element's position is
    turned into the sample timestamp via `dt`. `None` elements are pushed as
    samples without a value.

    Args:
        data: Data to fill the buffer with. It is also handed to the
            `OrderedRingBuffer` constructor as its first argument.

    Returns:
        OrderedRingBuffer updated with one sample per element of `data`.
    """
    buffer = OrderedRingBuffer(data, ONE_SECOND)
    for index, value in enumerate(data):
        quantity = None if value is None else Quantity(value)
        buffer.update(Sample(dt(index), quantity))
    return buffer


def test_window() -> None:
    """Check `OrderedRingBuffer.window` for array-backed and list-backed buffers."""
    # Array-backed buffer with a missing value at position 1.
    buffer = get_orb(np.array([0, None, 2, 3, 4]))
    view = buffer.window(dt(0), dt(3), force_copy=False)
    assert list(view) == [0, np.nan, 2]

    # Writing into the underlying storage must be visible through the
    # previously requested window, i.e. it is a view and not a copy.
    buffer._buffer[1] = 1  # pylint: disable=protected-access
    assert list(view) == [0, 1, 2]
    refreshed = buffer.window(dt(0), dt(3), force_copy=False)
    assert list(refreshed) == [0, 1, 2]

    # start == end yields an empty array.
    assert buffer.window(dt(1), dt(1)).size == 0

    # List-backed buffer.
    buffer = get_orb([0.0, 1.0, 2.0, 3.0, 4.0])  # type: ignore
    assert buffer.window(dt(0), dt(3)) == [0, 1, 2]
    assert buffer.window(dt(0), dt(0)) == []
    assert buffer.window(dt(1), dt(1)) == []


def test_wrapped_buffer_window() -> None:
    """Check `OrderedRingBuffer._wrapped_buffer_window` for lists and arrays."""
    wbw = OrderedRingBuffer._wrapped_buffer_window  # pylint: disable=protected-access

    #
    # List buffer
    #
    buffer = [0.0, 1.0, 2.0, 3.0, 4.0]
    # start == end: the full wrapped buffer, beginning at start
    assert wbw(buffer, 0, 0, force_copy=False) == [0, 1, 2, 3, 4]
    assert wbw(buffer, 4, 4, force_copy=False) == [4, 0, 1, 2, 3]
    # start < end: plain slice
    assert wbw(buffer, 0, 1, force_copy=False) == [0]
    assert wbw(buffer, 0, 5, force_copy=False) == [0, 1, 2, 3, 4]
    # start > end with end == 0: tail of the buffer only
    assert wbw(buffer, 4, 0, force_copy=False) == [4]
    # start > end with end > 0: tail followed by head
    assert wbw(buffer, 4, 2, force_copy=False) == [4, 0, 1]

    # A list window is always a (shallow) copy, even without force_copy
    list_copy = wbw(buffer, 0, 5, force_copy=False)
    assert list_copy == [0, 1, 2, 3, 4]
    buffer[0] = 9
    assert list_copy == [0, 1, 2, 3, 4]

    #
    # Array buffer
    #
    buffer = np.array([0, 1, 2, 3, 4])  # type: ignore
    # start == end: the full wrapped buffer, beginning at start
    assert list(wbw(buffer, 0, 0, force_copy=False)) == [0, 1, 2, 3, 4]
    assert list(wbw(buffer, 4, 4, force_copy=False)) == [4, 0, 1, 2, 3]
    # start < end: plain slice
    assert list(wbw(buffer, 0, 1, force_copy=False)) == [0]
    assert list(wbw(buffer, 0, 5, force_copy=False)) == [0, 1, 2, 3, 4]
    # start > end with end == 0: tail of the buffer only
    assert list(wbw(buffer, 4, 0, force_copy=False)) == [4]
    # start > end with end > 0: tail followed by head
    assert list(wbw(buffer, 4, 2, force_copy=False)) == [4, 0, 1]

    # Take views and copies before the buffer is modified
    plain_view = wbw(buffer, 3, 5, force_copy=False)
    plain_copy = wbw(buffer, 3, 5, force_copy=True)
    tail_view = wbw(buffer, 3, 0, force_copy=False)
    tail_copy = wbw(buffer, 3, 0, force_copy=True)
    # A window wrapping around the end is a copy even without force_copy
    wrapped_copy = wbw(buffer, 4, 1, force_copy=False)
    assert list(plain_view) == [3, 4]
    assert list(plain_copy) == [3, 4]
    assert list(tail_view) == [3, 4]
    assert list(tail_copy) == [3, 4]
    assert list(wrapped_copy) == [4, 0]

    # After modifying the buffer only the views reflect the change
    buffer[4] = 9
    assert list(plain_view) == [3, 9]
    assert list(plain_copy) == [3, 4]
    assert list(tail_view) == [3, 9]
    assert list(tail_copy) == [3, 4]
    assert list(wrapped_copy) == [4, 0]