diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 461b25d64..98462d3cf 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -13,8 +13,16 @@ - A tutorial section and a getting started tutorial - In `OrderedRingBuffer`: - Rename `datetime_to_index` to `to_internal_index` to avoid confusion between the internal index and the external index. + - Add `get_timestamp` method to convert an external index to the corresponding datetime. - Remove `__setitem__` method to enforce usage of dedicated `update` method only. +- In `OrderedRingBuffer` and `MovingWindow`: + - Support for integer indices is added. + - Add `count_covered` method to count the number of elements covered by the used time range. + + + ## Bug Fixes - Fix rendering of diagrams in the documentation. +- The `__getitem__` magic of the `MovingWindow` is fixed to support the same functionality that the `window` method provides. diff --git a/src/frequenz/sdk/timeseries/_moving_window.py b/src/frequenz/sdk/timeseries/_moving_window.py index a72441920..eefe5d5a3 100644 --- a/src/frequenz/sdk/timeseries/_moving_window.py +++ b/src/frequenz/sdk/timeseries/_moving_window.py @@ -243,8 +243,8 @@ def capacity(self) -> int: def window( self, - start: datetime, - end: datetime, + start: datetime | int | None, + end: datetime | int | None, *, force_copy: bool = True, ) -> ArrayLike: @@ -252,21 +252,17 @@ def window( Return an array containing the samples in the given time interval. Args: - start: The start of the time interval. Only datetime objects are supported. - end: The end of the time interval. Only datetime objects are supported. + start: The start of the time interval. If `None`, the start of the + window is used. + end: The end of the time interval. If `None`, the end of the window + is used. force_copy: If `True`, the returned array is a copy of the underlying data. Otherwise, if possible, a view of the underlying data is returned. Returns: An array containing the samples in the given time interval. 
- - Raises: - IndexError: if `start` or `end` are not datetime objects. """ - if not isinstance(start, datetime) or not isinstance(end, datetime): - raise IndexError("Only datetime objects are supported as start and end.") - return self._buffer.window(start, end, force_copy=force_copy) async def _run_impl(self) -> None: @@ -315,6 +311,15 @@ def count_valid(self) -> int: """ return self._buffer.count_valid() + def count_covered(self) -> int: + """Count the number of samples that are covered by the oldest and newest valid samples. + + Returns: + The count of samples between the oldest and newest (inclusive) valid samples + or 0 if there is no time range covered. + """ + return self._buffer.count_covered() + @overload def __getitem__(self, key: SupportsIndex) -> float: """See the main __getitem__ method. @@ -362,30 +367,18 @@ def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | ArrayLik A float if the key is a number or a timestamp. an numpy array if the key is a slice. 
""" + if isinstance(key, slice): + if not (key.step is None or key.step == 1): + raise ValueError("Slicing with a step other than 1 is not supported.") + return self.window(key.start, key.stop) + if self._buffer.count_valid() == 0: raise IndexError("The buffer is empty.") - if isinstance(key, slice): - if isinstance(key.start, int) or isinstance(key.stop, int): - if key.start is None or key.stop is None: - key = slice(slice(key.start, key.stop).indices(self.count_valid())) - elif isinstance(key.start, datetime) or isinstance(key.stop, datetime): - if key.start is None: - key = slice(self._buffer.time_bound_oldest, key.stop) - if key.stop is None: - key = slice(key.start, self._buffer.time_bound_newest) - - _logger.debug("Returning slice for [%s:%s].", key.start, key.stop) - - # we are doing runtime typechecks since there is no abstract slice type yet - # see also (https://peps.python.org/pep-0696) - if isinstance(key.start, datetime) and isinstance(key.stop, datetime): - return self._buffer.window(key.start, key.stop) - if isinstance(key.start, int) and isinstance(key.stop, int): - return self._buffer[key] - elif isinstance(key, datetime): + + if isinstance(key, datetime): _logger.debug("Returning value at time %s ", key) return self._buffer[self._buffer.to_internal_index(key)] - elif isinstance(key, SupportsIndex): + if isinstance(key, SupportsIndex): _logger.debug("Returning value at index %s ", key) return self._buffer[key] diff --git a/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py b/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py index 1d0c64e12..a71cead51 100644 --- a/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py +++ b/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py @@ -257,8 +257,53 @@ def to_internal_index( ) ) + def get_timestamp(self, index: int) -> datetime | None: + """Convert the given index to the underlying timestamp. + + Index 0 corresponds to the oldest timestamp in the buffer. 
+ If negative indices are used, the newest timestamp is used as reference. + + !!!warning + + The resulting timestamp can be outside the range of the buffer. + + Args: + index: Index to convert. + + Returns: + Datetime index where the value for the given index can be found. + Or None if the buffer is empty. + """ + if self.oldest_timestamp is None: + return None + assert self.newest_timestamp is not None + ref_ts = ( + self.oldest_timestamp + if index >= 0 + else self.newest_timestamp + self._sampling_period + ) + return ref_ts + index * self._sampling_period + + def _to_covered_indices( + self, start: int | None, end: int | None = None + ) -> tuple[int, int]: + """Project the given indices via slice onto the covered range. + + Args: + start: Start index. + end: End index. Optional, defaults to None. + + Returns: + tuple of start and end indices on the range currently covered by the buffer. + """ + return slice(start, end).indices(self.count_covered())[:2] + def window( - self, start: datetime, end: datetime, *, force_copy: bool = True + self, + start: datetime | int | None, + end: datetime | int | None, + *, + force_copy: bool = True, ) -> FloatArray: """Request a copy or view on the data between start timestamp and end timestamp. @@ -283,17 +328,32 @@ def window( copy of the data. Raises: - IndexError: When requesting a window with invalid timestamps. + IndexError: When start and end are not both datetime or index. 
Returns: The requested window """ - if start > end: + if self.count_covered() == 0: + return np.array([]) if isinstance(self._buffer, np.ndarray) else [] + + # If both are indices or None convert to datetime + if not isinstance(start, datetime) and not isinstance(end, datetime): + start, end = self._to_covered_indices(start, end) + start = self.get_timestamp(start) + end = self.get_timestamp(end) + + # Here we should have both as datetime + if not isinstance(start, datetime) or not isinstance(end, datetime): raise IndexError( - f"end parameter {end} has to predate start parameter {start}" + f"start ({start}) and end ({end}) must both be either datetime or index." ) - if start == end: + # Ensure that the window is within the bounds of the buffer + assert self.oldest_timestamp is not None and self.newest_timestamp is not None + start = max(start, self.oldest_timestamp) + end = min(end, self.newest_timestamp + self._sampling_period) + + if start >= end: return np.array([]) if isinstance(self._buffer, np.ndarray) else [] start_pos = self.to_internal_index(start) @@ -539,6 +599,33 @@ def __getitem__(self, index_or_slice: SupportsIndex | slice) -> float | FloatArr """ return self._buffer.__getitem__(index_or_slice) + def _covered_time_range(self) -> timedelta: + """Return the time range that is covered by the oldest and newest valid samples. + + Returns: + The time range between the oldest and newest valid samples or 0 if + there is no time range covered. + """ + if not self.oldest_timestamp: + return timedelta(0) + + assert ( + self.newest_timestamp is not None + ), "Newest timestamp cannot be None here." + return self.newest_timestamp - self.oldest_timestamp + self._sampling_period + + def count_covered(self) -> int: + """Count the number of samples that are covered by the oldest and newest valid samples. + + Returns: + The count of samples between the oldest and newest (inclusive) valid samples + or 0 if there is no time range covered. 
+ """ + return int( + self._covered_time_range().total_seconds() + // self._sampling_period.total_seconds() + ) + def count_valid(self) -> int: """Count the number of valid items that this buffer currently holds. diff --git a/tests/timeseries/test_moving_window.py b/tests/timeseries/test_moving_window.py index 2f309ceda..2a79e05b4 100644 --- a/tests/timeseries/test_moving_window.py +++ b/tests/timeseries/test_moving_window.py @@ -104,11 +104,61 @@ async def test_access_window_by_int_slice() -> None: async with window: await push_logical_meter_data(sender, range(0, 5)) assert np.array_equal(window[3:5], np.array([3.0, 4.0])) - with pytest.raises(IndexError): - window.window(3, 5) # type: ignore + assert np.array_equal(window.window(3, 5), np.array([3.0, 4.0])) + data = [1, 2, 2.5, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 1] await push_logical_meter_data(sender, data) assert np.array_equal(window[5:14], np.array(data[5:14])) + assert np.array_equal(window.window(5, 14), np.array(data[5:14])) + + # Test with step size (other than 1 not supported) + assert np.array_equal(window[5:14:1], np.array(data[5:14])) + assert np.array_equal(window[5:14:None], np.array(data[5:14])) + with pytest.raises(ValueError): + _ = window[5:14:2] + with pytest.raises(ValueError): + _ = window[14:5:-1] + + window, sender = init_moving_window(timedelta(seconds=5)) + + def test_eq(expected: list[float], start: int | None, end: int | None) -> None: + assert np.allclose( + window.window(start, end), np.array(expected), equal_nan=True + ) + assert np.allclose(window[start:end], np.array(expected), equal_nan=True) + + async with window: + test_eq([], 0, 1) + + # Incomplete window + await push_logical_meter_data(sender, [0.0, 1.0]) + test_eq([0.0, 1.0], 0, 2) + test_eq([0.0, 1.0], 0, 9) + test_eq([0.0, 1.0], 0, None) + test_eq([0.0, 1.0], -9, None) + test_eq([0.0, 1.0], None, None) + test_eq([0.0], -2, -1) + test_eq([1.0], -1, None) + + # Incomplete window with gap + await push_logical_meter_data( + sender, 
[3.0], start_ts=UNIX_EPOCH + timedelta(seconds=3) + ) + test_eq([0.0, 1.0], 0, 2) + # gap fill not supported yet: + # test_eq([0.0, 1.0, np.nan, 3.0], 0, None) + # test_eq([0.0, 1.0, np.nan, 3.0], -9, None) + # test_eq([np.nan, 3.0], -2, None) + + # Complete window + await push_logical_meter_data(sender, [0.0, 1.0, 2.0, 3.0, 4.0]) + test_eq([0.0, 1.0], 0, 2) + test_eq([3.0, 4.0], -2, None) + + # Complete window with nan + await push_logical_meter_data(sender, [0.0, 1.0, np.nan]) + test_eq([0.0, 1.0, np.nan], 0, 3) + test_eq([np.nan, 3.0, 4.0], -3, None) async def test_access_window_by_ts_slice() -> None: @@ -121,13 +171,10 @@ async def test_access_window_by_ts_slice() -> None: assert np.array_equal(window[time_start:time_end], np.array([3.0, 4.0])) # type: ignore assert np.array_equal(window.window(dt(3), dt(5)), np.array([3.0, 4.0])) assert np.array_equal(window.window(dt(3), dt(3)), np.array([])) - # Window only supports slicing with ascending indices within allowed range - with pytest.raises(IndexError): - window.window(dt(3), dt(1)) - with pytest.raises(IndexError): - window.window(dt(3), dt(6)) - with pytest.raises(IndexError): - window.window(dt(-1), dt(5)) + # Window also supports slicing with indices outside allowed range + assert np.array_equal(window.window(dt(3), dt(1)), np.array([])) + assert np.array_equal(window.window(dt(3), dt(6)), np.array([3, 4])) + assert np.array_equal(window.window(dt(-1), dt(5)), np.array([0, 1, 2, 3, 4])) async def test_access_empty_window() -> None: @@ -144,12 +191,15 @@ async def test_window_size() -> None: async with window: assert window.capacity == 5, "Wrong window capacity" assert window.count_valid() == 0, "Window should be empty" + assert window.count_covered() == 0, "Window should be empty" await push_logical_meter_data(sender, range(0, 2)) assert window.capacity == 5, "Wrong window capacity" assert window.count_valid() == 2, "Window should be partially full" + assert window.count_covered() == 2, "Window should be 
partially full" await push_logical_meter_data(sender, range(2, 20)) assert window.capacity == 5, "Wrong window capacity" assert window.count_valid() == 5, "Window should be full" + assert window.count_covered() == 5, "Window should be full" # pylint: disable=redefined-outer-name diff --git a/tests/timeseries/test_ringbuffer.py b/tests/timeseries/test_ringbuffer.py index c50a4389c..1739256ba 100644 --- a/tests/timeseries/test_ringbuffer.py +++ b/tests/timeseries/test_ringbuffer.py @@ -137,12 +137,11 @@ def test_timestamp_ringbuffer_gaps( Sample(datetime.fromtimestamp(500 + size, tz=timezone.utc), Quantity(9999)) ) - # Expect exception for the same window - with pytest.raises(IndexError): - buffer.window( - datetime.fromtimestamp(200, tz=timezone.utc), - datetime.fromtimestamp(202, tz=timezone.utc), - ) + # Allow still to request old (empty) window + buffer.window( + datetime.fromtimestamp(200, tz=timezone.utc), + datetime.fromtimestamp(202, tz=timezone.utc), + ) # Receive new window without exception buffer.window( @@ -210,18 +209,21 @@ def test_gaps() -> None: # pylint: disable=too-many-statements assert buffer.oldest_timestamp is None assert buffer.newest_timestamp is None assert buffer.count_valid() == 0 + assert buffer.count_covered() == 0 assert len(buffer.gaps) == 0 buffer.update(Sample(dt(0), Quantity(0))) assert buffer.oldest_timestamp == dt(0) assert buffer.newest_timestamp == dt(0) assert buffer.count_valid() == 1 + assert buffer.count_covered() == 1 assert len(buffer.gaps) == 1 buffer.update(Sample(dt(6), Quantity(0))) assert buffer.oldest_timestamp == dt(6) assert buffer.newest_timestamp == dt(6) assert buffer.count_valid() == 1 + assert buffer.count_covered() == 1 assert len(buffer.gaps) == 1 buffer.update(Sample(dt(2), Quantity(2))) @@ -230,48 +232,57 @@ def test_gaps() -> None: # pylint: disable=too-many-statements assert buffer.oldest_timestamp == dt(2) assert buffer.newest_timestamp == dt(6) assert buffer.count_valid() == 4 + assert 
buffer.count_covered() == 5 assert len(buffer.gaps) == 1 buffer.update(Sample(dt(3), None)) assert buffer.oldest_timestamp == dt(2) assert buffer.newest_timestamp == dt(6) assert buffer.count_valid() == 3 + assert buffer.count_covered() == 5 assert len(buffer.gaps) == 2 buffer.update(Sample(dt(3), Quantity(np.nan))) assert buffer.oldest_timestamp == dt(2) assert buffer.newest_timestamp == dt(6) assert buffer.count_valid() == 3 + assert buffer.count_covered() == 5 assert len(buffer.gaps) == 2 buffer.update(Sample(dt(2), Quantity(np.nan))) assert buffer.oldest_timestamp == dt(4) assert buffer.newest_timestamp == dt(6) assert buffer.count_valid() == 2 + assert buffer.count_covered() == 3 assert len(buffer.gaps) == 2 buffer.update(Sample(dt(3), Quantity(3))) assert buffer.oldest_timestamp == dt(3) assert buffer.newest_timestamp == dt(6) assert buffer.count_valid() == 3 + assert buffer.count_covered() == 4 assert len(buffer.gaps) == 2 buffer.update(Sample(dt(2), Quantity(2))) assert buffer.oldest_timestamp == dt(2) assert buffer.newest_timestamp == dt(6) assert buffer.count_valid() == 4 + assert buffer.count_covered() == 5 assert len(buffer.gaps) == 1 buffer.update(Sample(dt(5), Quantity(5))) assert buffer.oldest_timestamp == dt(2) assert buffer.newest_timestamp == dt(6) assert buffer.count_valid() == 5 + assert buffer.count_covered() == 5 assert len(buffer.gaps) == 0 + # whole range gap suffers from sdk#646 buffer.update(Sample(dt(99), None)) assert buffer.oldest_timestamp == dt(95) # bug: should be None assert buffer.newest_timestamp == dt(99) # bug: should be None assert buffer.count_valid() == 4 # bug: should be 0 (whole range gap) + assert buffer.count_covered() == 5 # bug: should be 0 assert len(buffer.gaps) == 1 @@ -512,8 +523,8 @@ def get_orb(data: FloatArray) -> OrderedRingBuffer[FloatArray]: return buffer -def test_window() -> None: - """Test the window function.""" +def test_window_datetime() -> None: + """Test the window function with datetime.""" buffer = 
get_orb(np.array([0, None, 2, 3, 4])) win = buffer.window(dt(0), dt(3), force_copy=False) assert [0, np.nan, 2] == list(win) @@ -531,6 +542,39 @@ def test_window() -> None: assert [] == buffer.window(dt(1), dt(1)) +def test_window_index() -> None: + """Test the window function with index.""" + buffer = get_orb([0.0, 1.0, 2.0, 3.0, 4.0]) + assert [0, 1, 2] == buffer.window(0, 3) + assert [0, 1, 2, 3, 4] == buffer.window(0, 5) + assert [0, 1, 2, 3, 4] == buffer.window(0, 99) + assert [2, 3] == buffer.window(-3, -1) + assert [2, 3, 4] == buffer.window(-3, 5) + assert [0, 1, 2, 3] == buffer.window(-5, -1) + assert [0, 1, 2, 3, 4] == buffer.window(-99, None) + assert [0, 1, 2, 3, 4] == buffer.window(None, 99) + # start >= end + assert [] == buffer.window(0, 0) + assert [] == buffer.window(-5, 0) + assert [] == buffer.window(1, 0) + assert [] == buffer.window(-1, -2) + assert [] == buffer.window(-3, 0) + + +def test_window_fail() -> None: + """Test the window function with invalid indices.""" + buffer = get_orb([0.0, 1.0, 2.0, 3.0, 4.0]) + # Go crazy with the indices + with pytest.raises(IndexError): + buffer.window(dt(1), 3) + with pytest.raises(IndexError): + buffer.window(1, dt(3)) + with pytest.raises(IndexError): + buffer.window(None, dt(2)) + with pytest.raises(IndexError): + buffer.window(dt(2), None) + + def test_wrapped_buffer_window() -> None: """Test the wrapped buffer window function.""" wbw = OrderedRingBuffer._wrapped_buffer_window # pylint: disable=protected-access @@ -590,3 +634,28 @@ def test_wrapped_buffer_window() -> None: assert [3, 9] == list(res2_view) assert [3, 4] == list(res2_copy) assert [4, 0] == list(res3_copy) + + +def test_get_timestamp() -> None: + """Test the get_timestamp function.""" + buffer = OrderedRingBuffer( + np.empty(shape=5, dtype=float), + sampling_period=timedelta(seconds=1), + ) + for i in range(5): + buffer.update(Sample(dt(i), Quantity(i))) + assert dt(4) == buffer.get_timestamp(-1) + assert dt(0) == buffer.get_timestamp(-5) 
+ assert dt(-1) == buffer.get_timestamp(-6) + assert dt(0) == buffer.get_timestamp(0) + assert dt(5) == buffer.get_timestamp(5) + assert dt(6) == buffer.get_timestamp(6) + + for i in range(10, 15): + buffer.update(Sample(dt(i), Quantity(i))) + assert dt(14) == buffer.get_timestamp(-1) + assert dt(10) == buffer.get_timestamp(-5) + assert dt(9) == buffer.get_timestamp(-6) + assert dt(10) == buffer.get_timestamp(0) + assert dt(15) == buffer.get_timestamp(5) + assert dt(16) == buffer.get_timestamp(6)