Skip to content

Commit 329738e

Browse files
committed
[MovingWindow] Accept a time range in the count_valid method
It retains the original behaviour of counting all the valid samples in the buffer when no time range is specified. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent f208b10 commit 329738e

File tree

3 files changed

+117
-18
lines changed

3 files changed

+117
-18
lines changed

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -355,14 +355,24 @@ async def sink_buffer(sample: Sample[Quantity]) -> None:
355355
asyncio.create_task(self._resampler.resample(), name="resample")
356356
)
357357

358-
def count_valid(self) -> int:
359-
"""
360-
Count the number of valid samples in this `MovingWindow`.
358+
def count_valid(
359+
self, *, since: datetime | None = None, until: datetime | None = None
360+
) -> int:
361+
"""Count the number of valid samples in this `MovingWindow`.
362+
363+
If `since` and `until` are provided, the count is limited to the samples between
364+
(and including) the given timestamps.
365+
366+
Args:
367+
since: The timestamp from which to start counting. If `None`, the oldest
368+
timestamp of the buffer is used.
369+
until: The timestamp until (and including) which to count. If `None`, the
370+
newest timestamp of the buffer is used.
361371
362372
Returns:
363373
The number of valid samples in this `MovingWindow`.
364374
"""
365-
return self._buffer.count_valid()
375+
return self._buffer.count_valid(since=since, until=until)
366376

367377
def count_covered(self) -> int:
368378
"""Count the number of samples that are covered by the oldest and newest valid samples.

src/frequenz/sdk/timeseries/_ringbuffer/buffer.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -678,31 +678,48 @@ def count_covered(self) -> int:
678678
// self._sampling_period.total_seconds()
679679
)
680680

681-
def count_valid(self) -> int:
682-
"""Count the number of valid items that this buffer currently holds.
681+
def count_valid(
682+
self, *, since: datetime | None = None, until: datetime | None = None
683+
) -> int:
684+
"""Count the number of valid items in this buffer.
685+
686+
If `since` and `until` are provided, the count is limited to the items between
687+
(and including) the given timestamps.
688+
689+
Args:
690+
since: The timestamp from which to start counting. If `None`, the oldest
691+
timestamp in the buffer is used.
692+
until: The timestamp until (and including) which to count. If `None`, the
693+
newest timestamp in the buffer is used.
683694
684695
Returns:
685696
The number of valid items in this buffer.
686697
"""
687-
if self._timestamp_newest == self._TIMESTAMP_MIN:
698+
if since is None or since < self._timestamp_oldest:
699+
since = self._timestamp_oldest
700+
if until is None or until > self._timestamp_newest:
701+
until = self._timestamp_newest
702+
703+
if until == self._TIMESTAMP_MIN or until < since:
688704
return 0
689705

690706
# Sum of all elements in the gap ranges
691707
sum_missing_entries = max(
692708
0,
693709
sum(
694710
(
695-
gap.end
711+
min(gap.end, until + self._sampling_period)
696712
# Don't look further back than oldest timestamp
697-
- max(gap.start, self._timestamp_oldest)
713+
- max(gap.start, since)
698714
)
699715
// self._sampling_period
700716
for gap in self._gaps
717+
if gap.start <= until and gap.end >= since
701718
),
702719
)
703720

704-
start_pos = self.to_internal_index(self._timestamp_oldest)
705-
end_pos = self.to_internal_index(self._timestamp_newest)
721+
start_pos = self.to_internal_index(since)
722+
end_pos = self.to_internal_index(until)
706723

707724
if end_pos < start_pos:
708725
return len(self._buffer) - start_pos + end_pos + 1 - sum_missing_entries

tests/timeseries/test_moving_window.py

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -212,19 +212,91 @@ async def test_access_empty_window() -> None:
212212

213213
async def test_window_size() -> None:
214214
"""Test the size of the window."""
215-
window, sender = init_moving_window(timedelta(seconds=5))
215+
window, sender = init_moving_window(timedelta(seconds=10))
216216
async with window:
217-
assert window.capacity == 5, "Wrong window capacity"
217+
assert window.capacity == 10, "Wrong window capacity"
218218
assert window.count_valid() == 0, "Window should be empty"
219219
assert window.count_covered() == 0, "Window should be empty"
220+
220221
await push_logical_meter_data(sender, range(0, 2))
221-
assert window.capacity == 5, "Wrong window capacity"
222+
assert window.capacity == 10, "Wrong window capacity"
222223
assert window.count_valid() == 2, "Window should be partially full"
223224
assert window.count_covered() == 2, "Window should be partially full"
224-
await push_logical_meter_data(sender, range(2, 20))
225-
assert window.capacity == 5, "Wrong window capacity"
226-
assert window.count_valid() == 5, "Window should be full"
227-
assert window.count_covered() == 5, "Window should be full"
225+
226+
newest_ts = window.newest_timestamp
227+
assert newest_ts is not None and newest_ts == UNIX_EPOCH + timedelta(seconds=1)
228+
229+
await push_logical_meter_data(sender, range(2, 5), start_ts=newest_ts)
230+
assert window.capacity == 10, "Wrong window capacity"
231+
assert window.count_valid() == 4, "Window should be partially full"
232+
assert window.count_covered() == 4, "Window should be partially full"
233+
234+
newest_ts = window.newest_timestamp
235+
assert newest_ts is not None and newest_ts == UNIX_EPOCH + timedelta(seconds=3)
236+
237+
await push_logical_meter_data(sender, range(5, 12), start_ts=newest_ts)
238+
assert window.capacity == 10, "Wrong window capacity"
239+
assert window.count_valid() == 10, "Window should be full"
240+
assert window.count_covered() == 10, "Window should be full"
241+
242+
assert window.count_valid(since=UNIX_EPOCH + timedelta(seconds=1)) == 9
243+
assert window.count_valid(until=UNIX_EPOCH + timedelta(seconds=2)) == 3
244+
assert (
245+
window.count_valid(
246+
since=UNIX_EPOCH + timedelta(seconds=1),
247+
until=UNIX_EPOCH + timedelta(seconds=1),
248+
)
249+
== 1
250+
)
251+
assert (
252+
window.count_valid(
253+
since=UNIX_EPOCH + timedelta(seconds=3),
254+
until=UNIX_EPOCH + timedelta(seconds=8),
255+
)
256+
== 6
257+
)
258+
assert (
259+
window.count_valid(
260+
since=UNIX_EPOCH + timedelta(seconds=8),
261+
until=UNIX_EPOCH + timedelta(seconds=3),
262+
)
263+
== 0
264+
)
265+
266+
newest_ts = window.newest_timestamp
267+
assert newest_ts is not None and newest_ts == UNIX_EPOCH + timedelta(seconds=9)
268+
assert window.oldest_timestamp == UNIX_EPOCH
269+
270+
await push_logical_meter_data(sender, range(5, 12), start_ts=newest_ts)
271+
assert window.capacity == 10, "Wrong window capacity"
272+
assert window.count_valid() == 10, "Window should be full"
273+
assert window.count_covered() == 10, "Window should be full"
274+
275+
newest_ts = window.newest_timestamp
276+
assert newest_ts is not None and newest_ts == UNIX_EPOCH + timedelta(seconds=15)
277+
assert window.oldest_timestamp == UNIX_EPOCH + timedelta(seconds=6)
278+
279+
assert (
280+
window.count_valid(
281+
since=UNIX_EPOCH + timedelta(seconds=1),
282+
until=UNIX_EPOCH + timedelta(seconds=5),
283+
)
284+
== 0
285+
)
286+
assert (
287+
window.count_valid(
288+
since=UNIX_EPOCH + timedelta(seconds=3),
289+
until=UNIX_EPOCH + timedelta(seconds=8),
290+
)
291+
== 3
292+
)
293+
assert (
294+
window.count_valid(
295+
since=UNIX_EPOCH + timedelta(seconds=6),
296+
until=UNIX_EPOCH + timedelta(seconds=20),
297+
)
298+
== 10
299+
)
228300

229301

230302
# pylint: disable=redefined-outer-name

0 commit comments

Comments
 (0)