Skip to content

Commit fd365dd

Browse files
authored
Merge pull request #127 from cyiallou/fix/scheduler
Fix Scheduler and its instantiation inside EmailNotification
2 parents 8a2a810 + 7bab830 commit fd365dd

File tree

3 files changed

+84
-41
lines changed

3 files changed

+84
-41
lines changed

RELEASE_NOTES.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@
66

77
## Upgrading
88

9-
- Updated the Alerts Notebook. You can now process multiple microgrid TOML files.
9+
<!-- Here go notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
1010

1111
## New Features
1212

1313
<!-- Here go the main new features and examples or instructions on how to use them -->
1414

1515
## Bug Fixes
1616

17-
- Updated the Solar Maintenance notebook to fix the expected environment variable name for the reporting server url.
17+
- Fixed a bug in the notification `Scheduler` where tasks could overrun the configured duration due to imprecise sleep and stop logic. The scheduler now correctly tracks elapsed time, respects task execution duration, and stops reliably after the intended interval.
18+
- Fixed an issue where `EmailNotification` did not properly initialise its scheduler. Also fixed an example in the docstring.

src/frequenz/lib/notebooks/notification_service.py

Lines changed: 72 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
5050
# Create notification objects
5151
email_notification = EmailNotification(config=email_config)
52-
email_notification_2 = EmailConfig.from_dict(email_config_dict)
52+
email_notification_2 = EmailNotification(EmailConfig.from_dict(email_config_dict))
5353
5454
# Send one-off notification
5555
email_notification.send()
@@ -276,13 +276,15 @@ class EmailConfig(BaseNotificationConfig):
276276
)
277277

278278
smtp_user: str = field(
279+
repr=False,
279280
metadata={
280281
"description": "SMTP server username",
281282
"required": True,
282283
},
283284
)
284285

285286
smtp_password: str = field(
287+
repr=False,
286288
metadata={
287289
"description": "SMTP server password",
288290
"required": True,
@@ -333,7 +335,7 @@ def __init__(self, config: SchedulerConfig) -> None:
333335
self._task_name: str | None = None
334336
self._stop_event = threading.Event()
335337
self._thread: threading.Thread | None = None
336-
self._time_awoke: float = 0.0 # time when the scheduler awoke from sleep
338+
self._start_time: float | None = None
337339

338340
def start(self, task: Callable[..., None], **kwargs: Any) -> None:
339341
"""Start the scheduler for a given task.
@@ -363,13 +365,15 @@ def stop(self) -> None:
363365
"""Stop the scheduler."""
364366
if self._thread is not None:
365367
if self._thread.is_alive():
366-
_log.info("Stopping scheduler for %s", self._task_name)
368+
_log.info("Stopping scheduler for task '%s'", self._task_name)
367369
self._stop_event.set()
368370
if not self._stop_event.is_set():
369-
_log.error("Failed to stop scheduler for %s", self._task_name)
371+
_log.error(
372+
"Failed to stop scheduler for task '%s'", self._task_name
373+
)
370374
else:
371375
_log.warning(
372-
"Attempted to stop scheduler for %s, but no active thread was found.",
376+
"Attempted to stop scheduler for task '%s', but no active thread was found.",
373377
self._task_name,
374378
)
375379
_log.info("Scheduler successfully stopped")
@@ -380,42 +384,38 @@ def _run_task(self, kwargs: dict[str, Any]) -> None:
380384
Args:
381385
kwargs: Arguments to pass to the task.
382386
"""
383-
start_time = time.time()
387+
self._start_time = time.time()
384388
if self._config.send_immediately:
385-
self._execute_task(kwargs)
389+
elapsed = self._execute_task(kwargs)
390+
self._pace(elapsed)
386391
else:
387392
_log.info(
388393
"Waiting for first interval before sending the first notification."
389394
)
390-
self._stop_event.wait(self._config.interval)
391-
self._time_awoke = time.time()
392-
393-
while not self._stop_event.is_set():
394-
if self._should_stop(start_time):
395-
break
396-
self._execute_task(kwargs)
397-
398-
def _should_stop(self, start_time: float) -> bool:
399-
"""Determine if the scheduler should stop.
400-
401-
Args:
402-
start_time: The time the scheduler started.
403-
404-
Returns:
405-
True if the scheduler should stop, False otherwise.
406-
"""
407-
if (
395+
self._pace(0)
396+
while not self._should_stop():
397+
elapsed = self._execute_task(kwargs)
398+
self._pace(elapsed)
399+
_log.info("Scheduler stopping: stop condition met.")
400+
self.stop()
401+
402+
def _should_stop(self) -> bool:
403+
"""Return True if the scheduler should stop."""
404+
_log.debug("Checking if scheduler for task '%s' should stop.", self._task_name)
405+
return self._stop_event.is_set() or (
408406
self._config.duration is not None
409-
and (time.time() - self._time_awoke - start_time) >= self._config.duration
410-
):
411-
return True
412-
return False
407+
and self._start_time is not None
408+
and self._time_remaining() <= 0
409+
)
413410

414-
def _execute_task(self, kwargs: dict[str, Any]) -> None:
411+
def _execute_task(self, kwargs: dict[str, Any]) -> float:
415412
"""Execute the scheduled task and handle interval waiting.
416413
417414
Args:
418415
kwargs: Arguments to pass to the task.
416+
417+
Returns:
418+
The time taken to execute the task in seconds.
419419
"""
420420
task_start_time = time.time()
421421
try:
@@ -429,14 +429,44 @@ def _execute_task(self, kwargs: dict[str, Any]) -> None:
429429
)
430430
finally:
431431
task_elapsed = time.time() - task_start_time
432-
sleep_duration = max(0, self._config.interval - task_elapsed)
433-
_log.info(
434-
"Scheduled execution completed for %s. Sleeping for %d seconds.",
432+
_log.debug(
433+
"Execution of task '%s' completed in %.2f seconds.",
435434
self._task_name,
436-
sleep_duration,
435+
task_elapsed,
437436
)
438-
self._stop_event.wait(sleep_duration)
439-
self._time_awoke = time.time()
437+
return task_elapsed
438+
439+
def _time_remaining(self) -> float:
440+
"""Return the remaining time before the scheduler should stop.
441+
442+
Returns:
443+
A float indicating the number of seconds remaining until the
444+
configured duration is exceeded. If no duration is configured,
445+
returns float('inf') to represent an unbounded schedule.
446+
"""
447+
if self._config.duration is None or self._start_time is None:
448+
return float("inf")
449+
return max(0.0, self._config.duration - (time.time() - self._start_time))
450+
451+
def _available_sleep_window(self) -> float:
452+
"""Calculate the maximum allowed sleep time given the interval and remaining time."""
453+
return min(self._config.interval, self._time_remaining())
454+
455+
def _pace(self, elapsed_task_time: float) -> None:
456+
"""Sleep for interval minus task duration, bounded by duration limit.
457+
458+
Args:
459+
elapsed_task_time: Time taken by the task in seconds.
460+
"""
461+
sleep_duration = self._available_sleep_window()
462+
if sleep_duration < self._config.interval:
463+
actual_sleep = max(0, sleep_duration)
464+
else:
465+
actual_sleep = max(0, sleep_duration - elapsed_task_time)
466+
if self._stop_event.is_set():
467+
return
468+
_log.info("Sleeping for %.2f seconds before next task execution.", actual_sleep)
469+
self._stop_event.wait(actual_sleep)
440470

441471

442472
class BaseNotification:
@@ -528,6 +558,12 @@ def __init__(self, config: EmailConfig) -> None:
528558
"""
529559
super().__init__()
530560
self._config: EmailConfig = config
561+
if self._config.scheduler:
562+
_log.debug(
563+
"EmailNotification configured with scheduler: %s",
564+
self._config.scheduler,
565+
)
566+
self._scheduler = Scheduler(config=self._config.scheduler)
531567

532568
def send(self) -> None:
533569
"""Send the email notification."""

tests/test_notification_service.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,20 @@ def test_scheduler_behavior(config: SchedulerConfig, mock_send: MagicMock) -> No
9797
scheduler.start(mock_send)
9898
if config.duration:
9999
time.sleep(config.duration + 0.5)
100+
correction = -1 # -1 to account for the time lost in executing the task
100101
else:
101102
time.sleep(test_time_if_no_duration)
103+
correction = 0
102104
scheduler.stop()
103105

104106
expected_calls = (
105-
(config.duration if config.duration else test_time_if_no_duration)
106-
// config.interval
107-
) + (1 if config.send_immediately else 0)
107+
(
108+
(config.duration if config.duration else test_time_if_no_duration)
109+
// config.interval
110+
)
111+
+ (1 if config.send_immediately else 0)
112+
+ correction
113+
)
108114
assert mock_send.call_count == expected_calls
109115

110116

0 commit comments

Comments
 (0)