Skip to content

Commit 9e8b167

Browse files
Add capacity, oldest and newest timestamp to moving window (#598)
The capacity of the moving window is exposed, which is the maximum number of values that the moving window can hold. Also the oldest and newest timestamps are exposed by the moving window.
2 parents 4c8e7ae + 1c4acf7 commit 9e8b167

File tree

5 files changed

+117
-8
lines changed

5 files changed

+117
-8
lines changed

RELEASE_NOTES.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222

2323
- A new class `Fuse` has been added to represent fuses. This class has a member variable `max_current` which represents the maximum current that can course through the fuse. If the current flowing through a fuse is greater than this limit, then the fuse will break the circuit.
2424

25-
- NaN values are treated as missing when gaps are determined in the `OrderedRingBuffer`.
25+
- `MovingWindow` and `OrderedRingBuffer`:
26+
- NaN values are treated as missing when gaps are determined in the `OrderedRingBuffer`.
27+
- Provide access to `capacity` (maximum number of elements) in `MovingWindow`.
28+
- Methods to retrieve oldest and newest timestamp of valid samples are added to both.
29+
2630

2731
- Now when printing `FormulaEngine` for debugging purposes the the formula will be shown in infix notation, which should be easier to read.
2832

src/frequenz/sdk/timeseries/_moving_window.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,39 @@ def sampling_period(self) -> timedelta:
208208
"""
209209
return self._sampling_period
210210

211+
@property
212+
def oldest_timestamp(self) -> datetime | None:
213+
"""
214+
Return the oldest timestamp of the MovingWindow.
215+
216+
Returns:
217+
The oldest timestamp of the MovingWindow or None if the buffer is empty.
218+
"""
219+
return self._buffer.oldest_timestamp
220+
221+
@property
222+
def newest_timestamp(self) -> datetime | None:
223+
"""
224+
Return the newest timestamp of the MovingWindow.
225+
226+
Returns:
227+
The newest timestamp of the MovingWindow or None if the buffer is empty.
228+
"""
229+
return self._buffer.newest_timestamp
230+
231+
@property
232+
def capacity(self) -> int:
233+
"""
234+
Return the capacity of the MovingWindow.
235+
236+
Capacity is the maximum number of samples that can be stored in the
237+
MovingWindow.
238+
239+
Returns:
240+
The capacity of the MovingWindow.
241+
"""
242+
return self._buffer.maxlen
243+
211244
async def _run_impl(self) -> None:
212245
"""Awaits samples from the receiver and updates the underlying ring buffer.
213246

src/frequenz/sdk/timeseries/_ringbuffer/buffer.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,39 @@ def time_bound_newest(self) -> datetime:
189189
Return the time bounds of the ring buffer.
190190
191191
Returns:
192-
The timestamp of the newest sample of the ring buffer.
192+
The timestamp of the newest sample of the ring buffer
193+
or None if the buffer is empty.
193194
"""
194195
return self._datetime_newest
195196

197+
@property
198+
def oldest_timestamp(self) -> datetime | None:
199+
"""Return the oldest timestamp in the buffer.
200+
201+
Returns:
202+
The oldest timestamp in the buffer
203+
or None if the buffer is empty.
204+
"""
205+
if len(self) == 0:
206+
return None
207+
208+
if self.is_missing(self.time_bound_oldest):
209+
return min(g.end for g in self.gaps)
210+
211+
return self.time_bound_oldest
212+
213+
@property
214+
def newest_timestamp(self) -> datetime | None:
215+
"""Return the newest timestamp in the buffer.
216+
217+
Returns:
218+
The newest timestamp in the buffer.
219+
"""
220+
if len(self) == 0:
221+
return None
222+
223+
return self.time_bound_newest
224+
196225
def datetime_to_index(
197226
self, timestamp: datetime, allow_outside_range: bool = False
198227
) -> int:

tests/timeseries/test_moving_window.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,19 @@ def event_loop() -> Iterator[async_solipsism.EventLoop]:
2929

3030

3131
async def push_logical_meter_data(
32-
sender: Sender[Sample[Quantity]], test_seq: Sequence[float]
32+
sender: Sender[Sample[Quantity]],
33+
test_seq: Sequence[float],
34+
start_ts: datetime = UNIX_EPOCH,
3335
) -> None:
3436
"""Push data in the passed sender to mock `LogicalMeter` behaviour.
3537
36-
Starting with the First of January 2023.
38+
Starting with UNIX_EPOCH.
3739
3840
Args:
3941
sender: Sender for pushing resampled samples to the `MovingWindow`.
4042
test_seq: The Sequence that is pushed into the `MovingWindow`.
43+
start_ts: The start timestamp of the `MovingWindow`.
4144
"""
42-
start_ts: datetime = UNIX_EPOCH
4345
for i, j in zip(test_seq, range(0, len(test_seq))):
4446
timestamp = start_ts + timedelta(seconds=j)
4547
await sender.send(Sample(timestamp, Quantity(float(i))))
@@ -118,8 +120,14 @@ async def test_window_size() -> None:
118120
"""Test the size of the window."""
119121
window, sender = init_moving_window(timedelta(seconds=5))
120122
async with window:
121-
await push_logical_meter_data(sender, range(0, 20))
122-
assert len(window) == 5
123+
assert window.capacity == 5, "Wrong window capacity"
124+
assert len(window) == 0, "Window should be empty"
125+
await push_logical_meter_data(sender, range(0, 2))
126+
assert window.capacity == 5, "Wrong window capacity"
127+
assert len(window) == 2, "Window should be partially full"
128+
await push_logical_meter_data(sender, range(2, 20))
129+
assert window.capacity == 5, "Wrong window capacity"
130+
assert len(window) == 5, "Window should be full"
123131

124132

125133
# pylint: disable=redefined-outer-name
@@ -139,6 +147,8 @@ async def test_resampling_window(fake_time: time_machine.Coordinates) -> None:
139147
input_sampling_period=input_sampling,
140148
resampler_config=resampler_config,
141149
) as window:
150+
assert window.capacity == window_size / output_sampling, "Wrong window capacity"
151+
assert len(window) == 0, "Window should be empty at the beginning"
142152
stream_values = [4.0, 8.0, 2.0, 6.0, 5.0] * 100
143153
for value in stream_values:
144154
timestamp = datetime.now(tz=timezone.utc)
@@ -150,3 +160,14 @@ async def test_resampling_window(fake_time: time_machine.Coordinates) -> None:
150160
assert len(window) == window_size / output_sampling
151161
for value in window: # type: ignore
152162
assert 4.9 < value < 5.1
163+
164+
165+
async def test_timestamps() -> None:
166+
"""Test indexing a window by timestamp."""
167+
window, sender = init_moving_window(timedelta(seconds=5))
168+
async with window:
169+
await push_logical_meter_data(
170+
sender, [1, 2], start_ts=UNIX_EPOCH + timedelta(seconds=1)
171+
)
172+
assert window.oldest_timestamp == UNIX_EPOCH + timedelta(seconds=1)
173+
assert window.newest_timestamp == UNIX_EPOCH + timedelta(seconds=2)

tests/timeseries/test_ringbuffer.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,51 +203,73 @@ def dt(i: int) -> datetime: # pylint: disable=invalid-name
203203
return datetime.fromtimestamp(i, tz=timezone.utc)
204204

205205

206-
def test_gaps() -> None:
206+
def test_gaps() -> None: # pylint: disable=too-many-statements
207207
"""Test gap treatment in ordered ring buffer."""
208208
buffer = OrderedRingBuffer([0.0] * 5, ONE_SECOND)
209+
assert buffer.oldest_timestamp is None
210+
assert buffer.newest_timestamp is None
209211
assert len(buffer) == 0
210212
assert len(buffer.gaps) == 0
211213

212214
buffer.update(Sample(dt(0), Quantity(0)))
215+
assert buffer.oldest_timestamp == dt(0)
216+
assert buffer.newest_timestamp == dt(0)
213217
assert len(buffer) == 1
214218
assert len(buffer.gaps) == 1
215219

216220
buffer.update(Sample(dt(6), Quantity(0)))
221+
assert buffer.oldest_timestamp == dt(6)
222+
assert buffer.newest_timestamp == dt(6)
217223
assert len(buffer) == 1
218224
assert len(buffer.gaps) == 1
219225

220226
buffer.update(Sample(dt(2), Quantity(2)))
221227
buffer.update(Sample(dt(3), Quantity(3)))
222228
buffer.update(Sample(dt(4), Quantity(4)))
229+
assert buffer.oldest_timestamp == dt(2)
230+
assert buffer.newest_timestamp == dt(6)
223231
assert len(buffer) == 4
224232
assert len(buffer.gaps) == 1
225233

226234
buffer.update(Sample(dt(3), None))
235+
assert buffer.oldest_timestamp == dt(2)
236+
assert buffer.newest_timestamp == dt(6)
227237
assert len(buffer) == 3
228238
assert len(buffer.gaps) == 2
229239

230240
buffer.update(Sample(dt(3), Quantity(np.nan)))
241+
assert buffer.oldest_timestamp == dt(2)
242+
assert buffer.newest_timestamp == dt(6)
231243
assert len(buffer) == 3
232244
assert len(buffer.gaps) == 2
233245

234246
buffer.update(Sample(dt(2), Quantity(np.nan)))
247+
assert buffer.oldest_timestamp == dt(4)
248+
assert buffer.newest_timestamp == dt(6)
235249
assert len(buffer) == 2
236250
assert len(buffer.gaps) == 2
237251

238252
buffer.update(Sample(dt(3), Quantity(3)))
253+
assert buffer.oldest_timestamp == dt(3)
254+
assert buffer.newest_timestamp == dt(6)
239255
assert len(buffer) == 3
240256
assert len(buffer.gaps) == 2
241257

242258
buffer.update(Sample(dt(2), Quantity(2)))
259+
assert buffer.oldest_timestamp == dt(2)
260+
assert buffer.newest_timestamp == dt(6)
243261
assert len(buffer) == 4
244262
assert len(buffer.gaps) == 1
245263

246264
buffer.update(Sample(dt(5), Quantity(5)))
265+
assert buffer.oldest_timestamp == dt(2)
266+
assert buffer.newest_timestamp == dt(6)
247267
assert len(buffer) == 5
248268
assert len(buffer.gaps) == 0
249269

250270
buffer.update(Sample(dt(99), None))
271+
assert buffer.oldest_timestamp == dt(95) # bug: should be None
272+
assert buffer.newest_timestamp == dt(99) # bug: should be None
251273
assert len(buffer) == 4 # bug: should be 0 (whole range gap)
252274
assert len(buffer.gaps) == 1
253275

0 commit comments

Comments
 (0)