Skip to content

Commit a06db70

Browse files
authored
Add option to impute missing values in window method for moving window (#669)
The `fill_value` option imputes missing values `NaN` or `None` in the ring buffer and the moving window with the corresponding value. If `None` is passed, no imputation is done. By default, missing values are imputed with `NaN`. This prevents returning outdated return values accidentally. If missing values should be filled, the force_copy argument has to be true to avoid overwriting the underlying data.
2 parents 33f9763 + 33c05d1 commit a06db70

File tree

5 files changed

+112
-9
lines changed

5 files changed

+112
-9
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
- In `OrderedRingBuffer` and `MovingWindow`:
2929
- Support for integer indices is added.
3030
- Add `count_covered` method to count the number of elements covered by the used time range.
31+
- Add `fill_value` option to window method to impute missing values. By default missing values are imputed with `NaN`.
3132
- Add `at` method to `MovingWindow` to access a single element and use it in `__getitem__` magic to fully support single element access.
3233

3334

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ def window(
289289
end: datetime | int | None,
290290
*,
291291
force_copy: bool = True,
292+
fill_value: float | None = np.nan,
292293
) -> ArrayLike:
293294
"""
294295
Return an array containing the samples in the given time interval.
@@ -305,11 +306,16 @@ def window(
305306
force_copy: If `True`, the returned array is a copy of the underlying
306307
data. Otherwise, if possible, a view of the underlying data is
307308
returned.
309+
fill_value: If not None, will use this value to fill missing values.
310+
If missing values should be set, force_copy must be True.
311+
Defaults to NaN to avoid returning outdated data unexpectedly.
308312
309313
Returns:
310314
An array containing the samples in the given time interval.
311315
"""
312-
return self._buffer.window(start, end, force_copy=force_copy)
316+
return self._buffer.window(
317+
start, end, force_copy=force_copy, fill_value=fill_value
318+
)
313319

314320
async def _run_impl(self) -> None:
315321
"""Awaits samples from the receiver and updates the underlying ring buffer.

src/frequenz/sdk/timeseries/_ringbuffer/buffer.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ def window(
304304
end: datetime | int | None,
305305
*,
306306
force_copy: bool = True,
307+
fill_value: float | None = np.nan,
307308
) -> FloatArray:
308309
"""Request a copy or view on the data between start timestamp and end timestamp.
309310
@@ -326,13 +327,21 @@ def window(
326327
end: end time of the window.
327328
force_copy: optional, default True. If True, will always create a
328329
copy of the data.
330+
fill_value: If not None, will use this value to fill missing values.
331+
If missing values should be filled, force_copy must be True.
332+
Defaults to NaN to avoid returning outdated data unexpectedly.
329333
330334
Raises:
331335
IndexError: When start and end are not both datetime or index.
336+
ValueError: When fill_value is not None and force_copy is False.
332337
333338
Returns:
334339
The requested window
335340
"""
341+
# We don't want to modify the original buffer
342+
if fill_value is not None and not force_copy:
343+
raise ValueError("fill_value only supported for force_copy=True")
344+
336345
if self.count_covered() == 0:
337346
return np.array([]) if isinstance(self._buffer, np.ndarray) else []
338347

@@ -359,7 +368,48 @@ def window(
359368
start_pos = self.to_internal_index(start)
360369
end_pos = self.to_internal_index(end)
361370

362-
return self._wrapped_buffer_window(self._buffer, start_pos, end_pos, force_copy)
371+
window = self._wrapped_buffer_window(
372+
self._buffer, start_pos, end_pos, force_copy
373+
)
374+
375+
if fill_value is not None:
376+
window = self._fill_gaps(window, fill_value, start, self.gaps)
377+
return window
378+
379+
def _fill_gaps(
380+
self,
381+
data: FloatArray,
382+
fill_value: float,
383+
oldest_timestamp: datetime,
384+
gaps: list[Gap],
385+
) -> FloatArray:
386+
"""Fill the gaps in the data with the given fill_value.
387+
388+
Args:
389+
data: The data to fill.
390+
fill_value: The value to fill the gaps with.
391+
oldest_timestamp: The oldest timestamp in the data.
392+
gaps: List of gaps to fill.
393+
394+
Returns:
395+
The filled data.
396+
"""
397+
assert isinstance(
398+
data, (np.ndarray, list)
399+
), f"Unsupported data type {type(data)}"
400+
for gap in gaps:
401+
start_index = (gap.start - oldest_timestamp) // self._sampling_period
402+
end_index = (gap.end - oldest_timestamp) // self._sampling_period
403+
start_index = max(start_index, 0)
404+
end_index = min(end_index, len(data))
405+
if start_index < end_index:
406+
if isinstance(data, np.ndarray):
407+
data[start_index:end_index] = fill_value
408+
elif isinstance(data, list):
409+
data[start_index:end_index] = [fill_value] * (
410+
end_index - start_index
411+
)
412+
return data
363413

364414
@staticmethod
365415
def _wrapped_buffer_window(

tests/timeseries/test_moving_window.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,16 @@ def test_eq(expected: list[float], start: int | None, end: int | None) -> None:
163163
sender, [3.0], start_ts=UNIX_EPOCH + timedelta(seconds=3)
164164
)
165165
test_eq([0.0, 1.0], 0, 2)
166-
# gap fill not supported yet:
167-
# test_eq([0.0, 1.0, np.nan, 3.0], 0, None)
168-
# test_eq([0.0, 1.0, np.nan, 3.0], -9, None)
169-
# test_eq([np.nan, 3.0], -2, None)
166+
# test gaps to be NaN
167+
test_eq([0.0, 1.0, np.nan, 3.0], 0, None)
168+
test_eq([np.nan, 3.0], -2, None)
169+
170+
# Test fill_value
171+
assert np.allclose(
172+
np.array([0.0, 1.0, 2.0, 3.0]),
173+
window.window(0, None, fill_value=2.0),
174+
equal_nan=True,
175+
)
170176

171177
# Complete window
172178
await push_logical_meter_data(sender, [0.0, 1.0, 2.0, 3.0, 4.0])

tests/timeseries/test_ringbuffer.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -526,12 +526,12 @@ def get_orb(data: FloatArray) -> OrderedRingBuffer[FloatArray]:
526526
def test_window_datetime() -> None:
527527
"""Test the window function with datetime."""
528528
buffer = get_orb(np.array([0, None, 2, 3, 4]))
529-
win = buffer.window(dt(0), dt(3), force_copy=False)
529+
win = buffer.window(dt(0), dt(3), force_copy=False, fill_value=None)
530530
assert [0, np.nan, 2] == list(win)
531531
buffer._buffer[1] = 1 # pylint: disable=protected-access
532532
# Test whether the window is a view or a copy
533533
assert [0, 1, 2] == list(win)
534-
win = buffer.window(dt(0), dt(3), force_copy=False)
534+
win = buffer.window(dt(0), dt(3), force_copy=False, fill_value=None)
535535
assert [0, 1, 2] == list(win)
536536
# Empty array
537537
assert 0 == buffer.window(dt(1), dt(1)).size
@@ -561,8 +561,45 @@ def test_window_index() -> None:
561561
assert [] == buffer.window(-3, 0)
562562

563563

564+
def test_window_index_fill_value() -> None:
565+
"""Test fill_value functionality of window function."""
566+
# Init with dummy data of size 3 and fill with [0, nan, 2]
567+
buffer = OrderedRingBuffer([4711.0] * 3, ONE_SECOND)
568+
buffer.update(Sample(dt(0), Quantity(0)))
569+
buffer.update(Sample(dt(1), None))
570+
buffer.update(Sample(dt(2), Quantity(2)))
571+
572+
# Test fill_value for explicitly set gaps
573+
assert [0.0, np.nan, 2.0] == buffer.window(0, None)
574+
assert [0.0, np.nan, 2.0] == buffer.window(0, None, fill_value=None)
575+
assert [0.0, 1.0, 2.0] == buffer.window(0, None, fill_value=1)
576+
577+
# initial nan is ignored (optional implementation decision)
578+
buffer.update(Sample(dt(3), Quantity(3))) # -> [nan, 2, 3]
579+
assert [2.0, 3.0] == buffer.window(0, None)
580+
assert [2.0, 3.0] == buffer.window(0, None, fill_value=1)
581+
582+
# Test fill_value for gaps implicitly created gaps by time jumps
583+
buffer.update(Sample(dt(4), Quantity(4)))
584+
buffer.update(Sample(dt(6), Quantity(6))) # -> [4, ?, 6]
585+
# Default is fill missing values with NaNs
586+
assert [4.0, np.nan, 6.0] == buffer.window(0, None)
587+
assert [4.0, np.nan, 6.0] == buffer.window(0, None, fill_value=np.nan)
588+
assert [4.0, 5.0, 6.0] == buffer.window(0, None, fill_value=5)
589+
# If missing values not filled, outdated values can be returned unexpectedly
590+
assert [4.0, 2, 6.0] == buffer.window(0, None, fill_value=None)
591+
592+
# Some edge cases
593+
buffer.update(Sample(dt(7), Quantity(7))) # -> [?, 6, 7]
594+
assert [6.0, 7.0] == buffer.window(0, None)
595+
buffer.update(Sample(dt(8), Quantity(np.nan))) # -> [6, 7, nan]
596+
assert [6.0, 7.0, np.nan] == buffer.window(0, None)
597+
buffer.update(Sample(dt(9), None)) # -> [7, nan, nan]
598+
assert [7.0, np.nan, np.nan] == buffer.window(0, None)
599+
600+
564601
def test_window_fail() -> None:
565-
"""Test the window function with invalid indices."""
602+
"""Test the window function with invalid arguments."""
566603
buffer = get_orb([0.0, 1.0, 2.0, 3.0, 4.0])
567604
# Go crazy with the indices
568605
with pytest.raises(IndexError):
@@ -573,6 +610,9 @@ def test_window_fail() -> None:
573610
buffer.window(None, dt(2))
574611
with pytest.raises(IndexError):
575612
buffer.window(dt(2), None)
613+
# Invalid argument combination
614+
with pytest.raises(ValueError):
615+
buffer.window(0, 1, force_copy=False, fill_value=0)
576616

577617

578618
def test_wrapped_buffer_window() -> None:

0 commit comments

Comments
 (0)