From e47baf4d6c2c08bfe6e454c25ca8df852c5981b1 Mon Sep 17 00:00:00 2001 From: cwasicki <126617870+cwasicki@users.noreply.github.com> Date: Wed, 23 Aug 2023 15:35:36 +0200 Subject: [PATCH 1/4] Add capacity property to moving window The capacity is the maximum number of values that the moving window can hold. Signed-off-by: cwasicki <126617870+cwasicki@users.noreply.github.com> --- RELEASE_NOTES.md | 6 +++++- src/frequenz/sdk/timeseries/_moving_window.py | 13 +++++++++++++ tests/timeseries/test_moving_window.py | 12 ++++++++++-- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 80d65c4f1..d1cdcc09f 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -22,7 +22,11 @@ - A new class `Fuse` has been added to represent fuses. This class has a member variable `max_current` which represents the maximum current that can course through the fuse. If the current flowing through a fuse is greater than this limit, then the fuse will break the circuit. -- NaN values are treated as missing when gaps are determined in the `OrderedRingBuffer`. + +- `MovingWindow` and `OrderedRingBuffer`: + - NaN values are treated as missing when gaps are determined in the `OrderedRingBuffer`. + - Provide access to `capacity` (maximum number of elements) in `MovingWindow`. + ## Bug Fixes diff --git a/src/frequenz/sdk/timeseries/_moving_window.py b/src/frequenz/sdk/timeseries/_moving_window.py index 3c11e9edc..50536a11d 100644 --- a/src/frequenz/sdk/timeseries/_moving_window.py +++ b/src/frequenz/sdk/timeseries/_moving_window.py @@ -209,6 +209,19 @@ def sampling_period(self) -> timedelta: """ return self._sampling_period + @property + def capacity(self) -> int: + """ + Return the capacity of the MovingWindow. + + Capacity is the maximum number of samples that can be stored in the + MovingWindow. + + Returns: + The capacity of the MovingWindow. + """ + return self._buffer.maxlen + async def _run_impl(self) -> None: """Awaits samples from the receiver and updates the underlying ring buffer. diff --git a/tests/timeseries/test_moving_window.py b/tests/timeseries/test_moving_window.py index 4f37eea84..dac963fc9 100644 --- a/tests/timeseries/test_moving_window.py +++ b/tests/timeseries/test_moving_window.py @@ -125,8 +125,14 @@ async def test_window_size() -> None: """Test the size of the window.""" window, sender = init_moving_window(timedelta(seconds=5)) async with window: - await push_logical_meter_data(sender, range(0, 20)) - assert len(window) == 5 + assert window.capacity == 5, "Wrong window capacity" + assert len(window) == 0, "Window should be empty" + await push_logical_meter_data(sender, range(0, 2)) + assert window.capacity == 5, "Wrong window capacity" + assert len(window) == 2, "Window should be partially full" + await push_logical_meter_data(sender, range(2, 20)) + assert window.capacity == 5, "Wrong window capacity" + assert len(window) == 5, "Window should be full" # pylint: disable=redefined-outer-name @@ -146,6 +152,8 @@ async def test_resampling_window(fake_time: time_machine.Coordinates) -> None: input_sampling_period=input_sampling, resampler_config=resampler_config, ) as window: + assert window.capacity == window_size / output_sampling, "Wrong window capacity" + assert len(window) == 0, "Window should be empty at the beginning" stream_values = [4.0, 8.0, 2.0, 6.0, 5.0] * 100 for value in stream_values: timestamp = datetime.now(tz=timezone.utc) From f8862ddc057347828671f68162b118c18ec535a1 Mon Sep 17 00:00:00 2001 From: cwasicki <126617870+cwasicki@users.noreply.github.com> Date: Thu, 24 Aug 2023 10:27:35 +0200 Subject: [PATCH 2/4] Minor: Fix doc in test_moving_window Signed-off-by: cwasicki <126617870+cwasicki@users.noreply.github.com> --- tests/timeseries/test_moving_window.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/timeseries/test_moving_window.py b/tests/timeseries/test_moving_window.py index dac963fc9..1ac96179e 100644 --- a/tests/timeseries/test_moving_window.py +++ b/tests/timeseries/test_moving_window.py @@ -40,7 +40,7 @@ async def push_logical_meter_data( ) -> None: """Push data in the passed sender to mock `LogicalMeter` behaviour. - Starting with the First of January 2023. + Starting with UNIX_EPOCH. Args: sender: Sender for pushing resampled samples to the `MovingWindow`. From 945ec5e790801503f4931877d9f16a0e0746beda Mon Sep 17 00:00:00 2001 From: cwasicki <126617870+cwasicki@users.noreply.github.com> Date: Tue, 5 Sep 2023 22:03:19 +0200 Subject: [PATCH 3/4] Add oldest and newest timestamp to ring buffer Signed-off-by: cwasicki <126617870+cwasicki@users.noreply.github.com> --- .../sdk/timeseries/_ringbuffer/buffer.py | 31 ++++++++++++++++++- tests/timeseries/test_ringbuffer.py | 24 +++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py b/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py index 5e0c2721e..b0ee7b010 100644 --- a/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py +++ b/src/frequenz/sdk/timeseries/_ringbuffer/buffer.py @@ -190,10 +190,39 @@ def time_bound_newest(self) -> datetime: Return the time bounds of the ring buffer. Returns: - The timestamp of the newest sample of the ring buffer. + The timestamp of the newest sample of the ring buffer + or None if the buffer is empty. """ return self._datetime_newest + @property + def oldest_timestamp(self) -> datetime | None: + """Return the oldest timestamp in the buffer. + + Returns: + The oldest timestamp in the buffer + or None if the buffer is empty. + """ + if len(self) == 0: + return None + + if self.is_missing(self.time_bound_oldest): + return min(g.end for g in self.gaps) + + return self.time_bound_oldest + + @property + def newest_timestamp(self) -> datetime | None: + """Return the newest timestamp in the buffer. + + Returns: + The newest timestamp in the buffer. + """ + if len(self) == 0: + return None + + return self.time_bound_newest + def datetime_to_index( self, timestamp: datetime, allow_outside_range: bool = False ) -> int: diff --git a/tests/timeseries/test_ringbuffer.py b/tests/timeseries/test_ringbuffer.py index 11ddae4a3..24ebbc618 100644 --- a/tests/timeseries/test_ringbuffer.py +++ b/tests/timeseries/test_ringbuffer.py @@ -204,51 +204,73 @@ def dt(i: int) -> datetime: # pylint: disable=invalid-name return datetime.fromtimestamp(i, tz=timezone.utc) -def test_gaps() -> None: +def test_gaps() -> None: # pylint: disable=too-many-statements """Test gap treatment in ordered ring buffer.""" buffer = OrderedRingBuffer([0.0] * 5, ONE_SECOND) + assert buffer.oldest_timestamp is None + assert buffer.newest_timestamp is None assert len(buffer) == 0 assert len(buffer.gaps) == 0 buffer.update(Sample(dt(0), Quantity(0))) + assert buffer.oldest_timestamp == dt(0) + assert buffer.newest_timestamp == dt(0) assert len(buffer) == 1 assert len(buffer.gaps) == 1 buffer.update(Sample(dt(6), Quantity(0))) + assert buffer.oldest_timestamp == dt(6) + assert buffer.newest_timestamp == dt(6) assert len(buffer) == 1 assert len(buffer.gaps) == 1 buffer.update(Sample(dt(2), Quantity(2))) buffer.update(Sample(dt(3), Quantity(3))) buffer.update(Sample(dt(4), Quantity(4))) + assert buffer.oldest_timestamp == dt(2) + assert buffer.newest_timestamp == dt(6) assert len(buffer) == 4 assert len(buffer.gaps) == 1 buffer.update(Sample(dt(3), None)) + assert buffer.oldest_timestamp == dt(2) + assert buffer.newest_timestamp == dt(6) assert len(buffer) == 3 assert len(buffer.gaps) == 2 buffer.update(Sample(dt(3), Quantity(np.nan))) + assert buffer.oldest_timestamp == dt(2) + assert buffer.newest_timestamp == dt(6) assert len(buffer) == 3 assert len(buffer.gaps) == 2 buffer.update(Sample(dt(2), Quantity(np.nan))) + assert buffer.oldest_timestamp == dt(4) + assert buffer.newest_timestamp == dt(6) assert len(buffer) == 2 assert len(buffer.gaps) == 2 buffer.update(Sample(dt(3), Quantity(3))) + assert buffer.oldest_timestamp == dt(3) + assert buffer.newest_timestamp == dt(6) assert len(buffer) == 3 assert len(buffer.gaps) == 2 buffer.update(Sample(dt(2), Quantity(2))) + assert buffer.oldest_timestamp == dt(2) + assert buffer.newest_timestamp == dt(6) assert len(buffer) == 4 assert len(buffer.gaps) == 1 buffer.update(Sample(dt(5), Quantity(5))) + assert buffer.oldest_timestamp == dt(2) + assert buffer.newest_timestamp == dt(6) assert len(buffer) == 5 assert len(buffer.gaps) == 0 buffer.update(Sample(dt(99), None)) + assert buffer.oldest_timestamp == dt(95) # bug: should be None + assert buffer.newest_timestamp == dt(99) # bug: should be None assert len(buffer) == 4 # bug: should be 0 (whole range gap) assert len(buffer.gaps) == 1 From 1c4acf799d3b0c79a9832e0be2fbcf479d3e41ce Mon Sep 17 00:00:00 2001 From: cwasicki <126617870+cwasicki@users.noreply.github.com> Date: Wed, 6 Sep 2023 14:03:36 +0200 Subject: [PATCH 4/4] Expose oldest and newest timestamp in moving window Signed-off-by: cwasicki <126617870+cwasicki@users.noreply.github.com> --- RELEASE_NOTES.md | 2 +- src/frequenz/sdk/timeseries/_moving_window.py | 20 +++++++++++++++++++ tests/timeseries/test_moving_window.py | 17 ++++++++++++++-- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d1cdcc09f..ca827bc98 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -22,10 +22,10 @@ - A new class `Fuse` has been added to represent fuses. This class has a member variable `max_current` which represents the maximum current that can course through the fuse. If the current flowing through a fuse is greater than this limit, then the fuse will break the circuit. - - `MovingWindow` and `OrderedRingBuffer`: - NaN values are treated as missing when gaps are determined in the `OrderedRingBuffer`. - Provide access to `capacity` (maximum number of elements) in `MovingWindow`. + - Methods to retrieve oldest and newest timestamp of valid samples are added to both. ## Bug Fixes diff --git a/src/frequenz/sdk/timeseries/_moving_window.py b/src/frequenz/sdk/timeseries/_moving_window.py index 50536a11d..0e675d2fa 100644 --- a/src/frequenz/sdk/timeseries/_moving_window.py +++ b/src/frequenz/sdk/timeseries/_moving_window.py @@ -209,6 +209,26 @@ def sampling_period(self) -> timedelta: """ return self._sampling_period + @property + def oldest_timestamp(self) -> datetime | None: + """ + Return the oldest timestamp of the MovingWindow. + + Returns: + The oldest timestamp of the MovingWindow or None if the buffer is empty. + """ + return self._buffer.oldest_timestamp + + @property + def newest_timestamp(self) -> datetime | None: + """ + Return the newest timestamp of the MovingWindow. + + Returns: + The newest timestamp of the MovingWindow or None if the buffer is empty. + """ + return self._buffer.newest_timestamp + @property def capacity(self) -> int: """ diff --git a/tests/timeseries/test_moving_window.py b/tests/timeseries/test_moving_window.py index 1ac96179e..ff984e895 100644 --- a/tests/timeseries/test_moving_window.py +++ b/tests/timeseries/test_moving_window.py @@ -36,7 +36,9 @@ def fake_time() -> Iterator[time_machine.Coordinates]: async def push_logical_meter_data( - sender: Sender[Sample[Quantity]], test_seq: Sequence[float] + sender: Sender[Sample[Quantity]], + test_seq: Sequence[float], + start_ts: datetime = UNIX_EPOCH, ) -> None: """Push data in the passed sender to mock `LogicalMeter` behaviour. @@ -45,8 +47,8 @@ async def push_logical_meter_data( Args: sender: Sender for pushing resampled samples to the `MovingWindow`. test_seq: The Sequence that is pushed into the `MovingWindow`. + start_ts: The start timestamp of the `MovingWindow`. """ - start_ts: datetime = UNIX_EPOCH for i, j in zip(test_seq, range(0, len(test_seq))): timestamp = start_ts + timedelta(seconds=j) await sender.send(Sample(timestamp, Quantity(float(i)))) @@ -165,3 +167,14 @@ async def test_resampling_window(fake_time: time_machine.Coordinates) -> None: assert len(window) == window_size / output_sampling for value in window: # type: ignore assert 4.9 < value < 5.1 + + +async def test_timestamps() -> None: + """Test indexing a window by timestamp.""" + window, sender = init_moving_window(timedelta(seconds=5)) + async with window: + await push_logical_meter_data( + sender, [1, 2], start_ts=UNIX_EPOCH + timedelta(seconds=1) + ) + assert window.oldest_timestamp == UNIX_EPOCH + timedelta(seconds=1) + assert window.newest_timestamp == UNIX_EPOCH + timedelta(seconds=2)