|
15 | 15 | from frequenz.sdk.timeseries import Sample |
16 | 16 | from frequenz.sdk.timeseries._quantities import Quantity |
17 | 17 | from frequenz.sdk.timeseries._ringbuffer import Gap, OrderedRingBuffer |
| 18 | +from frequenz.sdk.timeseries._ringbuffer.buffer import FloatArray |
18 | 19 |
|
19 | 20 | FIVE_MINUTES = timedelta(minutes=5) |
20 | 21 | ONE_MINUTE = timedelta(minutes=1) |
@@ -494,3 +495,30 @@ def test_delete_oudated_gap() -> None: |
494 | 495 | buffer.update(Sample(datetime.fromtimestamp(202, tz=timezone.utc), Quantity(2))) |
495 | 496 |
|
496 | 497 | assert len(buffer.gaps) == 0 |
| 498 | + |
| 499 | + |
| 500 | +def get_orb(data: FloatArray) -> OrderedRingBuffer[FloatArray]: |
| 501 | + """Get OrderedRingBuffer with data. |
| 502 | +
|
| 503 | + Args: |
| 504 | + data: Data to fill the buffer with. |
| 505 | +
|
| 506 | + Returns: |
| 507 | + OrderedRingBuffer with data. |
| 508 | + """ |
| 509 | + buffer = OrderedRingBuffer(data, ONE_SECOND) |
| 510 | + for i, d in enumerate(data): # pylint: disable=invalid-name |
| 511 | + buffer.update(Sample(dt(i), Quantity(d) if d is not None else None)) |
| 512 | + return buffer |
| 513 | + |
| 514 | + |
| 515 | +def test_window() -> None: |
| 516 | + """Test the window function.""" |
| 517 | + buffer = get_orb(np.array([0, None, 2, 3, 4])) |
| 518 | + win = buffer.window(dt(0), dt(3), force_copy=False) |
| 519 | + assert [0, np.nan, 2] == list(win) |
| 520 | + buffer._buffer[1] = 1 # pylint: disable=protected-access |
| 521 | + # Test whether the window is a view or a copy |
| 522 | + assert [0, 1, 2] == list(win) # NB: second element should be NaN according to docs |
| 523 | + win = buffer.window(dt(0), dt(3), force_copy=False) |
| 524 | + assert [0, 1, 2] == list(win) |
0 commit comments