Resync resampler on system time changes #802
Changes from all commits
ab73e1e
841e9b5
9c08b5e
b270af7
78e22b1
4cc5902
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -370,8 +370,10 @@ def __init__(self, config: ResamplerConfig) -> None: | |
| self._resamplers: dict[Source, _StreamingHelper] = {} | ||
| """A mapping between sources and the streaming helper handling that source.""" | ||
|
|
||
| window_end, start_delay_time = self._calculate_window_end() | ||
| self._window_end: datetime = window_end | ||
| self._timer: Timer = Timer.periodic(config.resampling_period) | ||
| """The timer used to trigger the resampling windows.""" | ||
|
|
||
| self._window_end = self._sync_timer() | ||
| """The time in which the current window ends. | ||
|
|
||
| This is used to make sure every resampling window is generated at | ||
|
|
@@ -384,16 +386,6 @@ def __init__(self, config: ResamplerConfig) -> None: | |
| the window end is deterministic. | ||
| """ | ||
|
|
||
| self._timer: Timer = Timer.periodic(config.resampling_period) | ||
| """The timer used to trigger the resampling windows.""" | ||
|
|
||
| # Hack to align the timer, this should be implemented in the Timer class | ||
| self._timer._next_tick_time = _to_microseconds( | ||
| timedelta(seconds=asyncio.get_running_loop().time()) | ||
| + config.resampling_period | ||
| + start_delay_time | ||
| ) # pylint: disable=protected-access | ||
|
|
||
| @property | ||
| def config(self) -> ResamplerConfig: | ||
| """Get the resampler configuration. | ||
|
|
@@ -457,6 +449,34 @@ def remove_timeseries(self, source: Source) -> bool: | |
| return False | ||
| return True | ||
|
|
||
| def _sync_timer(self, extra_period: bool = True) -> datetime: | ||
| """Resync the timer. | ||
|
|
||
| This method will resync the timer to the current time, so the next | ||
| resampling window will start at the next multiple of | ||
| `self._config.resampling_period` starting from now. | ||
|
|
||
| Args: | ||
| extra_period: Add an extra period when it is not aligned to make sure we | ||
| collected enough samples before the first resampling, otherwise the | ||
| initial window to collect samples could be too small. | ||
|
|
||
| Returns: | ||
| The end time of the resampling window. | ||
| """ | ||
| window_end, start_delay_time = self._calculate_window_end(extra_period) | ||
|
|
||
| # Hack to align the timer, this should be implemented in the Timer class | ||
| self._timer._next_tick_time = ( # pylint: disable=protected-access | ||
| _to_microseconds( | ||
| timedelta(seconds=asyncio.get_running_loop().time()) | ||
| + self.config.resampling_period | ||
| + start_delay_time | ||
| ) | ||
| ) | ||
llucax marked this conversation as resolved.
|
||
|
|
||
| return window_end | ||
|
|
||
| async def resample(self, *, one_shot: bool = False) -> None: | ||
| """Start resampling all known timeseries. | ||
|
|
||
|
|
@@ -483,6 +503,14 @@ async def resample(self, *, one_shot: bool = False) -> None: | |
| async for drift in self._timer: | ||
| now = datetime.now(tz=timezone.utc) | ||
|
|
||
| # If the system time changes, then `self._window_end` might drift too | ||
| # far away, such that the resampling bucket might be empty although | ||
| # new samples aligned to the new system time have been received. | ||
| # Thus we resync `self._window_end` to the new system time in case | ||
| # it drifted more than one resampling period away from the system time. | ||
| if abs(self._window_end - now) - drift > self._config.resampling_period: | ||
|
Sadly this is still not enough. Both drifts need to be handled independently; mixing them could end up hiding issues. For example:

I don't know, from this maybe it's enough to use

Don't we just want to always sync to the wall clock, so effectively make the resampler count periods in terms of the wall clock instead of the monotonic clock? Or maybe we just want to emit warnings if the wall clock is drifting?

Having a drifting wall clock is really an operational issue, so I'm not sure if we should try to fix it in the code, as it will probably bring all sorts of other issues too. And actually I'm not even sure we should warn about it in the SDK; we probably need some system-level monitoring for this instead. It is as if there were a disk failure and we tried to warn the user about it and cope with it by reallocating stuff. I think it is beyond the SDK's responsibilities.

It really looks like we are trying to patch a broken system here, and I'm not sure if it is a good idea...

OK, I quickly drafted a solution that I think makes everything much easier. I just created a new timer type that is attached to the wall clock especially for the

Here is the proof of concept PR, against this PR:

The resampler gets much simplified with this, as it doesn't need to care about any time management anymore.
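To make the "handled independently" point concrete, here is a minimal sketch of how the two drifts could be told apart. It is only an illustration under assumptions, not the code of this PR nor of the proof-of-concept PR: `DriftReport` and `classify_drift` are hypothetical names, and the wall-clock offset is assumed to be what remains of `now - window_end` once the timer's own lateness is subtracted.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class DriftReport:
    """Hypothetical result type: the two kinds of drift, reported separately."""

    timer_late: bool
    wall_clock_jumped: bool


def classify_drift(
    window_end: datetime,
    now: datetime,
    timer_drift: timedelta,
    period: timedelta,
    tolerance: timedelta,
) -> DriftReport:
    """Classify timer lateness and wall-clock jumps independently (sketch)."""
    # Lateness of the (monotonic) timer: the task simply woke up too late.
    timer_late = timer_drift > tolerance

    # Assumption: if both clocks agree, the timer fires at
    # `window_end + timer_drift`, so whatever is left over is attributed
    # to a change of the wall clock (forwards or backwards).
    wall_clock_offset = (now - window_end) - timer_drift
    wall_clock_jumped = abs(wall_clock_offset) > period

    return DriftReport(timer_late, wall_clock_jumped)


period = timedelta(seconds=1)
tolerance = timedelta(milliseconds=100)
window_end = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

# The task woke up 500 ms late, but the wall clock did not change.
late = classify_drift(
    window_end,
    window_end + timedelta(milliseconds=500),
    timedelta(milliseconds=500),
    period,
    tolerance,
)

# The timer was on time, but the wall clock jumped 5 minutes forward.
jumped = classify_drift(
    window_end,
    window_end + timedelta(minutes=5),
    timedelta(0),
    period,
    tolerance,
)
```

With the two flags separated, a late wake-up can be logged without triggering a resync, and a wall-clock jump can trigger a resync (or just a warning) without being masked by timer lateness.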
||
| self._window_end = self._sync_timer(extra_period=False) | ||
|
|
||
| if drift > tolerance: | ||
| _logger.warning( | ||
| "The resampling task woke up too late. Resampling should have " | ||
|
|
@@ -513,7 +541,9 @@ async def resample(self, *, one_shot: bool = False) -> None: | |
| if one_shot: | ||
| break | ||
|
|
||
| def _calculate_window_end(self) -> tuple[datetime, timedelta]: | ||
| def _calculate_window_end( | ||
| self, extra_period: bool = True | ||
| ) -> tuple[datetime, timedelta]: | ||
| """Calculate the end of the current resampling window. | ||
|
|
||
| The calculated resampling window end is a multiple of | ||
|
|
@@ -525,10 +555,14 @@ def _calculate_window_end(self) -> tuple[datetime, timedelta]: | |
| the end of the current resampling window will be more than one period away, to | ||
| make sure to have some time to collect samples if the misalignment is too big. | ||
|
|
||
| Args: | ||
| extra_period: Add an extra period when it is not aligned to make sure we | ||
| collected enough samples before the first resampling, otherwise the | ||
| initial window to collect samples could be too small. | ||
| Returns: | ||
| A tuple with the end of the current resampling window aligned to | ||
| `self._config.align_to` as the first item and the time we need to | ||
| delay the timer start to make sure it is also aligned. | ||
| delay the timer to make sure it is also aligned. | ||
| """ | ||
| now = datetime.now(timezone.utc) | ||
| period = self._config.resampling_period | ||
|
|
@@ -543,12 +577,10 @@ def _calculate_window_end(self) -> tuple[datetime, timedelta]: | |
| if not elapsed: | ||
| return (now + period, timedelta(0)) | ||
|
|
||
| extra_period_factor = 2 if extra_period else 1 | ||
| return ( | ||
| # We add an extra period when it is not aligned to make sure we collected | ||
| # enough samples before the first resampling, otherwise the initial window | ||
| # to collect samples could be too small. | ||
| now + period * 2 - elapsed, | ||
| period - elapsed if elapsed else timedelta(0), | ||
| now + period * extra_period_factor - elapsed, | ||
| period - elapsed, | ||
| ) | ||
|
|
||
|
|
||
|
|
||
Silly, but I would either remove this extra period, as it is only a startup glitch we are trying to fix (I guess it made sense in the original code because it was free), or switch the default, as the normal situation seems to be not adding an extra period.
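For reference, the effect of `extra_period` can be seen in a standalone sketch of the window-end calculation. This is a free function written for illustration, not the method from the PR: `calculate_window_end` takes `now`, `period` and `align_to` as explicit arguments, and the line computing `elapsed` is an assumption, since that part of the method is not shown in the diff.

```python
from datetime import datetime, timedelta, timezone


def calculate_window_end(
    now: datetime,
    period: timedelta,
    align_to: datetime,
    extra_period: bool = True,
) -> tuple[datetime, timedelta]:
    """Return the aligned window end and the delay needed to align the timer."""
    # Assumption: time elapsed since the last multiple of `period`,
    # counted from `align_to`. This line is not visible in the diff.
    elapsed = (now - align_to) % period

    if not elapsed:
        # Already aligned: the window ends exactly one period from now.
        return (now + period, timedelta(0))

    extra_period_factor = 2 if extra_period else 1
    return (
        now + period * extra_period_factor - elapsed,
        period - elapsed,
    )


align_to = datetime(1970, 1, 1, tzinfo=timezone.utc)
period = timedelta(seconds=10)
now = datetime(2024, 1, 1, 0, 0, 3, tzinfo=timezone.utc)  # 3 s past alignment

with_extra = calculate_window_end(now, period, align_to, extra_period=True)
without_extra = calculate_window_end(now, period, align_to, extra_period=False)
```

In this example both calls delay the timer by 7 seconds, but the first window ends at `00:00:20` with the extra period and at `00:00:10` without it, which is the difference the comment above is weighing.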