Skip to content

Commit aa1a99a

Browse files
committed
Fix scheduler pacing and stop condition logic
Resolved incorrect pacing and stop condition handling in the Scheduler class.

- Replaced inaccurate duration checks with explicit `_start_time` tracking.
- Introduced `_time_remaining()` and `_available_sleep_window()` for precise timing.
- Refactored sleep logic into `_pace()` to account for task duration.
- Ensured scheduler stops exactly when configured `duration` is reached.
- Improved logging clarity and added test correction for edge timing behavior.

Signed-off-by: cyiallou - Costas <[email protected]>
1 parent 8a2a810 commit aa1a99a

File tree

3 files changed

+73
-38
lines changed

3 files changed

+73
-38
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@
1515
## Bug Fixes
1616

1717
- Updated the Solar Maintenance notebook to fix the expected environment variable name for the reporting server URL.
18+
- Fixed a bug in the notification `Scheduler` where tasks could overrun the configured duration due to imprecise sleep and stop logic. The scheduler now correctly tracks elapsed time, respects task execution duration, and stops reliably after the intended interval.

src/frequenz/lib/notebooks/notification_service.py

Lines changed: 63 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ def __init__(self, config: SchedulerConfig) -> None:
333333
self._task_name: str | None = None
334334
self._stop_event = threading.Event()
335335
self._thread: threading.Thread | None = None
336-
self._time_awoke: float = 0.0 # time when the scheduler awoke from sleep
336+
self._start_time: float | None = None
337337

338338
def start(self, task: Callable[..., None], **kwargs: Any) -> None:
339339
"""Start the scheduler for a given task.
@@ -363,13 +363,15 @@ def stop(self) -> None:
363363
"""Stop the scheduler."""
364364
if self._thread is not None:
365365
if self._thread.is_alive():
366-
_log.info("Stopping scheduler for %s", self._task_name)
366+
_log.info("Stopping scheduler for task '%s'", self._task_name)
367367
self._stop_event.set()
368368
if not self._stop_event.is_set():
369-
_log.error("Failed to stop scheduler for %s", self._task_name)
369+
_log.error(
370+
"Failed to stop scheduler for task '%s'", self._task_name
371+
)
370372
else:
371373
_log.warning(
372-
"Attempted to stop scheduler for %s, but no active thread was found.",
374+
"Attempted to stop scheduler for task '%s', but no active thread was found.",
373375
self._task_name,
374376
)
375377
_log.info("Scheduler successfully stopped")
@@ -380,42 +382,38 @@ def _run_task(self, kwargs: dict[str, Any]) -> None:
380382
Args:
381383
kwargs: Arguments to pass to the task.
382384
"""
383-
start_time = time.time()
385+
self._start_time = time.time()
384386
if self._config.send_immediately:
385-
self._execute_task(kwargs)
387+
elapsed = self._execute_task(kwargs)
388+
self._pace(elapsed)
386389
else:
387390
_log.info(
388391
"Waiting for first interval before sending the first notification."
389392
)
390-
self._stop_event.wait(self._config.interval)
391-
self._time_awoke = time.time()
392-
393-
while not self._stop_event.is_set():
394-
if self._should_stop(start_time):
395-
break
396-
self._execute_task(kwargs)
397-
398-
def _should_stop(self, start_time: float) -> bool:
399-
"""Determine if the scheduler should stop.
400-
401-
Args:
402-
start_time: The time the scheduler started.
403-
404-
Returns:
405-
True if the scheduler should stop, False otherwise.
406-
"""
407-
if (
393+
self._pace(0)
394+
while not self._should_stop():
395+
elapsed = self._execute_task(kwargs)
396+
self._pace(elapsed)
397+
_log.info("Scheduler stopping: stop condition met.")
398+
self.stop()
399+
400+
def _should_stop(self) -> bool:
401+
"""Return True if the scheduler should stop."""
402+
_log.debug("Checking if scheduler for task '%s' should stop.", self._task_name)
403+
return self._stop_event.is_set() or (
408404
self._config.duration is not None
409-
and (time.time() - self._time_awoke - start_time) >= self._config.duration
410-
):
411-
return True
412-
return False
405+
and self._start_time is not None
406+
and self._time_remaining() <= 0
407+
)
413408

414-
def _execute_task(self, kwargs: dict[str, Any]) -> None:
409+
def _execute_task(self, kwargs: dict[str, Any]) -> float:
415410
"""Execute the scheduled task and measure how long it takes.
416411
417412
Args:
418413
kwargs: Arguments to pass to the task.
414+
415+
Returns:
416+
The time taken to execute the task in seconds.
419417
"""
420418
task_start_time = time.time()
421419
try:
@@ -429,14 +427,44 @@ def _execute_task(self, kwargs: dict[str, Any]) -> None:
429427
)
430428
finally:
431429
task_elapsed = time.time() - task_start_time
432-
sleep_duration = max(0, self._config.interval - task_elapsed)
433-
_log.info(
434-
"Scheduled execution completed for %s. Sleeping for %d seconds.",
430+
_log.debug(
431+
"Execution of task '%s' completed in %.2f seconds.",
435432
self._task_name,
436-
sleep_duration,
433+
task_elapsed,
437434
)
438-
self._stop_event.wait(sleep_duration)
439-
self._time_awoke = time.time()
435+
return task_elapsed
436+
437+
def _time_remaining(self) -> float:
438+
"""Return the remaining time before the scheduler should stop.
439+
440+
Returns:
441+
A float indicating the number of seconds remaining until the
442+
configured duration is exceeded. If no duration is configured,
443+
returns float('inf') to represent an unbounded schedule.
444+
"""
445+
if self._config.duration is None or self._start_time is None:
446+
return float("inf")
447+
return max(0.0, self._config.duration - (time.time() - self._start_time))
448+
449+
def _available_sleep_window(self) -> float:
450+
"""Calculate the maximum allowed sleep time given the interval and remaining time."""
451+
return min(self._config.interval, self._time_remaining())
452+
453+
def _pace(self, elapsed_task_time: float) -> None:
454+
"""Sleep for interval minus task duration, bounded by duration limit.
455+
456+
Args:
457+
elapsed_task_time: Time taken by the task in seconds.
458+
"""
459+
sleep_duration = self._available_sleep_window()
460+
if sleep_duration < self._config.interval:
461+
actual_sleep = max(0, sleep_duration)
462+
else:
463+
actual_sleep = max(0, sleep_duration - elapsed_task_time)
464+
if self._stop_event.is_set():
465+
return
466+
_log.info("Sleeping for %.2f seconds before next task execution.", actual_sleep)
467+
self._stop_event.wait(actual_sleep)
440468

441469

442470
class BaseNotification:

tests/test_notification_service.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,20 @@ def test_scheduler_behavior(config: SchedulerConfig, mock_send: MagicMock) -> No
9797
scheduler.start(mock_send)
9898
if config.duration:
9999
time.sleep(config.duration + 0.5)
100+
correction = -1 # -1 to account for the time lost in executing the task
100101
else:
101102
time.sleep(test_time_if_no_duration)
103+
correction = 0
102104
scheduler.stop()
103105

104106
expected_calls = (
105-
(config.duration if config.duration else test_time_if_no_duration)
106-
// config.interval
107-
) + (1 if config.send_immediately else 0)
107+
(
108+
(config.duration if config.duration else test_time_if_no_duration)
109+
// config.interval
110+
)
111+
+ (1 if config.send_immediately else 0)
112+
+ correction
113+
)
108114
assert mock_send.call_count == expected_calls
109115

110116

0 commit comments

Comments
 (0)