Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@
- A tutorial section and a getting started tutorial
- In `OrderedRingBuffer`:
- Rename `datetime_to_index` to `to_internal_index` to avoid confusion between the internal index and the external index.
- Add `index_to_datetime` method to convert external index to corresponding datetime.
- Remove `__setitem__` method to enforce usage of dedicated `update` method only.
- In `OrderedRingBuffer` and `MovingWindow`:
- Support for integer indices is added.
- Add `count_covered` method to count the number of elements covered by the used time range.




## Bug Fixes

- Fix rendering of diagrams in the documentation.
- The `__getitem__` magic of the `MovingWindow` is fixed to support the same functionality that the `window` method provides.
53 changes: 23 additions & 30 deletions src/frequenz/sdk/timeseries/_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,30 +243,26 @@ def capacity(self) -> int:

def window(
self,
start: datetime,
end: datetime,
start: datetime | int | None,
end: datetime | int | None,
*,
force_copy: bool = True,
) -> ArrayLike:
"""
Return an array containing the samples in the given time interval.

Args:
start: The start of the time interval. Only datetime objects are supported.
end: The end of the time interval. Only datetime objects are supported.
start: The start of the time interval. If `None`, the start of the
window is used.
end: The end of the time interval. If `None`, the end of the window
is used.
force_copy: If `True`, the returned array is a copy of the underlying
data. Otherwise, if possible, a view of the underlying data is
returned.

Returns:
An array containing the samples in the given time interval.

Raises:
IndexError: if `start` or `end` are not datetime objects.
"""
if not isinstance(start, datetime) or not isinstance(end, datetime):
raise IndexError("Only datetime objects are supported as start and end.")

return self._buffer.window(start, end, force_copy=force_copy)

async def _run_impl(self) -> None:
Expand Down Expand Up @@ -315,6 +311,15 @@ def count_valid(self) -> int:
"""
return self._buffer.count_valid()

def count_covered(self) -> int:
    """Return how many samples lie between the oldest and newest valid samples.

    The value is obtained from the underlying ring buffer.

    Returns:
        The number of samples from the oldest to the newest valid sample
        (both inclusive), or 0 if no time range is covered.
    """
    covered = self._buffer.count_covered()
    return covered

@overload
def __getitem__(self, key: SupportsIndex) -> float:
"""See the main __getitem__ method.
Expand Down Expand Up @@ -362,30 +367,18 @@ def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | ArrayLik
A float if the key is a number or a timestamp.
A numpy array if the key is a slice.
"""
if isinstance(key, slice):
if not (key.step is None or key.step == 1):
raise ValueError("Slicing with a step other than 1 is not supported.")
return self.window(key.start, key.stop)

if self._buffer.count_valid() == 0:
raise IndexError("The buffer is empty.")
if isinstance(key, slice):
if isinstance(key.start, int) or isinstance(key.stop, int):
if key.start is None or key.stop is None:
key = slice(slice(key.start, key.stop).indices(self.count_valid()))
elif isinstance(key.start, datetime) or isinstance(key.stop, datetime):
if key.start is None:
key = slice(self._buffer.time_bound_oldest, key.stop)
if key.stop is None:
key = slice(key.start, self._buffer.time_bound_newest)

_logger.debug("Returning slice for [%s:%s].", key.start, key.stop)

# we are doing runtime typechecks since there is no abstract slice type yet
# see also (https://peps.python.org/pep-0696)
if isinstance(key.start, datetime) and isinstance(key.stop, datetime):
return self._buffer.window(key.start, key.stop)
if isinstance(key.start, int) and isinstance(key.stop, int):
return self._buffer[key]
elif isinstance(key, datetime):

if isinstance(key, datetime):
_logger.debug("Returning value at time %s ", key)
return self._buffer[self._buffer.to_internal_index(key)]
elif isinstance(key, SupportsIndex):
if isinstance(key, SupportsIndex):
_logger.debug("Returning value at index %s ", key)
return self._buffer[key]

Expand Down
97 changes: 92 additions & 5 deletions src/frequenz/sdk/timeseries/_ringbuffer/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,53 @@ def to_internal_index(
)
)

def get_timestamp(self, index: int) -> datetime | None:
"""Convert the given index to the underlying timestamp.

Index 0 corresponds to the oldest timestamp in the buffer.
If negative indices are used, the newest timestamp is used as reference.

!!!warning

The resulting timestamp can be outside the range of the buffer.

Args:
index: Index to convert.

Returns:
Datetime index where the value for the given index can be found.
Or None if the buffer is empty.
"""
if self.oldest_timestamp is None:
return None
assert self.newest_timestamp is not None
ref_ts = (
self.oldest_timestamp
if index >= 0
else self.newest_timestamp + self._sampling_period
)
return ref_ts + index * self._sampling_period

def _to_covered_indices(
self, start: int | None, end: int | None = None
) -> tuple[int, int]:
"""Project the given indices via slice onto the covered range.

Args:
start: Start index.
end: End index. Optional, defaults to None.

Returns:
tuple of start and end indices on the range currently covered by the buffer.
"""
return slice(start, end).indices(self.count_covered())[:2]

def window(
self, start: datetime, end: datetime, *, force_copy: bool = True
self,
start: datetime | int | None,
end: datetime | int | None,
*,
force_copy: bool = True,
) -> FloatArray:
"""Request a copy or view on the data between start timestamp and end timestamp.

Expand All @@ -283,17 +328,32 @@ def window(
copy of the data.

Raises:
IndexError: When requesting a window with invalid timestamps.
IndexError: When start and end are not both datetime or index.

Returns:
The requested window
"""
if start > end:
if self.count_covered() == 0:
return np.array([]) if isinstance(self._buffer, np.ndarray) else []

# If both are indices or None convert to datetime
if not isinstance(start, datetime) and not isinstance(end, datetime):
start, end = self._to_covered_indices(start, end)
start = self.get_timestamp(start)
end = self.get_timestamp(end)

# Here we should have both as datetime
if not isinstance(start, datetime) or not isinstance(end, datetime):
raise IndexError(
f"end parameter {end} has to predate start parameter {start}"
f"start ({start}) and end ({end}) must both be either datetime or index."
)

if start == end:
# Ensure that the window is within the bounds of the buffer
assert self.oldest_timestamp is not None and self.newest_timestamp is not None
start = max(start, self.oldest_timestamp)
end = min(end, self.newest_timestamp + self._sampling_period)

if start >= end:
return np.array([]) if isinstance(self._buffer, np.ndarray) else []

start_pos = self.to_internal_index(start)
Expand Down Expand Up @@ -539,6 +599,33 @@ def __getitem__(self, index_or_slice: SupportsIndex | slice) -> float | FloatArr
"""
return self._buffer.__getitem__(index_or_slice)

def _covered_time_range(self) -> timedelta:
    """Return the time range that is covered by the oldest and newest valid samples.

    The range includes the sampling period of the newest sample, i.e. it is
    `newest - oldest + sampling_period`.

    Returns:
        The time range between the oldest and newest valid samples, or a zero
        timedelta if there is no time range covered.
    """
    # Explicit None check, matching how the oldest timestamp is tested
    # elsewhere in this class, instead of relying on truthiness.
    if self.oldest_timestamp is None:
        return timedelta(0)

    assert (
        self.newest_timestamp is not None
    ), "Newest timestamp cannot be None here."
    return self.newest_timestamp - self.oldest_timestamp + self._sampling_period

def count_covered(self) -> int:
    """Count the number of samples that are covered by the oldest and newest valid samples.

    Returns:
        The count of samples between the oldest and newest (inclusive) valid
        samples, or 0 if there is no time range covered.
    """
    # Floor-divide the timedeltas directly: this is exact integer arithmetic
    # on microseconds. Dividing `total_seconds()` floats undercounts for
    # sub-second periods (e.g. 0.3 // 0.1 == 2.0).
    return self._covered_time_range() // self._sampling_period

def count_valid(self) -> int:
"""Count the number of valid items that this buffer currently holds.

Expand Down
68 changes: 59 additions & 9 deletions tests/timeseries/test_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,61 @@ async def test_access_window_by_int_slice() -> None:
async with window:
await push_logical_meter_data(sender, range(0, 5))
assert np.array_equal(window[3:5], np.array([3.0, 4.0]))
with pytest.raises(IndexError):
window.window(3, 5) # type: ignore
assert np.array_equal(window.window(3, 5), np.array([3.0, 4.0]))

data = [1, 2, 2.5, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 1]
await push_logical_meter_data(sender, data)
assert np.array_equal(window[5:14], np.array(data[5:14]))
assert np.array_equal(window.window(5, 14), np.array(data[5:14]))

# Test with step size (other than 1 not supported)
assert np.array_equal(window[5:14:1], np.array(data[5:14]))
assert np.array_equal(window[5:14:None], np.array(data[5:14]))
with pytest.raises(ValueError):
_ = window[5:14:2]
with pytest.raises(ValueError):
_ = window[14:5:-1]

window, sender = init_moving_window(timedelta(seconds=5))

def test_eq(expected: list[float], start: int | None, end: int | None) -> None:
assert np.allclose(
window.window(start, end), np.array(expected), equal_nan=True
)
assert np.allclose(window[start:end], np.array(expected), equal_nan=True)

async with window:
test_eq([], 0, 1)

# Incomplete window
await push_logical_meter_data(sender, [0.0, 1.0])
test_eq([0.0, 1.0], 0, 2)
test_eq([0.0, 1.0], 0, 9)
test_eq([0.0, 1.0], 0, None)
test_eq([0.0, 1.0], -9, None)
test_eq([0.0, 1.0], None, None)
test_eq([0.0], -2, -1)
test_eq([1.0], -1, None)

# Incomplete window with gap
await push_logical_meter_data(
sender, [3.0], start_ts=UNIX_EPOCH + timedelta(seconds=3)
)
test_eq([0.0, 1.0], 0, 2)
# gap fill not supported yet:
# test_eq([0.0, 1.0, np.nan, 3.0], 0, None)
# test_eq([0.0, 1.0, np.nan, 3.0], -9, None)
# test_eq([np.nan, 3.0], -2, None)

# Complete window
await push_logical_meter_data(sender, [0.0, 1.0, 2.0, 3.0, 4.0])
test_eq([0.0, 1.0], 0, 2)
test_eq([3.0, 4.0], -2, None)

# Complete window with nan
await push_logical_meter_data(sender, [0.0, 1.0, np.nan])
test_eq([0.0, 1.0, np.nan], 0, 3)
test_eq([np.nan, 3.0, 4.0], -3, None)


async def test_access_window_by_ts_slice() -> None:
Expand All @@ -121,13 +171,10 @@ async def test_access_window_by_ts_slice() -> None:
assert np.array_equal(window[time_start:time_end], np.array([3.0, 4.0])) # type: ignore
assert np.array_equal(window.window(dt(3), dt(5)), np.array([3.0, 4.0]))
assert np.array_equal(window.window(dt(3), dt(3)), np.array([]))
# Window only supports slicing with ascending indices within allowed range
with pytest.raises(IndexError):
window.window(dt(3), dt(1))
with pytest.raises(IndexError):
window.window(dt(3), dt(6))
with pytest.raises(IndexError):
window.window(dt(-1), dt(5))
# Window also supports slicing with indices outside allowed range
assert np.array_equal(window.window(dt(3), dt(1)), np.array([]))
assert np.array_equal(window.window(dt(3), dt(6)), np.array([3, 4]))
assert np.array_equal(window.window(dt(-1), dt(5)), np.array([0, 1, 2, 3, 4]))


async def test_access_empty_window() -> None:
Expand All @@ -144,12 +191,15 @@ async def test_window_size() -> None:
async with window:
assert window.capacity == 5, "Wrong window capacity"
assert window.count_valid() == 0, "Window should be empty"
assert window.count_covered() == 0, "Window should be empty"
await push_logical_meter_data(sender, range(0, 2))
assert window.capacity == 5, "Wrong window capacity"
assert window.count_valid() == 2, "Window should be partially full"
assert window.count_covered() == 2, "Window should be partially full"
await push_logical_meter_data(sender, range(2, 20))
assert window.capacity == 5, "Wrong window capacity"
assert window.count_valid() == 5, "Window should be full"
assert window.count_covered() == 5, "Window should be full"


# pylint: disable=redefined-outer-name
Expand Down
Loading