diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 961e268c3..b36b51dbf 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -26,6 +26,10 @@ - NaN values are treated as missing when gaps are determined in the `OrderedRingBuffer`. - Provide access to `capacity` (maximum number of elements) in `MovingWindow`. - Methods to retrieve oldest and newest timestamp of valid samples are added to both. + - `MovingWindow` exposes underlying buffers `window` method. + - `OrderedRingBuffer.window`: + - By default returns a copy. + - Can also return a view if the window contains `None` values and if `force_copy` is set to `True`. - Now when printing `FormulaEngine` for debugging purposes the the formula will be shown in infix notation, which should be easier to read. @@ -35,4 +39,7 @@ ## Bug Fixes - +- `OrderedRingBuffer.window`: + - Fixed `force_copy` option for specific case. + - Removed buggy enforcement of copies when None values in queried window. + - Fixed behavior for start equals end case. diff --git a/src/frequenz/sdk/timeseries/_moving_window.py b/src/frequenz/sdk/timeseries/_moving_window.py index bea8cbff1..5b612f8ca 100644 --- a/src/frequenz/sdk/timeseries/_moving_window.py +++ b/src/frequenz/sdk/timeseries/_moving_window.py @@ -241,6 +241,34 @@ def capacity(self) -> int: """ return self._buffer.maxlen + def window( + self, + start: datetime, + end: datetime, + *, + force_copy: bool = True, + ) -> ArrayLike: + """ + Return an array containing the samples in the given time interval. + + Args: + start: The start of the time interval. Only datetime objects are supported. + end: The end of the time interval. Only datetime objects are supported. + force_copy: If `True`, the returned array is a copy of the underlying + data. Otherwise, if possible, a view of the underlying data is + returned. + + Returns: + An array containing the samples in the given time interval. + + Raises: + IndexError: if `start` or `end` are not datetime objects. + """ + if not isinstance(start, datetime) or not isinstance(end, datetime): + raise IndexError("Only datetime objects are supported as start and end.") + + return self._buffer.window(start, end, force_copy=force_copy) + async def _run_impl(self) -> None: """Awaits samples from the receiver and updates the underlying ring buffer. diff --git a/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py b/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py index e1319e1ef..5baf32f57 100644 --- a/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py +++ b/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py @@ -257,7 +257,7 @@ def datetime_to_index( ) def window( - self, start: datetime, end: datetime, force_copy: bool = False + self, start: datetime, end: datetime, *, force_copy: bool = True ) -> FloatArray: """Request a view on the data between start timestamp and end timestamp. @@ -266,8 +266,7 @@ def window( Will return a copy in the following cases: * The requested time period is crossing the start/end of the buffer. - * The requested time period contains missing entries. - * The force_copy parameter was set to True (default False). + * The force_copy parameter was set to True (default True). The first case can be avoided by using the appropriate `align_to` value in the constructor so that the data lines up @@ -279,7 +278,7 @@ def window( Args: start: start time of the window. end: end time of the window. - force_copy: optional, default False. If True, will always create a + force_copy: optional, default True. If True, will always create a copy of the data. Raises: @@ -293,28 +292,55 @@ def window( f"end parameter {end} has to predate start parameter {start}" ) + if start == end: + return np.array([]) if isinstance(self._buffer, np.ndarray) else [] + start_index = self.datetime_to_index(start) end_index = self.datetime_to_index(end) + return self._wrapped_buffer_window( + self._buffer, start_index, end_index, force_copy + ) + + @staticmethod + def _wrapped_buffer_window( + buffer: FloatArray, + start_pos: int, + end_pos: int, + force_copy: bool = True, + ) -> FloatArray: + """Get a wrapped window from the given buffer. + + If start_pos == end_pos, the full wrapped buffer is returned starting at start_pos. + + Copies can only be avoided for numpy arrays and when the window is not wrapped. + Lists of floats are always copies. + + Args: + buffer: The buffer to get the window from. + start_pos: The start position of the window in the buffer. + end_pos: The end position of the window in the buffer (exclusive). + force_copy: If True, will always create a copy of the data. + + Returns: + The requested window. + """ # Requested window wraps around the ends - if start_index >= end_index: - if end_index > 0: - if isinstance(self._buffer, list): - return self._buffer[start_index:] + self._buffer[0:end_index] - if isinstance(self._buffer, np.ndarray): - return np.concatenate( - (self._buffer[start_index:], self._buffer[0:end_index]) - ) - assert False, f"Unknown _buffer type: {type(self._buffer)}" - return self._buffer[start_index:] - - # Return a copy if there are none-values in the data - if force_copy or any( - map(lambda gap: gap.contains(start) or gap.contains(end), self._gaps) - ): - return deepcopy(self[start_index:end_index]) + if start_pos >= end_pos: + if isinstance(buffer, list): + return buffer[start_pos:] + buffer[0:end_pos] + assert isinstance( + buffer, np.ndarray + ), f"Unsupported buffer type: {type(buffer)}" + if end_pos > 0: + return np.concatenate((buffer[start_pos:], buffer[0:end_pos])) + arr = buffer[start_pos:] + else: + arr = buffer[start_pos:end_pos] - return self[start_index:end_index] + if force_copy: + return deepcopy(arr) + return arr def is_missing(self, timestamp: datetime) -> bool: """Check if the given timestamp falls within a gap. diff --git a/tests/timeseries/test_moving_window.py b/tests/timeseries/test_moving_window.py index 2b8af5027..479a03294 100644 --- a/tests/timeseries/test_moving_window.py +++ b/tests/timeseries/test_moving_window.py @@ -66,6 +66,18 @@ def init_moving_window( return window, lm_tx +def dt(i: int) -> datetime: # pylint: disable=invalid-name + """Create datetime objects from indices. + + Args: + i: Index to create datetime from. + + Returns: + Datetime object. + """ + return datetime.fromtimestamp(i, tz=timezone.utc) + + async def test_access_window_by_index() -> None: """Test indexing a window by integer index.""" window, sender = init_moving_window(timedelta(seconds=1)) @@ -92,7 +104,8 @@ async def test_access_window_by_int_slice() -> None: async with window: await push_logical_meter_data(sender, range(0, 5)) assert np.array_equal(window[3:5], np.array([3.0, 4.0])) - + with pytest.raises(IndexError): + window.window(3, 5) # type: ignore data = [1, 2, 2.5, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 1] await push_logical_meter_data(sender, data) assert np.array_equal(window[5:14], np.array(data[5:14])) @@ -106,6 +119,15 @@ async def test_access_window_by_ts_slice() -> None: time_start = UNIX_EPOCH + timedelta(seconds=3) time_end = time_start + timedelta(seconds=2) assert np.array_equal(window[time_start:time_end], np.array([3.0, 4.0])) # type: ignore + assert np.array_equal(window.window(dt(3), dt(5)), np.array([3.0, 4.0])) + assert np.array_equal(window.window(dt(3), dt(3)), np.array([])) + # Window only supports slicing with ascending indices within allowed range + with pytest.raises(IndexError): + window.window(dt(3), dt(1)) + with pytest.raises(IndexError): + window.window(dt(3), dt(6)) + with pytest.raises(IndexError): + window.window(dt(-1), dt(5)) async def test_access_empty_window() -> None: diff --git a/tests/timeseries/test_ringbuffer.py b/tests/timeseries/test_ringbuffer.py index 6be1557ef..e91d36f44 100644 --- a/tests/timeseries/test_ringbuffer.py +++ b/tests/timeseries/test_ringbuffer.py @@ -15,6 +15,7 @@ from frequenz.sdk.timeseries import Sample from frequenz.sdk.timeseries._quantities import Quantity from frequenz.sdk.timeseries._ringbuffer import Gap, OrderedRingBuffer +from frequenz.sdk.timeseries._ringbuffer.buffer import FloatArray FIVE_MINUTES = timedelta(minutes=5) ONE_MINUTE = timedelta(minutes=1) @@ -494,3 +495,98 @@ def test_delete_oudated_gap() -> None: buffer.update(Sample(datetime.fromtimestamp(202, tz=timezone.utc), Quantity(2))) assert len(buffer.gaps) == 0 + + +def get_orb(data: FloatArray) -> OrderedRingBuffer[FloatArray]: + """Get OrderedRingBuffer with data. + + Args: + data: Data to fill the buffer with. + + Returns: + OrderedRingBuffer with data. + """ + buffer = OrderedRingBuffer(data, ONE_SECOND) + for i, d in enumerate(data): # pylint: disable=invalid-name + buffer.update(Sample(dt(i), Quantity(d) if d is not None else None)) + return buffer + + +def test_window() -> None: + """Test the window function.""" + buffer = get_orb(np.array([0, None, 2, 3, 4])) + win = buffer.window(dt(0), dt(3), force_copy=False) + assert [0, np.nan, 2] == list(win) + buffer._buffer[1] = 1 # pylint: disable=protected-access + # Test whether the window is a view or a copy + assert [0, 1, 2] == list(win) + win = buffer.window(dt(0), dt(3), force_copy=False) + assert [0, 1, 2] == list(win) + # Empty array + assert 0 == buffer.window(dt(1), dt(1)).size + + buffer = get_orb([0.0, 1.0, 2.0, 3.0, 4.0]) # type: ignore + assert [0, 1, 2] == buffer.window(dt(0), dt(3)) + assert [] == buffer.window(dt(0), dt(0)) + assert [] == buffer.window(dt(1), dt(1)) + + +def test_wrapped_buffer_window() -> None: + """Test the wrapped buffer window function.""" + wbw = OrderedRingBuffer._wrapped_buffer_window # pylint: disable=protected-access + + # + # Tests for list buffer + # + buffer = [0.0, 1.0, 2.0, 3.0, 4.0] + # start = end + assert [0, 1, 2, 3, 4] == wbw(buffer, 0, 0, force_copy=False) + assert [4, 0, 1, 2, 3] == wbw(buffer, 4, 4, force_copy=False) + # start < end + assert [0] == wbw(buffer, 0, 1, force_copy=False) + assert [0, 1, 2, 3, 4] == wbw(buffer, 0, 5, force_copy=False) + # start > end, end = 0 + assert [4] == wbw(buffer, 4, 0, force_copy=False) + # start > end, end > 0 + assert [4, 0, 1] == wbw(buffer, 4, 2, force_copy=False) + + # Lists are always shallow copies + res_copy = wbw(buffer, 0, 5, force_copy=False) + assert [0, 1, 2, 3, 4] == res_copy + buffer[0] = 9 + assert [0, 1, 2, 3, 4] == res_copy + + # + # Tests for array buffer + # + buffer = np.array([0, 1, 2, 3, 4]) # type: ignore + # start = end + assert [0, 1, 2, 3, 4] == list(wbw(buffer, 0, 0, force_copy=False)) + assert [4, 0, 1, 2, 3] == list(wbw(buffer, 4, 4, force_copy=False)) + # start < end + assert [0] == list(wbw(buffer, 0, 1, force_copy=False)) + assert [0, 1, 2, 3, 4] == list(wbw(buffer, 0, 5, force_copy=False)) + # start > end, end = 0 + assert [4] == list(wbw(buffer, 4, 0, force_copy=False)) + # start > end, end > 0 + assert [4, 0, 1] == list(wbw(buffer, 4, 2, force_copy=False)) + + # Get a view and a copy before modifying the buffer + res1_view = wbw(buffer, 3, 5, force_copy=False) + res1_copy = wbw(buffer, 3, 5, force_copy=True) + res2_view = wbw(buffer, 3, 0, force_copy=False) + res2_copy = wbw(buffer, 3, 0, force_copy=True) + res3_copy = wbw(buffer, 4, 1, force_copy=False) + assert [3, 4] == list(res1_view) + assert [3, 4] == list(res1_copy) + assert [3, 4] == list(res2_view) + assert [3, 4] == list(res2_copy) + assert [4, 0] == list(res3_copy) + + # Modify the buffer and check that only the view is updated + buffer[4] = 9 + assert [3, 9] == list(res1_view) + assert [3, 4] == list(res1_copy) + assert [3, 9] == list(res2_view) + assert [3, 4] == list(res2_copy) + assert [4, 0] == list(res3_copy)