@@ -190,6 +190,8 @@ def __init__( # pylint: disable=too-many-arguments
190190 align_to = align_to ,
191191 )
192192
193+ self ._condition_new_sample = asyncio .Condition ()
194+
193195 def start (self ) -> None :
194196 """Start the MovingWindow.
195197
@@ -318,6 +320,44 @@ def window(
318320 start , end , force_copy = force_copy , fill_value = fill_value
319321 )
320322
323+ async def wait_for_samples (self , n : int ) -> None :
324+ """Wait until the next `n` samples are available in the MovingWindow.
325+
326+ This function returns after `n` new samples are available in the MovingWindow,
327+ without considering whether the new samples are valid. The validity of the
328+ samples can be verified by calling the
329+ [`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.
330+
331+ Args:
332+ n: The number of samples to wait for.
333+
334+ Raises:
335+ ValueError: If `n` is less than or equal to 0 or greater than the capacity
336+ of the MovingWindow.
337+ """
338+ if n == 0 :
339+ return
340+ if n < 0 :
341+ raise ValueError ("The number of samples to wait for must be 0 or greater." )
342+ if n > self .capacity :
343+ raise ValueError (
344+ "The number of samples to wait for must be less than or equal to the "
345+ + f"capacity of the MovingWindow ({ self .capacity } )."
346+ )
347+ start_timestamp = (
348+ # Start from the next expected timestamp.
349+ self .newest_timestamp + self .sampling_period
350+ if self .newest_timestamp is not None
351+ else None
352+ )
353+ while True :
354+ async with self ._condition_new_sample :
355+ # Every time a new sample is received, this condition gets notified and
356+ # will wake up.
357+ _ = await self ._condition_new_sample .wait ()
358+ if self .count_covered (since = start_timestamp ) >= n :
359+ return
360+
321361 async def _run_impl (self ) -> None :
322362 """Awaits samples from the receiver and updates the underlying ring buffer.
323363
@@ -331,6 +371,9 @@ async def _run_impl(self) -> None:
331371 await self ._resampler_sender .send (sample )
332372 else :
333373 self ._buffer .update (sample )
374+ async with self ._condition_new_sample :
375+ # Wake up all coroutines waiting for new samples.
376+ self ._condition_new_sample .notify_all ()
334377
335378 except asyncio .CancelledError :
336379 _logger .info ("MovingWindow task has been cancelled." )
@@ -343,8 +386,10 @@ def _configure_resampler(self) -> None:
343386 assert self ._resampler is not None
344387
345388 async def sink_buffer (sample : Sample [Quantity ]) -> None :
346- if sample .value is not None :
347- self ._buffer .update (sample )
389+ self ._buffer .update (sample )
390+ async with self ._condition_new_sample :
391+ # Wake up all coroutines waiting for new samples.
392+ self ._condition_new_sample .notify_all ()
348393
349394 resampler_channel = Broadcast [Sample [Quantity ]](name = "average" )
350395 self ._resampler_sender = resampler_channel .new_sender ()
@@ -355,23 +400,44 @@ async def sink_buffer(sample: Sample[Quantity]) -> None:
355400 asyncio .create_task (self ._resampler .resample (), name = "resample" )
356401 )
357402
358- def count_valid (self ) -> int :
359- """
360- Count the number of valid samples in this `MovingWindow`.
403+ def count_valid (
404+ self , * , since : datetime | None = None , until : datetime | None = None
405+ ) -> int :
406+ """Count the number of valid samples in this `MovingWindow`.
407+
408+ If `since` and `until` are provided, the count is limited to the samples between
409+ (and including) the given timestamps.
410+
411+ Args:
412+ since: The timestamp from which to start counting. If `None`, the oldest
413+ timestamp of the buffer is used.
414+ until: The timestamp until (and including) which to count. If `None`, the
415+ newest timestamp of the buffer is used.
361416
362417 Returns:
363418 The number of valid samples in this `MovingWindow`.
364419 """
365- return self ._buffer .count_valid ()
420+ return self ._buffer .count_valid (since = since , until = until )
366421
367- def count_covered (self ) -> int :
422+ def count_covered (
423+ self , * , since : datetime | None = None , until : datetime | None = None
424+ ) -> int :
368425 """Count the number of samples that are covered by the oldest and newest valid samples.
369426
427+ If `since` and `until` are provided, the count is limited to the samples between
428+ (and including) the given timestamps.
429+
430+ Args:
431+ since: The timestamp from which to start counting. If `None`, the oldest
432+ timestamp of the buffer is used.
433+ until: The timestamp until (and including) which to count. If `None`, the
434+ newest timestamp of the buffer is used.
435+
370436 Returns:
371437 The count of samples between the oldest and newest (inclusive) valid samples
372438 or 0 if there are is no time range covered.
373439 """
374- return self ._buffer .count_covered ()
440+ return self ._buffer .count_covered (since = since , until = until )
375441
376442 @overload
377443 def __getitem__ (self , key : SupportsIndex ) -> float :
0 commit comments