@@ -429,6 +429,30 @@ def remove_timeseries(self, source: Source) -> bool:
429429 return False
430430 return True
431431
432+ async def _wait_for_next_resampling_period (self ) -> None :
433+ """Wait for next resampling period.
434+
435+ If resampling period already started, then return without sleeping.
436+ That would allow us to catch up with resampling.
437+ Print warning if function woke up to late.
438+ """
439+ now = datetime .now (tz = timezone .utc )
440+ if self ._window_end > now :
441+ sleep_for = self ._window_end - now
442+ await asyncio .sleep (sleep_for .total_seconds ())
443+
444+ timer_error_s = (now - self ._window_end ).total_seconds ()
445+ if timer_error_s > (self ._config .resampling_period_s / 10.0 ):
446+ _logger .warning (
447+ "The resampling task woke up too late. Resampling should have started "
448+ "at %s, but it started at %s (%s seconds difference; resampling "
449+ "period is %s seconds)" ,
450+ self ._window_end ,
451+ now ,
452+ timer_error_s ,
453+ self ._config .resampling_period_s ,
454+ )
455+
432456 async def resample (self , * , one_shot : bool = False ) -> None :
433457 """Start resampling all known timeseries.
434458
@@ -447,12 +471,17 @@ async def resample(self, *, one_shot: bool = False) -> None:
447471 timeseries from the resampler before calling this method
448472 again).
449473 """
450- async for timer_timestamp in self ._timer :
474+ while True :
475+ await self ._wait_for_next_resampling_period ()
476+
451477 results = await asyncio .gather (
452478 * [r .resample (self ._window_end ) for r in self ._resamplers .values ()],
453479 return_exceptions = True ,
454480 )
455- self ._update_window_end (timer_timestamp )
481+
482+ self ._window_end = self ._window_end + timedelta (
483+ seconds = self ._config .resampling_period_s
484+ )
456485 exceptions = {
457486 source : results [i ]
458487 for i , source in enumerate (self ._resamplers )
@@ -465,27 +494,6 @@ async def resample(self, *, one_shot: bool = False) -> None:
465494 if one_shot :
466495 break
467496
468- def _update_window_end (self , timer_timestamp : datetime ) -> None :
469- # We use abs() here to account for errors in the timer where the timer
470- # fires before its time even when in theory it shouldn't be possible,
471- # but we want to be resilient to timer implementation changes that
472- # could end up in this case, either because there were some time jump
473- # or some rounding error.
474- timer_error_s = abs ((timer_timestamp - self ._window_end ).total_seconds ())
475- if timer_error_s > (self ._config .resampling_period_s / 10.0 ):
476- _logger .warning (
477- "The resampling timer fired too late. It should have fired at "
478- "%s, but it fired at %s (%s seconds difference; resampling "
479- "period is %s seconds)" ,
480- self ._window_end ,
481- timer_timestamp ,
482- timer_error_s ,
483- self ._config .resampling_period_s ,
484- )
485- self ._window_end = self ._window_end + timedelta (
486- seconds = self ._config .resampling_period_s
487- )
488-
489497
490498class _ResamplingHelper :
491499 """Keeps track of *relevant* samples to pass them to the resampling function.
0 commit comments