Skip to content

Commit 5308a85

Browse files
committed
Resampler emits sample as regularly as possible using Timer.periodic
Signed-off-by: Jack <[email protected]>
1 parent 58ca166 commit 5308a85

File tree

1 file changed

+23
-32
lines changed

1 file changed

+23
-32
lines changed

src/frequenz/sdk/timeseries/_resampling.py

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
from datetime import datetime, timedelta, timezone
1616
from typing import AsyncIterator, Callable, Coroutine, Optional, Sequence
1717

18+
from frequenz.channels.util import Timer
19+
1820
from .._internal._asyncio import cancel_and_await
1921
from ._base_types import UNIX_EPOCH, Sample
2022
from ._quantities import Quantity, QuantityT
@@ -461,15 +463,33 @@ async def resample(self, *, one_shot: bool = False) -> None:
461463
timeseries from the resampler before calling this method
462464
again).
463465
"""
464-
while True:
465-
await self._wait_for_next_resampling_period()
466+
# We use a tolerance of 10% of the resampling period
467+
tolerance = timedelta(
468+
seconds=self._config.resampling_period.total_seconds() / 10.0
469+
)
470+
async for drift in Timer.periodic(
471+
timedelta(seconds=self._config.resampling_period.total_seconds())
472+
):
473+
now = datetime.now(tz=timezone.utc)
474+
475+
if drift > tolerance:
476+
_logger.warning(
477+
"The resampling task woke up too late. Resampling should have "
478+
"started at %s, but it started at %s (tolerance: %s, "
479+
"difference: %s; resampling period: %s)",
480+
self._window_end,
481+
now,
482+
tolerance,
483+
drift,
484+
self._config.resampling_period,
485+
)
466486

467487
results = await asyncio.gather(
468488
*[r.resample(self._window_end) for r in self._resamplers.values()],
469489
return_exceptions=True,
470490
)
471491

472-
self._window_end = self._window_end + self._config.resampling_period
492+
self._window_end += self._config.resampling_period
473493
exceptions = {
474494
source: results[i]
475495
for i, source in enumerate(self._resamplers)
@@ -482,35 +502,6 @@ async def resample(self, *, one_shot: bool = False) -> None:
482502
if one_shot:
483503
break
484504

485-
async def _wait_for_next_resampling_period(self) -> None:
486-
"""Wait for next resampling period.
487-
488-
If resampling period already started, then return without sleeping.
489-
That would allow us to catch up with resampling.
490-
Print warning if function woke up to late.
491-
"""
492-
now = datetime.now(tz=timezone.utc)
493-
if self._window_end > now:
494-
sleep_for = self._window_end - now
495-
await asyncio.sleep(sleep_for.total_seconds())
496-
497-
timer_error = now - self._window_end
498-
# We use a tolerance of 10% of the resampling period
499-
tolerance = timedelta(
500-
seconds=self._config.resampling_period.total_seconds() / 10.0
501-
)
502-
if timer_error > tolerance:
503-
_logger.warning(
504-
"The resampling task woke up too late. Resampling should have "
505-
"started at %s, but it started at %s (tolerance: %s, "
506-
"difference: %s; resampling period: %s)",
507-
self._window_end,
508-
now,
509-
tolerance,
510-
timer_error,
511-
self._config.resampling_period,
512-
)
513-
514505
def _calculate_window_end(self) -> datetime:
515506
"""Calculate the end of the current resampling window.
516507

0 commit comments

Comments
 (0)