Skip to content

Commit bd7822c

Browse files
authored
Support int indices and slice index behavior in ring buffer and moving window (#668)
To support int indices and slice index behavior in the moving window and the ring buffer, the following changes are made: * Support for int indices in addition to datetime indices is added to moving window and ring buffer. * Indices behave like usual slices (gracefully accept start >= end or index out of range). * A method to count the number of possible elements in the range between oldest and newest timestamp is added to ring buffer and the moving window. * `get_timestamp` is added to convert an int index passed by the user to the corresponding timestamp.
2 parents 202c504 + 7b5f967 commit bd7822c

File tree

5 files changed

+259
-52
lines changed

5 files changed

+259
-52
lines changed

RELEASE_NOTES.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,16 @@
2323

2424
- In `OrderedRingBuffer`:
2525
- Rename `datetime_to_index` to `to_internal_index` to avoid confusion between the internal index and the external index.
26+
- Add `get_timestamp` method to convert an external index to the corresponding datetime.
2627
- Remove `__setitem__` method to enforce usage of dedicated `update` method only.
28+
- In `OrderedRingBuffer` and `MovingWindow`:
29+
- Support for integer indices is added.
30+
- Add `count_covered` method to count the number of elements covered by the used time range.
31+
32+
33+
2734

2835
## Bug Fixes
2936

3037
- Fix rendering of diagrams in the documentation.
38+
- The `__getitem__` magic of the `MovingWindow` is fixed to support the same functionality that the `window` method provides.

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -243,30 +243,26 @@ def capacity(self) -> int:
243243

244244
def window(
245245
self,
246-
start: datetime,
247-
end: datetime,
246+
start: datetime | int | None,
247+
end: datetime | int | None,
248248
*,
249249
force_copy: bool = True,
250250
) -> ArrayLike:
251251
"""
252252
Return an array containing the samples in the given time interval.
253253
254254
Args:
255-
start: The start of the time interval. Only datetime objects are supported.
256-
end: The end of the time interval. Only datetime objects are supported.
255+
start: The start of the time interval. If `None`, the start of the
256+
window is used.
257+
end: The end of the time interval. If `None`, the end of the window
258+
is used.
257259
force_copy: If `True`, the returned array is a copy of the underlying
258260
data. Otherwise, if possible, a view of the underlying data is
259261
returned.
260262
261263
Returns:
262264
An array containing the samples in the given time interval.
263-
264-
Raises:
265-
IndexError: if `start` or `end` are not datetime objects.
266265
"""
267-
if not isinstance(start, datetime) or not isinstance(end, datetime):
268-
raise IndexError("Only datetime objects are supported as start and end.")
269-
270266
return self._buffer.window(start, end, force_copy=force_copy)
271267

272268
async def _run_impl(self) -> None:
@@ -315,6 +311,15 @@ def count_valid(self) -> int:
315311
"""
316312
return self._buffer.count_valid()
317313

314+
def count_covered(self) -> int:
315+
"""Count the number of samples that are covered by the oldest and newest valid samples.
316+
317+
Returns:
318+
The count of samples between the oldest and newest (inclusive) valid samples
319+
or 0 if there is no time range covered.
320+
"""
321+
return self._buffer.count_covered()
322+
318323
@overload
319324
def __getitem__(self, key: SupportsIndex) -> float:
320325
"""See the main __getitem__ method.
@@ -362,30 +367,18 @@ def __getitem__(self, key: SupportsIndex | datetime | slice) -> float | ArrayLik
362367
A float if the key is a number or a timestamp.
363368
A numpy array if the key is a slice.
364369
"""
370+
if isinstance(key, slice):
371+
if not (key.step is None or key.step == 1):
372+
raise ValueError("Slicing with a step other than 1 is not supported.")
373+
return self.window(key.start, key.stop)
374+
365375
if self._buffer.count_valid() == 0:
366376
raise IndexError("The buffer is empty.")
367-
if isinstance(key, slice):
368-
if isinstance(key.start, int) or isinstance(key.stop, int):
369-
if key.start is None or key.stop is None:
370-
key = slice(slice(key.start, key.stop).indices(self.count_valid()))
371-
elif isinstance(key.start, datetime) or isinstance(key.stop, datetime):
372-
if key.start is None:
373-
key = slice(self._buffer.time_bound_oldest, key.stop)
374-
if key.stop is None:
375-
key = slice(key.start, self._buffer.time_bound_newest)
376-
377-
_logger.debug("Returning slice for [%s:%s].", key.start, key.stop)
378-
379-
# we are doing runtime typechecks since there is no abstract slice type yet
380-
# see also (https://peps.python.org/pep-0696)
381-
if isinstance(key.start, datetime) and isinstance(key.stop, datetime):
382-
return self._buffer.window(key.start, key.stop)
383-
if isinstance(key.start, int) and isinstance(key.stop, int):
384-
return self._buffer[key]
385-
elif isinstance(key, datetime):
377+
378+
if isinstance(key, datetime):
386379
_logger.debug("Returning value at time %s ", key)
387380
return self._buffer[self._buffer.to_internal_index(key)]
388-
elif isinstance(key, SupportsIndex):
381+
if isinstance(key, SupportsIndex):
389382
_logger.debug("Returning value at index %s ", key)
390383
return self._buffer[key]
391384

src/frequenz/sdk/timeseries/_ringbuffer/buffer.py

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,53 @@ def to_internal_index(
257257
)
258258
)
259259

260+
def get_timestamp(self, index: int) -> datetime | None:
261+
"""Convert the given index to the underlying timestamp.
262+
263+
Index 0 corresponds to the oldest timestamp in the buffer.
264+
If negative indices are used, the newest timestamp is used as reference.
265+
266+
!!!warning
267+
268+
The resulting timestamp can be outside the range of the buffer.
269+
270+
Args:
271+
index: Index to convert.
272+
273+
Returns:
274+
Datetime index where the value for the given index can be found.
275+
Or None if the buffer is empty.
276+
"""
277+
if self.oldest_timestamp is None:
278+
return None
279+
assert self.newest_timestamp is not None
280+
ref_ts = (
281+
self.oldest_timestamp
282+
if index >= 0
283+
else self.newest_timestamp + self._sampling_period
284+
)
285+
return ref_ts + index * self._sampling_period
286+
287+
def _to_covered_indices(
288+
self, start: int | None, end: int | None = None
289+
) -> tuple[int, int]:
290+
"""Project the given indices via slice onto the covered range.
291+
292+
Args:
293+
start: Start index.
294+
end: End index. Optional, defaults to None.
295+
296+
Returns:
297+
tuple of start and end indices on the range currently covered by the buffer.
298+
"""
299+
return slice(start, end).indices(self.count_covered())[:2]
300+
260301
def window(
261-
self, start: datetime, end: datetime, *, force_copy: bool = True
302+
self,
303+
start: datetime | int | None,
304+
end: datetime | int | None,
305+
*,
306+
force_copy: bool = True,
262307
) -> FloatArray:
263308
"""Request a copy or view on the data between start timestamp and end timestamp.
264309
@@ -283,17 +328,32 @@ def window(
283328
copy of the data.
284329
285330
Raises:
286-
IndexError: When requesting a window with invalid timestamps.
331+
IndexError: When start and end are not both datetime or index.
287332
288333
Returns:
289334
The requested window
290335
"""
291-
if start > end:
336+
if self.count_covered() == 0:
337+
return np.array([]) if isinstance(self._buffer, np.ndarray) else []
338+
339+
# If both are indices or None convert to datetime
340+
if not isinstance(start, datetime) and not isinstance(end, datetime):
341+
start, end = self._to_covered_indices(start, end)
342+
start = self.get_timestamp(start)
343+
end = self.get_timestamp(end)
344+
345+
# Here we should have both as datetime
346+
if not isinstance(start, datetime) or not isinstance(end, datetime):
292347
raise IndexError(
293-
f"end parameter {end} has to predate start parameter {start}"
348+
f"start ({start}) and end ({end}) must both be either datetime or index."
294349
)
295350

296-
if start == end:
351+
# Ensure that the window is within the bounds of the buffer
352+
assert self.oldest_timestamp is not None and self.newest_timestamp is not None
353+
start = max(start, self.oldest_timestamp)
354+
end = min(end, self.newest_timestamp + self._sampling_period)
355+
356+
if start >= end:
297357
return np.array([]) if isinstance(self._buffer, np.ndarray) else []
298358

299359
start_pos = self.to_internal_index(start)
@@ -539,6 +599,33 @@ def __getitem__(self, index_or_slice: SupportsIndex | slice) -> float | FloatArr
539599
"""
540600
return self._buffer.__getitem__(index_or_slice)
541601

602+
def _covered_time_range(self) -> timedelta:
603+
"""Return the time range that is covered by the oldest and newest valid samples.
604+
605+
Returns:
606+
The time range between the oldest and newest valid samples or 0 if
607+
there is no time range covered.
608+
"""
609+
if not self.oldest_timestamp:
610+
return timedelta(0)
611+
612+
assert (
613+
self.newest_timestamp is not None
614+
), "Newest timestamp cannot be None here."
615+
return self.newest_timestamp - self.oldest_timestamp + self._sampling_period
616+
617+
def count_covered(self) -> int:
618+
"""Count the number of samples that are covered by the oldest and newest valid samples.
619+
620+
Returns:
621+
The count of samples between the oldest and newest (inclusive) valid samples
622+
or 0 if there is no time range covered.
623+
"""
624+
return int(
625+
self._covered_time_range().total_seconds()
626+
// self._sampling_period.total_seconds()
627+
)
628+
542629
def count_valid(self) -> int:
543630
"""Count the number of valid items that this buffer currently holds.
544631

tests/timeseries/test_moving_window.py

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,61 @@ async def test_access_window_by_int_slice() -> None:
104104
async with window:
105105
await push_logical_meter_data(sender, range(0, 5))
106106
assert np.array_equal(window[3:5], np.array([3.0, 4.0]))
107-
with pytest.raises(IndexError):
108-
window.window(3, 5) # type: ignore
107+
assert np.array_equal(window.window(3, 5), np.array([3.0, 4.0]))
108+
109109
data = [1, 2, 2.5, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 1]
110110
await push_logical_meter_data(sender, data)
111111
assert np.array_equal(window[5:14], np.array(data[5:14]))
112+
assert np.array_equal(window.window(5, 14), np.array(data[5:14]))
113+
114+
# Test with step size (other than 1 not supported)
115+
assert np.array_equal(window[5:14:1], np.array(data[5:14]))
116+
assert np.array_equal(window[5:14:None], np.array(data[5:14]))
117+
with pytest.raises(ValueError):
118+
_ = window[5:14:2]
119+
with pytest.raises(ValueError):
120+
_ = window[14:5:-1]
121+
122+
window, sender = init_moving_window(timedelta(seconds=5))
123+
124+
def test_eq(expected: list[float], start: int | None, end: int | None) -> None:
125+
assert np.allclose(
126+
window.window(start, end), np.array(expected), equal_nan=True
127+
)
128+
assert np.allclose(window[start:end], np.array(expected), equal_nan=True)
129+
130+
async with window:
131+
test_eq([], 0, 1)
132+
133+
# Incomplete window
134+
await push_logical_meter_data(sender, [0.0, 1.0])
135+
test_eq([0.0, 1.0], 0, 2)
136+
test_eq([0.0, 1.0], 0, 9)
137+
test_eq([0.0, 1.0], 0, None)
138+
test_eq([0.0, 1.0], -9, None)
139+
test_eq([0.0, 1.0], None, None)
140+
test_eq([0.0], -2, -1)
141+
test_eq([1.0], -1, None)
142+
143+
# Incomplete window with gap
144+
await push_logical_meter_data(
145+
sender, [3.0], start_ts=UNIX_EPOCH + timedelta(seconds=3)
146+
)
147+
test_eq([0.0, 1.0], 0, 2)
148+
# gap fill not supported yet:
149+
# test_eq([0.0, 1.0, np.nan, 3.0], 0, None)
150+
# test_eq([0.0, 1.0, np.nan, 3.0], -9, None)
151+
# test_eq([np.nan, 3.0], -2, None)
152+
153+
# Complete window
154+
await push_logical_meter_data(sender, [0.0, 1.0, 2.0, 3.0, 4.0])
155+
test_eq([0.0, 1.0], 0, 2)
156+
test_eq([3.0, 4.0], -2, None)
157+
158+
# Complete window with nan
159+
await push_logical_meter_data(sender, [0.0, 1.0, np.nan])
160+
test_eq([0.0, 1.0, np.nan], 0, 3)
161+
test_eq([np.nan, 3.0, 4.0], -3, None)
112162

113163

114164
async def test_access_window_by_ts_slice() -> None:
@@ -121,13 +171,10 @@ async def test_access_window_by_ts_slice() -> None:
121171
assert np.array_equal(window[time_start:time_end], np.array([3.0, 4.0])) # type: ignore
122172
assert np.array_equal(window.window(dt(3), dt(5)), np.array([3.0, 4.0]))
123173
assert np.array_equal(window.window(dt(3), dt(3)), np.array([]))
124-
# Window only supports slicing with ascending indices within allowed range
125-
with pytest.raises(IndexError):
126-
window.window(dt(3), dt(1))
127-
with pytest.raises(IndexError):
128-
window.window(dt(3), dt(6))
129-
with pytest.raises(IndexError):
130-
window.window(dt(-1), dt(5))
174+
# Window also supports slicing with indices outside allowed range
175+
assert np.array_equal(window.window(dt(3), dt(1)), np.array([]))
176+
assert np.array_equal(window.window(dt(3), dt(6)), np.array([3, 4]))
177+
assert np.array_equal(window.window(dt(-1), dt(5)), np.array([0, 1, 2, 3, 4]))
131178

132179

133180
async def test_access_empty_window() -> None:
@@ -144,12 +191,15 @@ async def test_window_size() -> None:
144191
async with window:
145192
assert window.capacity == 5, "Wrong window capacity"
146193
assert window.count_valid() == 0, "Window should be empty"
194+
assert window.count_covered() == 0, "Window should be empty"
147195
await push_logical_meter_data(sender, range(0, 2))
148196
assert window.capacity == 5, "Wrong window capacity"
149197
assert window.count_valid() == 2, "Window should be partially full"
198+
assert window.count_covered() == 2, "Window should be partially full"
150199
await push_logical_meter_data(sender, range(2, 20))
151200
assert window.capacity == 5, "Wrong window capacity"
152201
assert window.count_valid() == 5, "Window should be full"
202+
assert window.count_covered() == 5, "Window should be full"
153203

154204

155205
# pylint: disable=redefined-outer-name

0 commit comments

Comments
 (0)