44"""Tests for the moving window."""
55
66import asyncio
7+ import re
78from collections .abc import Sequence
89from datetime import datetime , timedelta , timezone
910
@@ -29,6 +30,7 @@ async def push_logical_meter_data(
2930 sender : Sender [Sample [Quantity ]],
3031 test_seq : Sequence [float | None ],
3132 start_ts : datetime = UNIX_EPOCH ,
33+ fake_time : time_machine .Coordinates | None = None ,
3234) -> None :
3335 """Push data in the passed sender to mock `LogicalMeter` behaviour.
3436
@@ -38,23 +40,29 @@ async def push_logical_meter_data(
3840 sender: Sender for pushing resampled samples to the `MovingWindow`.
3941 test_seq: The Sequence that is pushed into the `MovingWindow`.
4042 start_ts: The start timestamp of the `MovingWindow`.
43+ fake_time: The fake time object to shift the time.
4144 """
4245 for i , j in zip (test_seq , range (0 , len (test_seq ))):
4346 timestamp = start_ts + timedelta (seconds = j )
4447 await sender .send (
4548 Sample (timestamp , Quantity (float (i )) if i is not None else None )
4649 )
50+ if fake_time is not None :
51+ await asyncio .sleep (1.0 )
52+ fake_time .shift (1 )
4753
4854 await asyncio .sleep (0.0 )
4955
5056
5157def init_moving_window (
5258 size : timedelta ,
59+ resampler_config : ResamplerConfig | None = None ,
5360) -> tuple [MovingWindow , Sender [Sample [Quantity ]]]:
5461 """Initialize the moving window with given shape.
5562
5663 Args:
5764 size: The size of the `MovingWindow`
65+ resampler_config: The resampler configuration.
5866
5967 Returns:
6068 tuple[MovingWindow, Sender[Sample]]: A pair of sender and `MovingWindow`.
@@ -65,6 +73,7 @@ def init_moving_window(
6573 size = size ,
6674 resampled_data_recv = lm_chan .new_receiver (),
6775 input_sampling_period = timedelta (seconds = 1 ),
76+ resampler_config = resampler_config ,
6877 )
6978 return window , lm_tx
7079
@@ -363,6 +372,151 @@ def assert_valid_and_covered_counts(
363372 )
364373
365374
375+ async def test_wait_for_samples () -> None :
376+ """Test waiting for samples in the window."""
377+ window , sender = init_moving_window (timedelta (seconds = 10 ))
378+ async with window :
379+ task = asyncio .create_task (window .wait_for_samples (5 ))
380+ await asyncio .sleep (0 )
381+ assert not task .done ()
382+ await push_logical_meter_data (sender , range (0 , 5 ))
383+ await asyncio .sleep (0 )
384+ # After pushing 5 values, the `wait_for_samples` task should be done.
385+ assert task .done ()
386+
387+ task = asyncio .create_task (window .wait_for_samples (5 ))
388+ await asyncio .sleep (0 )
389+ await push_logical_meter_data (
390+ sender , [1 , 2 , 3 , 4 ], start_ts = UNIX_EPOCH + timedelta (seconds = 5 )
391+ )
392+ await asyncio .sleep (0 )
393+ # The task should not be done yet, since we have only pushed 4 values.
394+ assert not task .done ()
395+
396+ await push_logical_meter_data (
397+ sender , [1 ], start_ts = UNIX_EPOCH + timedelta (seconds = 9 )
398+ )
399+ await asyncio .sleep (0 )
400+ # After pushing the last value, the task should be done.
401+ assert task .done ()
402+
403+ task = asyncio .create_task (window .wait_for_samples (- 1 ))
404+ with pytest .raises (
405+ ValueError ,
406+ match = re .escape (
407+ "The number of samples to wait for must be greater than 0."
408+ ),
409+ ):
410+ await task
411+
412+ task = asyncio .create_task (window .wait_for_samples (20 ))
413+ with pytest .raises (
414+ ValueError ,
415+ match = re .escape (
416+ "The number of samples to wait for must be less than or equal to the "
417+ + "capacity of the MovingWindow (10)."
418+ ),
419+ ):
420+ await task
421+
422+ task = asyncio .create_task (window .wait_for_samples (4 ))
423+ await asyncio .sleep (0 )
424+ await push_logical_meter_data (
425+ sender , range (0 , 10 ), start_ts = UNIX_EPOCH + timedelta (seconds = 10 )
426+ )
427+ await asyncio .sleep (0 )
428+ assert task .done ()
429+
430+ task = asyncio .create_task (window .wait_for_samples (10 ))
431+ await asyncio .sleep (0 )
432+ await push_logical_meter_data (
433+ sender , range (0 , 5 ), start_ts = UNIX_EPOCH + timedelta (seconds = 20 )
434+ )
435+ await asyncio .sleep (0 )
436+ assert not task .done ()
437+
438+ await push_logical_meter_data (
439+ sender , range (10 , 15 ), start_ts = UNIX_EPOCH + timedelta (seconds = 25 )
440+ )
441+ await asyncio .sleep (0 )
442+ assert task .done ()
443+
444+ task = asyncio .create_task (window .wait_for_samples (5 ))
445+ await asyncio .sleep (0 )
446+ await push_logical_meter_data (
447+ sender , [1 , 2 , None , 4 , None ], start_ts = UNIX_EPOCH + timedelta (seconds = 30 )
448+ )
449+ await asyncio .sleep (0 )
450+ # `None` values *are* counted towards the number of samples to wait for.
451+ assert task .done ()
452+
453+
454+ async def test_wait_for_samples_with_resampling (
455+ fake_time : time_machine .Coordinates ,
456+ ) -> None :
457+ """Test waiting for samples in a moving window with resampling."""
458+ window , sender = init_moving_window (
459+ timedelta (seconds = 20 ), ResamplerConfig (resampling_period = timedelta (seconds = 2 ))
460+ )
461+ async with window :
462+ task = asyncio .create_task (window .wait_for_samples (3 ))
463+ await asyncio .sleep (0 )
464+ assert not task .done ()
465+ await push_logical_meter_data (sender , range (0 , 7 ), fake_time = fake_time )
466+ assert task .done ()
467+
468+ task = asyncio .create_task (window .wait_for_samples (10 ))
469+ await push_logical_meter_data (
470+ sender ,
471+ range (0 , 11 ),
472+ fake_time = fake_time ,
473+ start_ts = UNIX_EPOCH + timedelta (seconds = 7 ),
474+ )
475+ assert window .count_covered () == 8
476+ assert not task .done ()
477+
478+ await push_logical_meter_data (
479+ sender ,
480+ range (0 , 5 ),
481+ fake_time = fake_time ,
482+ start_ts = UNIX_EPOCH + timedelta (seconds = 18 ),
483+ )
484+ assert window .count_covered () == 10
485+ assert not task .done ()
486+
487+ await push_logical_meter_data (
488+ sender ,
489+ range (0 , 6 ),
490+ fake_time = fake_time ,
491+ start_ts = UNIX_EPOCH + timedelta (seconds = 23 ),
492+ )
493+ assert window .count_covered () == 10
494+ assert window .count_valid () == 10
495+ assert task .done ()
496+
497+ task = asyncio .create_task (window .wait_for_samples (5 ))
498+ await push_logical_meter_data (
499+ sender ,
500+ [1 , 2 , None , None , None , None , None , None , None , None ],
501+ fake_time = fake_time ,
502+ start_ts = UNIX_EPOCH + timedelta (seconds = 29 ),
503+ )
504+ assert window .count_covered () == 10
505+ assert window .count_valid () == 8
506+ assert task .done ()
507+
508+ task = asyncio .create_task (window .wait_for_samples (5 ))
509+ await push_logical_meter_data (
510+ sender ,
511+ [None , 4 , None , None , None , None , None , None , None , 5 ],
512+ fake_time = fake_time ,
513+ start_ts = UNIX_EPOCH + timedelta (seconds = 39 ),
514+ )
515+ assert window .count_covered () == 10
516+ assert window .count_valid () == 7
517+ assert task .done ()
518+
519+
366520# pylint: disable=redefined-outer-name
367521async def test_resampling_window (fake_time : time_machine .Coordinates ) -> None :
368522 """Test resampling in MovingWindow."""
0 commit comments