Skip to content

Commit 49b8738

Browse files
committed
[MovingWindow] Accept a time range in the count_covered method
It retains the original behaviour of counting all the valid samples in the buffer when no time range is specified. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 329738e commit 49b8738

File tree

3 files changed

+155
-49
lines changed

3 files changed

+155
-49
lines changed

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,14 +374,25 @@ def count_valid(
374374
"""
375375
return self._buffer.count_valid(since=since, until=until)
376376

377-
def count_covered(self) -> int:
377+
def count_covered(
378+
self, *, since: datetime | None = None, until: datetime | None = None
379+
) -> int:
378380
"""Count the number of samples that are covered by the oldest and newest valid samples.
379381
382+
If `since` and `until` are provided, the count is limited to the samples between
383+
(and including) the given timestamps.
384+
385+
Args:
386+
since: The timestamp from which to start counting. If `None`, the oldest
387+
timestamp of the buffer is used.
388+
until: The timestamp until (and including) which to count. If `None`, the
389+
newest timestamp of the buffer is used.
390+
380391
Returns:
381392
The count of samples between the oldest and newest (inclusive) valid samples
382393
or 0 if there are is no time range covered.
383394
"""
384-
return self._buffer.count_covered()
395+
return self._buffer.count_covered(since=since, until=until)
385396

386397
@overload
387398
def __getitem__(self, key: SupportsIndex) -> float:

src/frequenz/sdk/timeseries/_ringbuffer/buffer.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -651,9 +651,20 @@ def __getitem__(self, index_or_slice: SupportsIndex | slice) -> float | FloatArr
651651
"""
652652
return self._buffer.__getitem__(index_or_slice)
653653

654-
def _covered_time_range(self) -> timedelta:
654+
def _covered_time_range(
655+
self, since: datetime | None = None, until: datetime | None = None
656+
) -> timedelta:
655657
"""Return the time range that is covered by the oldest and newest valid samples.
656658
659+
If `since` and `until` are provided, the time range is limited to the items
660+
between (and including) the given timestamps.
661+
662+
Args:
663+
since: The timestamp from which to start counting. If `None`, the oldest
664+
timestamp in the buffer is used.
665+
until: The timestamp until (and including) which to count. If `None`, the
666+
newest timestamp in the buffer is used.
667+
657668
Returns:
658669
The time range between the oldest and newest valid samples or 0 if
659670
there are is no time range covered.
@@ -664,17 +675,37 @@ def _covered_time_range(self) -> timedelta:
664675
assert (
665676
self.newest_timestamp is not None
666677
), "Newest timestamp cannot be None here."
667-
return self.newest_timestamp - self.oldest_timestamp + self._sampling_period
668678

669-
def count_covered(self) -> int:
679+
if since is None or since < self.oldest_timestamp:
680+
since = self.oldest_timestamp
681+
if until is None or until > self.newest_timestamp:
682+
until = self.newest_timestamp
683+
684+
if until < since:
685+
return timedelta(0)
686+
687+
return until - since + self._sampling_period
688+
689+
def count_covered(
690+
self, *, since: datetime | None = None, until: datetime | None = None
691+
) -> int:
670692
"""Count the number of samples that are covered by the oldest and newest valid samples.
671693
694+
If `since` and `until` are provided, the count is limited to the items between
695+
(and including) the given timestamps.
696+
697+
Args:
698+
since: The timestamp from which to start counting. If `None`, the oldest
699+
timestamp in the buffer is used.
700+
until: The timestamp until (and including) which to count. If `None`, the
701+
newest timestamp in the buffer is used.
702+
672703
Returns:
673704
The count of samples between the oldest and newest (inclusive) valid samples
674705
or 0 if there are is no time range covered.
675706
"""
676707
return int(
677-
self._covered_time_range().total_seconds()
708+
self._covered_time_range(since, until).total_seconds()
678709
// self._sampling_period.total_seconds()
679710
)
680711

tests/timeseries/test_moving_window.py

Lines changed: 107 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def event_loop_policy() -> async_solipsism.EventLoopPolicy:
2727

2828
async def push_logical_meter_data(
2929
sender: Sender[Sample[Quantity]],
30-
test_seq: Sequence[float],
30+
test_seq: Sequence[float | None],
3131
start_ts: datetime = UNIX_EPOCH,
3232
) -> None:
3333
"""Push data in the passed sender to mock `LogicalMeter` behaviour.
@@ -41,7 +41,9 @@ async def push_logical_meter_data(
4141
"""
4242
for i, j in zip(test_seq, range(0, len(test_seq))):
4343
timestamp = start_ts + timedelta(seconds=j)
44-
await sender.send(Sample(timestamp, Quantity(float(i))))
44+
await sender.send(
45+
Sample(timestamp, Quantity(float(i)) if i is not None else None)
46+
)
4547

4648
await asyncio.sleep(0.0)
4749

@@ -210,10 +212,27 @@ async def test_access_empty_window() -> None:
210212
_ = window[42]
211213

212214

213-
async def test_window_size() -> None:
215+
async def test_window_size() -> None: # pylint: disable=too-many-statements
214216
"""Test the size of the window."""
215217
window, sender = init_moving_window(timedelta(seconds=10))
216218
async with window:
219+
220+
def assert_valid_and_covered_counts(
221+
*,
222+
since: datetime | None = None,
223+
until: datetime | None = None,
224+
expected: int | None = None,
225+
expected_valid: int | None = None,
226+
expected_covered: int | None = None,
227+
) -> None:
228+
if expected is not None:
229+
assert window.count_valid(since=since, until=until) == expected
230+
assert window.count_covered(since=since, until=until) == expected
231+
return
232+
233+
assert window.count_valid(since=since, until=until) == expected_valid
234+
assert window.count_covered(since=since, until=until) == expected_covered
235+
217236
assert window.capacity == 10, "Wrong window capacity"
218237
assert window.count_valid() == 0, "Window should be empty"
219238
assert window.count_covered() == 0, "Window should be empty"
@@ -239,28 +258,26 @@ async def test_window_size() -> None:
239258
assert window.count_valid() == 10, "Window should be full"
240259
assert window.count_covered() == 10, "Window should be full"
241260

242-
assert window.count_valid(since=UNIX_EPOCH + timedelta(seconds=1)) == 9
243-
assert window.count_valid(until=UNIX_EPOCH + timedelta(seconds=2)) == 3
244-
assert (
245-
window.count_valid(
246-
since=UNIX_EPOCH + timedelta(seconds=1),
247-
until=UNIX_EPOCH + timedelta(seconds=1),
248-
)
249-
== 1
261+
assert_valid_and_covered_counts(
262+
since=UNIX_EPOCH + timedelta(seconds=1), expected=9
250263
)
251-
assert (
252-
window.count_valid(
253-
since=UNIX_EPOCH + timedelta(seconds=3),
254-
until=UNIX_EPOCH + timedelta(seconds=8),
255-
)
256-
== 6
264+
assert_valid_and_covered_counts(
265+
until=UNIX_EPOCH + timedelta(seconds=2), expected=3
257266
)
258-
assert (
259-
window.count_valid(
260-
since=UNIX_EPOCH + timedelta(seconds=8),
261-
until=UNIX_EPOCH + timedelta(seconds=3),
262-
)
263-
== 0
267+
assert_valid_and_covered_counts(
268+
since=UNIX_EPOCH + timedelta(seconds=1),
269+
until=UNIX_EPOCH + timedelta(seconds=1),
270+
expected=1,
271+
)
272+
assert_valid_and_covered_counts(
273+
since=UNIX_EPOCH + timedelta(seconds=3),
274+
until=UNIX_EPOCH + timedelta(seconds=8),
275+
expected=6,
276+
)
277+
assert_valid_and_covered_counts(
278+
since=UNIX_EPOCH + timedelta(seconds=8),
279+
until=UNIX_EPOCH + timedelta(seconds=3),
280+
expected=0,
264281
)
265282

266283
newest_ts = window.newest_timestamp
@@ -269,33 +286,80 @@ async def test_window_size() -> None:
269286

270287
await push_logical_meter_data(sender, range(5, 12), start_ts=newest_ts)
271288
assert window.capacity == 10, "Wrong window capacity"
272-
assert window.count_valid() == 10, "Window should be full"
273-
assert window.count_covered() == 10, "Window should be full"
289+
assert_valid_and_covered_counts(expected=10)
290+
291+
newest_ts = window.newest_timestamp
292+
assert newest_ts is not None and newest_ts == UNIX_EPOCH + timedelta(seconds=15)
293+
assert window.oldest_timestamp == UNIX_EPOCH + timedelta(seconds=6)
294+
295+
assert_valid_and_covered_counts(
296+
since=UNIX_EPOCH + timedelta(seconds=1),
297+
until=UNIX_EPOCH + timedelta(seconds=5),
298+
expected=0,
299+
)
300+
assert_valid_and_covered_counts(
301+
since=UNIX_EPOCH + timedelta(seconds=3),
302+
until=UNIX_EPOCH + timedelta(seconds=8),
303+
expected=3,
304+
)
305+
assert_valid_and_covered_counts(
306+
since=UNIX_EPOCH + timedelta(seconds=6),
307+
until=UNIX_EPOCH + timedelta(seconds=20),
308+
expected=10,
309+
)
274310

275311
newest_ts = window.newest_timestamp
276312
assert newest_ts is not None and newest_ts == UNIX_EPOCH + timedelta(seconds=15)
277313
assert window.oldest_timestamp == UNIX_EPOCH + timedelta(seconds=6)
278314

279-
assert (
280-
window.count_valid(
281-
since=UNIX_EPOCH + timedelta(seconds=1),
282-
until=UNIX_EPOCH + timedelta(seconds=5),
283-
)
284-
== 0
315+
await push_logical_meter_data(
316+
sender, [3, 4, None, None, 10, 12, None], start_ts=newest_ts
317+
)
318+
319+
# After the last insertion, the moving window would look like this:
320+
#
321+
# +------------------------+----+----+-----+----+----+-----+-----+-----+-----+-----+
322+
# | MovingWindow timestamp | | | | | | | | | | |
323+
# | (seconds after EPOCH) | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 | 21 |
324+
# |------------------------+----+----+-----+----+----+-----+-----+-----+-----+-----|
325+
# | value in buffer | 8. | 9. | 10. | 3. | 4. | nan | nan | 10. | 12. | nan |
326+
# +------------------------+----+----+-----+----+----+-----+-----+-----+-----+-----+
327+
328+
newest_ts = window.newest_timestamp
329+
assert newest_ts is not None and newest_ts == UNIX_EPOCH + timedelta(seconds=21)
330+
assert window.oldest_timestamp == UNIX_EPOCH + timedelta(seconds=12)
331+
332+
assert_valid_and_covered_counts(
333+
expected_valid=7,
334+
expected_covered=10,
335+
)
336+
assert_valid_and_covered_counts(
337+
since=UNIX_EPOCH + timedelta(seconds=15),
338+
expected_valid=4,
339+
expected_covered=7,
340+
)
341+
assert_valid_and_covered_counts(
342+
until=UNIX_EPOCH + timedelta(seconds=19),
343+
expected_valid=6,
344+
expected_covered=8,
345+
)
346+
assert_valid_and_covered_counts(
347+
since=UNIX_EPOCH + timedelta(seconds=12),
348+
until=UNIX_EPOCH + timedelta(seconds=15),
349+
expected_valid=4,
350+
expected_covered=4,
285351
)
286-
assert (
287-
window.count_valid(
288-
since=UNIX_EPOCH + timedelta(seconds=3),
289-
until=UNIX_EPOCH + timedelta(seconds=8),
290-
)
291-
== 3
352+
assert_valid_and_covered_counts(
353+
since=UNIX_EPOCH + timedelta(seconds=17),
354+
until=UNIX_EPOCH + timedelta(seconds=18),
355+
expected_valid=0,
356+
expected_covered=2,
292357
)
293-
assert (
294-
window.count_valid(
295-
since=UNIX_EPOCH + timedelta(seconds=6),
296-
until=UNIX_EPOCH + timedelta(seconds=20),
297-
)
298-
== 10
358+
assert_valid_and_covered_counts(
359+
since=UNIX_EPOCH + timedelta(seconds=16),
360+
until=UNIX_EPOCH + timedelta(seconds=20),
361+
expected_valid=3,
362+
expected_covered=5,
299363
)
300364

301365

0 commit comments

Comments
 (0)