1515from datetime import datetime , timedelta , timezone
1616from typing import AsyncIterator , Callable , Coroutine , Optional , Sequence
1717
18+ from frequenz .channels .util import Timer
19+ from frequenz .channels .util ._timer import _to_microseconds
20+
1821from .._internal ._asyncio import cancel_and_await
1922from ._base_types import UNIX_EPOCH , Sample
2023from ._quantities import Quantity , QuantityT
@@ -367,7 +370,8 @@ def __init__(self, config: ResamplerConfig) -> None:
367370 self ._resamplers : dict [Source , _StreamingHelper ] = {}
368371 """A mapping between sources and the streaming helper handling that source."""
369372
370- self ._window_end : datetime = self ._calculate_window_end ()
373+ window_end , start_delay_time = self ._calculate_window_end ()
374+ self ._window_end : datetime = window_end
371375 """The time in which the current window ends.
372376
373377 This is used to make sure every resampling window is generated at
@@ -380,6 +384,16 @@ def __init__(self, config: ResamplerConfig) -> None:
380384 the window end is deterministic.
381385 """
382386
387+ self ._timer : Timer = Timer .periodic (config .resampling_period )
388+ """The timer used to trigger the resampling windows."""
389+
390+ # Hack to align the timer, this should be implemented in the Timer class
391+ self ._timer ._next_tick_time = _to_microseconds (
392+ timedelta (seconds = asyncio .get_running_loop ().time ())
393+ + config .resampling_period
394+ + start_delay_time
395+ ) # pylint: disable=protected-access
396+
383397 @property
384398 def config (self ) -> ResamplerConfig :
385399 """Get the resampler configuration.
@@ -461,15 +475,32 @@ async def resample(self, *, one_shot: bool = False) -> None:
461475 timeseries from the resampler before calling this method
462476 again).
463477 """
464- while True :
465- await self ._wait_for_next_resampling_period ()
478+ # We use a tolerance of 10% of the resampling period
479+ tolerance = timedelta (
480+ seconds = self ._config .resampling_period .total_seconds () / 10.0
481+ )
482+
483+ async for drift in self ._timer :
484+ now = datetime .now (tz = timezone .utc )
485+
486+ if drift > tolerance :
487+ _logger .warning (
488+ "The resampling task woke up too late. Resampling should have "
489+ "started at %s, but it started at %s (tolerance: %s, "
490+ "difference: %s; resampling period: %s)" ,
491+ self ._window_end ,
492+ now ,
493+ tolerance ,
494+ drift ,
495+ self ._config .resampling_period ,
496+ )
466497
467498 results = await asyncio .gather (
468499 * [r .resample (self ._window_end ) for r in self ._resamplers .values ()],
469500 return_exceptions = True ,
470501 )
471502
472- self ._window_end = self . _window_end + self ._config .resampling_period
503+ self ._window_end += self ._config .resampling_period
473504 exceptions = {
474505 source : results [i ]
475506 for i , source in enumerate (self ._resamplers )
@@ -482,57 +513,43 @@ async def resample(self, *, one_shot: bool = False) -> None:
482513 if one_shot :
483514 break
484515
485- async def _wait_for_next_resampling_period (self ) -> None :
486- """Wait for next resampling period.
487-
488- If resampling period already started, then return without sleeping.
489- That would allow us to catch up with resampling.
490- Print warning if function woke up to late.
491- """
492- now = datetime .now (tz = timezone .utc )
493- if self ._window_end > now :
494- sleep_for = self ._window_end - now
495- await asyncio .sleep (sleep_for .total_seconds ())
496-
497- timer_error = now - self ._window_end
498- # We use a tolerance of 10% of the resampling period
499- tolerance = timedelta (
500- seconds = self ._config .resampling_period .total_seconds () / 10.0
501- )
502- if timer_error > tolerance :
503- _logger .warning (
504- "The resampling task woke up too late. Resampling should have "
505- "started at %s, but it started at %s (tolerance: %s, "
506- "difference: %s; resampling period: %s)" ,
507- self ._window_end ,
508- now ,
509- tolerance ,
510- timer_error ,
511- self ._config .resampling_period ,
512- )
513-
514- def _calculate_window_end (self ) -> datetime :
516+ def _calculate_window_end (self ) -> tuple [datetime , timedelta ]:
515517 """Calculate the end of the current resampling window.
516518
517519 The calculated resampling window end is a multiple of
518520 `self._config.resampling_period` starting at `self._config.align_to`.
519521
520522 if `self._config.align_to` is `None`, the current time is used.
521523
524+ If the current time is not aligned to `self._config.resampling_period`, then
525+ the end of the current resampling window will be more than one period away, to
526+ make sure to have some time to collect samples if the misalignment is too big.
527+
522528 Returns:
523- The end of the current resampling window aligned to
524- `self._config.align_to`.
529+ A tuple with the end of the current resampling window aligned to
530+ `self._config.align_to` as the first item and the time we need to
531+ delay the timer start to make sure it is also aligned.
525532 """
526533 now = datetime .now (timezone .utc )
527534 period = self ._config .resampling_period
528535 align_to = self ._config .align_to
529536
530537 if align_to is None :
531- return now + period
538+ return ( now + period , timedelta ( 0 ))
532539
533540 elapsed = (now - align_to ) % period
534541
535- return now + period - elapsed
542+ # If we are already in sync, we don't need to add an extra period
543+ if not elapsed :
544+ return (now + period , timedelta (0 ))
545+
546+ return (
547+ # We add an extra period when it is not aligned to make sure we collected
548+ # enough samples before the first resampling, otherwise the initial window
549+ # to collect samples could be too small.
550+ now + period * 2 - elapsed ,
551+ period - elapsed if elapsed else timedelta (0 ),
552+ )
536553
537554
538555class _ResamplingHelper :
0 commit comments