diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 2926cc45e..c48592503 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,7 @@ ## New Features - +- The `MovingWindow` now has an async `wait_for_samples` method that waits for a given number of samples to become available in the moving window and then returns. ## Bug Fixes diff --git a/src/frequenz/sdk/timeseries/_moving_window.py b/src/frequenz/sdk/timeseries/_moving_window.py index 4ed8e52ee..e5e31f0c7 100644 --- a/src/frequenz/sdk/timeseries/_moving_window.py +++ b/src/frequenz/sdk/timeseries/_moving_window.py @@ -190,6 +190,8 @@ def __init__( # pylint: disable=too-many-arguments align_to=align_to, ) + self._condition_new_sample = asyncio.Condition() + def start(self) -> None: """Start the MovingWindow. @@ -318,6 +320,44 @@ def window( start, end, force_copy=force_copy, fill_value=fill_value ) + async def wait_for_samples(self, n: int) -> None: + """Wait until the next `n` samples are available in the MovingWindow. + + This function returns after `n` new samples are available in the MovingWindow, + without considering whether the new samples are valid. The validity of the + samples can be verified by calling the + [`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method. + + Args: + n: The number of samples to wait for. If 0, returns immediately. + + Raises: + ValueError: If `n` is less than 0 or greater than the capacity + of the MovingWindow. + """ + if n == 0: + return + if n < 0: + raise ValueError("The number of samples to wait for must be 0 or greater.") + if n > self.capacity: + raise ValueError( + "The number of samples to wait for must be less than or equal to the " + + f"capacity of the MovingWindow ({self.capacity})." + ) + start_timestamp = ( + # Start from the next expected timestamp. 
+ self.newest_timestamp + self.sampling_period + if self.newest_timestamp is not None + else None + ) + while True: + async with self._condition_new_sample: + # Every time a new sample is received, this condition gets notified and + # will wake up. + _ = await self._condition_new_sample.wait() + if self.count_covered(since=start_timestamp) >= n: + return + async def _run_impl(self) -> None: """Awaits samples from the receiver and updates the underlying ring buffer. @@ -331,6 +371,9 @@ async def _run_impl(self) -> None: await self._resampler_sender.send(sample) else: self._buffer.update(sample) + async with self._condition_new_sample: + # Wake up all coroutines waiting for new samples. + self._condition_new_sample.notify_all() except asyncio.CancelledError: _logger.info("MovingWindow task has been cancelled.") @@ -343,8 +386,10 @@ def _configure_resampler(self) -> None: assert self._resampler is not None async def sink_buffer(sample: Sample[Quantity]) -> None: - if sample.value is not None: - self._buffer.update(sample) + self._buffer.update(sample) + async with self._condition_new_sample: + # Wake up all coroutines waiting for new samples. + self._condition_new_sample.notify_all() resampler_channel = Broadcast[Sample[Quantity]](name="average") self._resampler_sender = resampler_channel.new_sender() @@ -355,23 +400,44 @@ async def sink_buffer(sample: Sample[Quantity]) -> None: asyncio.create_task(self._resampler.resample(), name="resample") ) - def count_valid(self) -> int: - """ - Count the number of valid samples in this `MovingWindow`. + def count_valid( + self, *, since: datetime | None = None, until: datetime | None = None + ) -> int: + """Count the number of valid samples in this `MovingWindow`. + + If `since` and `until` are provided, the count is limited to the samples between + (and including) the given timestamps. + + Args: + since: The timestamp from which to start counting. If `None`, the oldest + timestamp of the buffer is used. 
+ until: The timestamp until (and including) which to count. If `None`, the + newest timestamp of the buffer is used. Returns: The number of valid samples in this `MovingWindow`. """ - return self._buffer.count_valid() + return self._buffer.count_valid(since=since, until=until) - def count_covered(self) -> int: + def count_covered( + self, *, since: datetime | None = None, until: datetime | None = None + ) -> int: """Count the number of samples that are covered by the oldest and newest valid samples. + If `since` and `until` are provided, the count is limited to the samples between + (and including) the given timestamps. + + Args: + since: The timestamp from which to start counting. If `None`, the oldest + timestamp of the buffer is used. + until: The timestamp until (and including) which to count. If `None`, the + newest timestamp of the buffer is used. + Returns: The count of samples between the oldest and newest (inclusive) valid samples or 0 if there are is no time range covered. """ - return self._buffer.count_covered() + return self._buffer.count_covered(since=since, until=until) @overload def __getitem__(self, key: SupportsIndex) -> float: diff --git a/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py b/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py index 6f81498fd..d4f9777b2 100644 --- a/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py +++ b/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py @@ -651,9 +651,20 @@ def __getitem__(self, index_or_slice: SupportsIndex | slice) -> float | FloatArr """ return self._buffer.__getitem__(index_or_slice) - def _covered_time_range(self) -> timedelta: + def _covered_time_range( + self, since: datetime | None = None, until: datetime | None = None + ) -> timedelta: """Return the time range that is covered by the oldest and newest valid samples. + If `since` and `until` are provided, the time range is limited to the items + between (and including) the given timestamps. 
+ + Args: + since: The timestamp from which to start counting. If `None`, the oldest + timestamp in the buffer is used. + until: The timestamp until (and including) which to count. If `None`, the + newest timestamp in the buffer is used. + Returns: The time range between the oldest and newest valid samples or 0 if there are is no time range covered. @@ -664,27 +675,63 @@ def _covered_time_range(self) -> timedelta: assert ( self.newest_timestamp is not None ), "Newest timestamp cannot be None here." - return self.newest_timestamp - self.oldest_timestamp + self._sampling_period - def count_covered(self) -> int: + if since is None or since < self.oldest_timestamp: + since = self.oldest_timestamp + if until is None or until > self.newest_timestamp: + until = self.newest_timestamp + + if until < since: + return timedelta(0) + + return until - since + self._sampling_period + + def count_covered( + self, *, since: datetime | None = None, until: datetime | None = None + ) -> int: """Count the number of samples that are covered by the oldest and newest valid samples. + If `since` and `until` are provided, the count is limited to the items between + (and including) the given timestamps. + + Args: + since: The timestamp from which to start counting. If `None`, the oldest + timestamp in the buffer is used. + until: The timestamp until (and including) which to count. If `None`, the + newest timestamp in the buffer is used. + Returns: The count of samples between the oldest and newest (inclusive) valid samples or 0 if there are is no time range covered. """ return int( - self._covered_time_range().total_seconds() + self._covered_time_range(since, until).total_seconds() // self._sampling_period.total_seconds() ) - def count_valid(self) -> int: - """Count the number of valid items that this buffer currently holds. + def count_valid( + self, *, since: datetime | None = None, until: datetime | None = None + ) -> int: + """Count the number of valid items in this buffer. 
+ + If `since` and `until` are provided, the count is limited to the items between + (and including) the given timestamps. + + Args: + since: The timestamp from which to start counting. If `None`, the oldest + timestamp in the buffer is used. + until: The timestamp until (and including) which to count. If `None`, the + newest timestamp in the buffer is used. Returns: The number of valid items in this buffer. """ - if self._timestamp_newest == self._TIMESTAMP_MIN: + if since is None or since < self._timestamp_oldest: + since = self._timestamp_oldest + if until is None or until > self._timestamp_newest: + until = self._timestamp_newest + + if until == self._TIMESTAMP_MIN or until < since: return 0 # Sum of all elements in the gap ranges @@ -692,17 +739,18 @@ def count_valid(self) -> int: 0, sum( ( - gap.end + min(gap.end, until + self._sampling_period) # Don't look further back than oldest timestamp - - max(gap.start, self._timestamp_oldest) + - max(gap.start, since) ) // self._sampling_period for gap in self._gaps + if gap.start <= until and gap.end >= since ), ) - start_pos = self.to_internal_index(self._timestamp_oldest) - end_pos = self.to_internal_index(self._timestamp_newest) + start_pos = self.to_internal_index(since) + end_pos = self.to_internal_index(until) if end_pos < start_pos: return len(self._buffer) - start_pos + end_pos + 1 - sum_missing_entries diff --git a/tests/timeseries/test_moving_window.py b/tests/timeseries/test_moving_window.py index 3f0e4cd56..a1ddaf578 100644 --- a/tests/timeseries/test_moving_window.py +++ b/tests/timeseries/test_moving_window.py @@ -4,6 +4,7 @@ """Tests for the moving window.""" import asyncio +import re from collections.abc import Sequence from datetime import datetime, timedelta, timezone @@ -27,8 +28,9 @@ def event_loop_policy() -> async_solipsism.EventLoopPolicy: async def push_logical_meter_data( sender: Sender[Sample[Quantity]], - test_seq: Sequence[float], + test_seq: Sequence[float | None], start_ts: datetime = 
UNIX_EPOCH, + fake_time: time_machine.Coordinates | None = None, ) -> None: """Push data in the passed sender to mock `LogicalMeter` behaviour. @@ -38,21 +40,29 @@ async def push_logical_meter_data( sender: Sender for pushing resampled samples to the `MovingWindow`. test_seq: The Sequence that is pushed into the `MovingWindow`. start_ts: The start timestamp of the `MovingWindow`. + fake_time: The fake time object to shift the time. """ for i, j in zip(test_seq, range(0, len(test_seq))): timestamp = start_ts + timedelta(seconds=j) - await sender.send(Sample(timestamp, Quantity(float(i)))) + await sender.send( + Sample(timestamp, Quantity(float(i)) if i is not None else None) + ) + if fake_time is not None: + await asyncio.sleep(1.0) + fake_time.shift(1) await asyncio.sleep(0.0) def init_moving_window( size: timedelta, + resampler_config: ResamplerConfig | None = None, ) -> tuple[MovingWindow, Sender[Sample[Quantity]]]: """Initialize the moving window with given shape. Args: size: The size of the `MovingWindow` + resampler_config: The resampler configuration. Returns: tuple[MovingWindow, Sender[Sample]]: A pair of sender and `MovingWindow`. 
@@ -63,6 +73,7 @@ def init_moving_window( size=size, resampled_data_recv=lm_chan.new_receiver(), input_sampling_period=timedelta(seconds=1), + resampler_config=resampler_config, ) return window, lm_tx @@ -210,21 +221,298 @@ async def test_access_empty_window() -> None: _ = window[42] -async def test_window_size() -> None: +async def test_window_size() -> None: # pylint: disable=too-many-statements """Test the size of the window.""" - window, sender = init_moving_window(timedelta(seconds=5)) + window, sender = init_moving_window(timedelta(seconds=10)) async with window: - assert window.capacity == 5, "Wrong window capacity" + + def assert_valid_and_covered_counts( + *, + since: datetime | None = None, + until: datetime | None = None, + expected: int | None = None, + expected_valid: int | None = None, + expected_covered: int | None = None, + ) -> None: + if expected is not None: + assert window.count_valid(since=since, until=until) == expected + assert window.count_covered(since=since, until=until) == expected + return + + assert window.count_valid(since=since, until=until) == expected_valid + assert window.count_covered(since=since, until=until) == expected_covered + + assert window.capacity == 10, "Wrong window capacity" assert window.count_valid() == 0, "Window should be empty" assert window.count_covered() == 0, "Window should be empty" + await push_logical_meter_data(sender, range(0, 2)) - assert window.capacity == 5, "Wrong window capacity" + assert window.capacity == 10, "Wrong window capacity" assert window.count_valid() == 2, "Window should be partially full" assert window.count_covered() == 2, "Window should be partially full" - await push_logical_meter_data(sender, range(2, 20)) - assert window.capacity == 5, "Wrong window capacity" - assert window.count_valid() == 5, "Window should be full" - assert window.count_covered() == 5, "Window should be full" + + newest_ts = window.newest_timestamp + assert newest_ts is not None and newest_ts == UNIX_EPOCH + 
timedelta(seconds=1) + + await push_logical_meter_data(sender, range(2, 5), start_ts=newest_ts) + assert window.capacity == 10, "Wrong window capacity" + assert window.count_valid() == 4, "Window should be partially full" + assert window.count_covered() == 4, "Window should be partially full" + + newest_ts = window.newest_timestamp + assert newest_ts is not None and newest_ts == UNIX_EPOCH + timedelta(seconds=3) + + await push_logical_meter_data(sender, range(5, 12), start_ts=newest_ts) + assert window.capacity == 10, "Wrong window capacity" + assert window.count_valid() == 10, "Window should be full" + assert window.count_covered() == 10, "Window should be full" + + assert_valid_and_covered_counts( + since=UNIX_EPOCH + timedelta(seconds=1), expected=9 + ) + assert_valid_and_covered_counts( + until=UNIX_EPOCH + timedelta(seconds=2), expected=3 + ) + assert_valid_and_covered_counts( + since=UNIX_EPOCH + timedelta(seconds=1), + until=UNIX_EPOCH + timedelta(seconds=1), + expected=1, + ) + assert_valid_and_covered_counts( + since=UNIX_EPOCH + timedelta(seconds=3), + until=UNIX_EPOCH + timedelta(seconds=8), + expected=6, + ) + assert_valid_and_covered_counts( + since=UNIX_EPOCH + timedelta(seconds=8), + until=UNIX_EPOCH + timedelta(seconds=3), + expected=0, + ) + + newest_ts = window.newest_timestamp + assert newest_ts is not None and newest_ts == UNIX_EPOCH + timedelta(seconds=9) + assert window.oldest_timestamp == UNIX_EPOCH + + await push_logical_meter_data(sender, range(5, 12), start_ts=newest_ts) + assert window.capacity == 10, "Wrong window capacity" + assert_valid_and_covered_counts(expected=10) + + newest_ts = window.newest_timestamp + assert newest_ts is not None and newest_ts == UNIX_EPOCH + timedelta(seconds=15) + assert window.oldest_timestamp == UNIX_EPOCH + timedelta(seconds=6) + + assert_valid_and_covered_counts( + since=UNIX_EPOCH + timedelta(seconds=1), + until=UNIX_EPOCH + timedelta(seconds=5), + expected=0, + ) + assert_valid_and_covered_counts( + 
since=UNIX_EPOCH + timedelta(seconds=3), + until=UNIX_EPOCH + timedelta(seconds=8), + expected=3, + ) + assert_valid_and_covered_counts( + since=UNIX_EPOCH + timedelta(seconds=6), + until=UNIX_EPOCH + timedelta(seconds=20), + expected=10, + ) + + newest_ts = window.newest_timestamp + assert newest_ts is not None and newest_ts == UNIX_EPOCH + timedelta(seconds=15) + assert window.oldest_timestamp == UNIX_EPOCH + timedelta(seconds=6) + + await push_logical_meter_data( + sender, [3, 4, None, None, 10, 12, None], start_ts=newest_ts + ) + + # After the last insertion, the moving window would look like this: + # + # +------------------------+----+----+-----+----+----+-----+-----+-----+-----+-----+ + # | MovingWindow timestamp | | | | | | | | | | | + # | (seconds after EPOCH) | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 | 21 | + # |------------------------+----+----+-----+----+----+-----+-----+-----+-----+-----| + # | value in buffer | 8. | 9. | 10. | 3. | 4. | nan | nan | 10. | 12. | nan | + # +------------------------+----+----+-----+----+----+-----+-----+-----+-----+-----+ + + newest_ts = window.newest_timestamp + assert newest_ts is not None and newest_ts == UNIX_EPOCH + timedelta(seconds=21) + assert window.oldest_timestamp == UNIX_EPOCH + timedelta(seconds=12) + + assert_valid_and_covered_counts( + expected_valid=7, + expected_covered=10, + ) + assert_valid_and_covered_counts( + since=UNIX_EPOCH + timedelta(seconds=15), + expected_valid=4, + expected_covered=7, + ) + assert_valid_and_covered_counts( + until=UNIX_EPOCH + timedelta(seconds=19), + expected_valid=6, + expected_covered=8, + ) + assert_valid_and_covered_counts( + since=UNIX_EPOCH + timedelta(seconds=12), + until=UNIX_EPOCH + timedelta(seconds=15), + expected_valid=4, + expected_covered=4, + ) + assert_valid_and_covered_counts( + since=UNIX_EPOCH + timedelta(seconds=17), + until=UNIX_EPOCH + timedelta(seconds=18), + expected_valid=0, + expected_covered=2, + ) + assert_valid_and_covered_counts( + 
since=UNIX_EPOCH + timedelta(seconds=16), + until=UNIX_EPOCH + timedelta(seconds=20), + expected_valid=3, + expected_covered=5, + ) + + +async def test_wait_for_samples() -> None: + """Test waiting for samples in the window.""" + window, sender = init_moving_window(timedelta(seconds=10)) + async with window: + task = asyncio.create_task(window.wait_for_samples(5)) + await asyncio.sleep(0) + assert not task.done() + await push_logical_meter_data(sender, range(0, 5)) + await asyncio.sleep(0) + # After pushing 5 values, the `wait_for_samples` task should be done. + assert task.done() + + task = asyncio.create_task(window.wait_for_samples(5)) + await asyncio.sleep(0) + await push_logical_meter_data( + sender, [1, 2, 3, 4], start_ts=UNIX_EPOCH + timedelta(seconds=5) + ) + await asyncio.sleep(0) + # The task should not be done yet, since we have only pushed 4 values. + assert not task.done() + + await push_logical_meter_data( + sender, [1], start_ts=UNIX_EPOCH + timedelta(seconds=9) + ) + await asyncio.sleep(0) + # After pushing the last value, the task should be done. + assert task.done() + + task = asyncio.create_task(window.wait_for_samples(-1)) + with pytest.raises( + ValueError, + match=re.escape("The number of samples to wait for must be 0 or greater."), + ): + await task + + task = asyncio.create_task(window.wait_for_samples(20)) + with pytest.raises( + ValueError, + match=re.escape( + "The number of samples to wait for must be less than or equal to the " + + "capacity of the MovingWindow (10)." 
+ ), + ): + await task + + task = asyncio.create_task(window.wait_for_samples(4)) + await asyncio.sleep(0) + await push_logical_meter_data( + sender, range(0, 10), start_ts=UNIX_EPOCH + timedelta(seconds=10) + ) + await asyncio.sleep(0) + assert task.done() + + task = asyncio.create_task(window.wait_for_samples(10)) + await asyncio.sleep(0) + await push_logical_meter_data( + sender, range(0, 5), start_ts=UNIX_EPOCH + timedelta(seconds=20) + ) + await asyncio.sleep(0) + assert not task.done() + + await push_logical_meter_data( + sender, range(10, 15), start_ts=UNIX_EPOCH + timedelta(seconds=25) + ) + await asyncio.sleep(0) + assert task.done() + + task = asyncio.create_task(window.wait_for_samples(5)) + await asyncio.sleep(0) + await push_logical_meter_data( + sender, [1, 2, None, 4, None], start_ts=UNIX_EPOCH + timedelta(seconds=30) + ) + await asyncio.sleep(0) + # `None` values *are* counted towards the number of samples to wait for. + assert task.done() + + +async def test_wait_for_samples_with_resampling( + fake_time: time_machine.Coordinates, +) -> None: + """Test waiting for samples in a moving window with resampling.""" + window, sender = init_moving_window( + timedelta(seconds=20), ResamplerConfig(resampling_period=timedelta(seconds=2)) + ) + async with window: + task = asyncio.create_task(window.wait_for_samples(3)) + await asyncio.sleep(0) + assert not task.done() + await push_logical_meter_data(sender, range(0, 7), fake_time=fake_time) + assert task.done() + + task = asyncio.create_task(window.wait_for_samples(10)) + await push_logical_meter_data( + sender, + range(0, 11), + fake_time=fake_time, + start_ts=UNIX_EPOCH + timedelta(seconds=7), + ) + assert window.count_covered() == 8 + assert not task.done() + + await push_logical_meter_data( + sender, + range(0, 5), + fake_time=fake_time, + start_ts=UNIX_EPOCH + timedelta(seconds=18), + ) + assert window.count_covered() == 10 + assert not task.done() + + await push_logical_meter_data( + sender, + range(0, 
6), + fake_time=fake_time, + start_ts=UNIX_EPOCH + timedelta(seconds=23), + ) + assert window.count_covered() == 10 + assert window.count_valid() == 10 + assert task.done() + + task = asyncio.create_task(window.wait_for_samples(5)) + await push_logical_meter_data( + sender, + [1, 2, None, None, None, None, None, None, None, None], + fake_time=fake_time, + start_ts=UNIX_EPOCH + timedelta(seconds=29), + ) + assert window.count_covered() == 10 + assert window.count_valid() == 8 + assert task.done() + + task = asyncio.create_task(window.wait_for_samples(5)) + await push_logical_meter_data( + sender, + [None, 4, None, None, None, None, None, None, None, 5], + fake_time=fake_time, + start_ts=UNIX_EPOCH + timedelta(seconds=39), + ) + assert window.count_covered() == 10 + assert window.count_valid() == 7 + assert task.done() # pylint: disable=redefined-outer-name