3 | 3 | # SPDX-License-Identifier: MIT
4 | 4 |
5 | 5 | import logging |
6 | | -import sched |
7 | 6 | import signal |
8 | 7 | import sys |
9 | 8 | import threading |
10 | 9 | import time |
11 | | -from apscheduler.schedulers.background import BackgroundScheduler |
12 | | -from apscheduler.executors.pool import ThreadPoolExecutor |
13 | | -from apscheduler.triggers.interval import IntervalTrigger |
14 | 10 | from argparse import ArgumentParser |
15 | 11 | from collections.abc import Callable |
16 | 12 | from datetime import datetime, timedelta, timezone |
20 | 16 | from threading import Lock, RLock, active_count |
21 | 17 | from typing import Any, ClassVar, NamedTuple |
22 | 18 |
| 19 | +from apscheduler.executors.pool import ThreadPoolExecutor # type: ignore |
| 20 | +from apscheduler.schedulers.background import BackgroundScheduler # type: ignore |
| 21 | +from apscheduler.triggers.interval import IntervalTrigger # type: ignore |
| 22 | + |
23 | 23 | from .activation import ActivationConfig, ActivationType |
24 | 24 | from .callback import WrappedCallback |
25 | 25 | from .communication import CommunicationClient, DebugClient, HttpClient |
@@ -175,9 +175,9 @@ def _add_sfm_metric(metric: Metric, sfm_metrics: list[Metric] | None = None): |
175 | 175 |
176 | 176 |
177 | 177 | class ExecutorType(str, Enum): |
178 | | - CALLBACKS = 'callbacks' |
179 | | - INTERNAL = 'internal' |
180 | | - HEARTBEAT = 'heartbeat' |
| 178 | + CALLBACKS = "callbacks" |
| 179 | + INTERNAL = "internal" |
| 180 | + HEARTBEAT = "heartbeat" |
181 | 181 |
182 | 182 |
183 | 183 | class Extension: |
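Side note on the hunk above: because `ExecutorType` mixes in `str`, its members compare equal to the plain strings they wrap, which is what lets the same constant both key the scheduler's executors mapping and be passed to `add_job(executor=...)` later. A minimal, self-contained illustration (not part of the diff):

```python
from enum import Enum


class ExecutorType(str, Enum):
    CALLBACKS = "callbacks"
    INTERNAL = "internal"
    HEARTBEAT = "heartbeat"


# str-mixin members behave like the strings they wrap ...
assert ExecutorType.CALLBACKS == "callbacks"
assert ExecutorType.HEARTBEAT.value == "heartbeat"
# ... so one constant can name an executor pool when the scheduler is built
# and again when a job is routed to that pool via add_job(executor=...).
```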
@@ -249,11 +249,13 @@ def __init__(self, name: str = "") -> None: |
249 | 249 | self._running_callbacks_lock: Lock = Lock() |
250 | 250 |
251 | 251 | # Scheduler and executors for the callbacks and internal methods |
252 | | - self._scheduler = BackgroundScheduler(executors={ |
253 | | - ExecutorType.CALLBACKS: ThreadPoolExecutor(max_workers=CALLBACKS_THREAD_POOL_SIZE), |
254 | | - ExecutorType.INTERNAL: ThreadPoolExecutor(max_workers=INTERNAL_THREAD_POOL_SIZE), |
255 | | - ExecutorType.HEARTBEAT: ThreadPoolExecutor(max_workers=HEARTBEAT_THREAD_POOL_SIZE) |
256 | | - }) |
| 252 | + self._scheduler = BackgroundScheduler( |
| 253 | + executors={ |
| 254 | + ExecutorType.CALLBACKS: ThreadPoolExecutor(max_workers=CALLBACKS_THREAD_POOL_SIZE), |
| 255 | + ExecutorType.INTERNAL: ThreadPoolExecutor(max_workers=INTERNAL_THREAD_POOL_SIZE), |
| 256 | + ExecutorType.HEARTBEAT: ThreadPoolExecutor(max_workers=HEARTBEAT_THREAD_POOL_SIZE), |
| 257 | + } |
| 258 | + ) |
257 | 259 |
258 | 260 | # Extension metrics |
259 | 261 | self._metrics_lock = RLock() |
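For context on the block above: APScheduler's `BackgroundScheduler` accepts a mapping of executor aliases to executor instances, and each `add_job` call can target one of those pools by alias. A self-contained sketch of the pattern, with illustrative pool sizes and a placeholder job rather than the extension's real callbacks:

```python
from datetime import datetime, timedelta

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

# One scheduler, several named thread pools; jobs are routed by alias.
scheduler = BackgroundScheduler(
    executors={
        "callbacks": ThreadPoolExecutor(max_workers=10),
        "internal": ThreadPoolExecutor(max_workers=2),
    }
)


def poll() -> None:
    print("polled at", datetime.now())


# Repeats every 60 seconds on the "callbacks" pool; next_run_time delays the
# first execution instead of letting it fire immediately after start().
scheduler.add_job(
    poll,
    executor="callbacks",
    trigger=IntervalTrigger(seconds=60),
    next_run_time=datetime.now() + timedelta(seconds=5),
)

scheduler.start()
```

The same `executor=` / `IntervalTrigger` / `next_run_time` combination appears again in `_schedule_callback` and in the internal jobs added in `_start_extension_loop` below.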
@@ -376,10 +378,12 @@ def _schedule_callback(self, callback: WrappedCallback): |
376 | 378 | callback.running_in_sim = self._running_in_sim |
377 | 379 | self._scheduled_callbacks.append(callback) |
378 | 380 |
379 | | - self._scheduler.add_job(self._run_callback, args=[callback], |
| 381 | + self._scheduler.add_job( |
| 382 | + self._run_callback, |
| 383 | + args=[callback], |
380 | 384 | executor=ExecutorType.CALLBACKS, |
381 | 385 | trigger=IntervalTrigger(seconds=callback.interval.total_seconds()), |
382 | | - next_run_time=datetime.now() + timedelta(seconds=callback.initial_wait_time()) |
| 386 | + next_run_time=datetime.now() + timedelta(seconds=callback.initial_wait_time()), |
383 | 387 | ) |
384 | 388 |
385 | 389 | def schedule( |
@@ -813,10 +817,15 @@ def _parse_args(self): |
813 | 817 |
814 | 818 | if not self._is_fastcheck: |
815 | 819 | try: |
816 | | - # TODO: is it surely okay to schedule hearbeat this way? Originally it was scheduled in the very same scheduler, which would starve heartbeat if any callback took too long |
| 820 | + # TODO: is it really okay to schedule the heartbeat this way? Originally it was scheduled in the very same scheduler,
| 821 | + # which would starve heartbeat if any callback took too long |
817 | 822 | # On the other hand, those callbacks inserted specific potentially risky jobs to different executors, so it should be okay? |
818 | 823 | # Why did heartbeat have a different priority (higher or lower?) |
819 | | - self._scheduler.add_job(self._heartbeat, executor=ExecutorType.HEARTBEAT, trigger=IntervalTrigger(seconds=HEARTBEAT_INTERVAL.total_seconds())) |
| 824 | + self._scheduler.add_job( |
| 825 | + self._heartbeat, |
| 826 | + executor=ExecutorType.HEARTBEAT, |
| 827 | + trigger=IntervalTrigger(seconds=HEARTBEAT_INTERVAL.total_seconds()), |
| 828 | + ) |
820 | 829 | self.initialize() |
821 | 830 | if not self.is_helper: |
822 | 831 | self.schedule(self.query, timedelta(minutes=1)) |
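On the TODO above: the isolation argument is that executor saturation is per-pool, so a slow callback can only hold up other jobs in the `CALLBACKS` pool, not the heartbeat. A rough standalone sketch that demonstrates the idea (pool sizes, intervals, and sleep durations are illustrative only):

```python
import time
from datetime import datetime

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

scheduler = BackgroundScheduler(
    executors={
        "callbacks": ThreadPoolExecutor(max_workers=1),
        "heartbeat": ThreadPoolExecutor(max_workers=1),
    }
)


def slow_callback() -> None:
    time.sleep(30)  # occupies the single "callbacks" worker for a long time


def heartbeat() -> None:
    print("heartbeat at", datetime.now())


scheduler.add_job(slow_callback, executor="callbacks", trigger=IntervalTrigger(seconds=5))
scheduler.add_job(heartbeat, executor="heartbeat", trigger=IntervalTrigger(seconds=5))

scheduler.start()
time.sleep(60)  # the heartbeat keeps firing while slow_callback blocks its own pool
scheduler.shutdown()
```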
@@ -870,40 +879,49 @@ def _run_callback(self, callback: WrappedCallback): |
870 | 879 | with self._running_callbacks_lock: |
871 | 880 | self._running_callbacks.pop(current_thread_id, None) |
872 | 881 |
873 | | - |
874 | 882 | def _start_extension_loop(self): |
875 | 883 | api_logger.debug(f"Starting main loop for monitoring configuration: '{self.monitoring_config_name}'") |
876 | 884 |
877 | 885 | # These were scheduled before the extension started, schedule them now |
878 | 886 | for callback in self._scheduled_callbacks_before_run: |
879 | 887 | self._schedule_callback(callback) |
880 | 888 |
881 | | - |
882 | | - self._scheduler.add_job(self._send_metrics, executor=ExecutorType.INTERNAL, |
| 889 | + self._scheduler.add_job( |
| 890 | + self._send_metrics, |
| 891 | + executor=ExecutorType.INTERNAL, |
883 | 892 | trigger=IntervalTrigger(seconds=METRIC_SENDING_INTERVAL.total_seconds()), |
884 | | - next_run_time=datetime.now()) |
| 893 | + next_run_time=datetime.now(), |
| 894 | + ) |
885 | 895 |
886 | | - self._scheduler.add_job(self._send_buffered_events, executor=ExecutorType.INTERNAL, |
| 896 | + self._scheduler.add_job( |
| 897 | + self._send_buffered_events, |
| 898 | + executor=ExecutorType.INTERNAL, |
887 | 899 | trigger=IntervalTrigger(seconds=METRIC_SENDING_INTERVAL.total_seconds()), |
888 | | - next_run_time=datetime.now()) |
| 900 | + next_run_time=datetime.now(), |
| 901 | + ) |
889 | 902 |
890 | | - self._scheduler.add_job(self._send_sfm_metrics, executor=ExecutorType.INTERNAL, |
| 903 | + self._scheduler.add_job( |
| 904 | + self._send_sfm_metrics, |
| 905 | + executor=ExecutorType.INTERNAL, |
891 | 906 | trigger=IntervalTrigger(seconds=SFM_METRIC_SENDING_INTERVAL.total_seconds()), |
892 | | - next_run_time=datetime.now()) |
| 907 | + next_run_time=datetime.now(), |
| 908 | + ) |
893 | 909 |
894 | | - self._scheduler.add_job(self._update_cluster_time_diff, executor=ExecutorType.INTERNAL, |
| 910 | + self._scheduler.add_job( |
| 911 | + self._update_cluster_time_diff, |
| 912 | + executor=ExecutorType.INTERNAL, |
895 | 913 | trigger=IntervalTrigger(seconds=TIME_DIFF_INTERVAL.total_seconds()), |
896 | | - next_run_time=datetime.now()) |
| 914 | + next_run_time=datetime.now(), |
| 915 | + ) |
897 | 916 |
898 | 917 | self._scheduler.start() |
899 | | - |
| 918 | + |
900 | 919 | try: |
901 | 920 | while self._scheduler.running: |
902 | 921 | time.sleep(1) |
903 | 922 | except Exception: |
904 | 923 | self._scheduler.shutdown() |
905 | 924 |
906 | | - |
907 | 925 | def _send_metrics(self): |
908 | 926 | with self._metrics_lock: |
909 | 927 | with self._internal_callbacks_results_lock: |
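About the keep-alive loop in this hunk: `BackgroundScheduler.start()` returns immediately and runs jobs on background threads, so the `while self._scheduler.running: time.sleep(1)` loop exists only to keep the main thread alive, shutting the scheduler down if the wait is interrupted. A minimal standalone version of the same pattern (note that `KeyboardInterrupt` is not a subclass of `Exception`, so this sketch catches it explicitly):

```python
import time
from datetime import datetime

from apscheduler.schedulers.background import BackgroundScheduler


def tick() -> None:
    print("tick at", datetime.now())


scheduler = BackgroundScheduler()
scheduler.add_job(tick, trigger="interval", seconds=5)
scheduler.start()  # returns immediately; the scheduler runs in a background thread

try:
    # Keep the main thread alive for as long as the scheduler is running.
    while scheduler.running:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    # Stop the scheduler cleanly when the process is interrupted.
    scheduler.shutdown()
```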
@@ -1167,11 +1185,6 @@ def _send_buffered_events(self): |
1167 | 1185 | def _send_dt_event(self, event: dict[str, str | int | dict[str, str]]): |
1168 | 1186 | self._client.send_dt_event(event) |
1169 | 1187 |
1170 | | - def _get_and_set_next_internal_callback_timestamp(self, callback_name: str, interval: timedelta): |
1171 | | - next_timestamp = self._next_internal_callbacks_timestamps[callback_name] |
1172 | | - self._next_internal_callbacks_timestamps[callback_name] += interval |
1173 | | - return next_timestamp.timestamp() |
1174 | | - |
1175 | 1188 | def get_version(self) -> str: |
1176 | 1189 | """Return the extension version.""" |
1177 | 1190 | return self.activation_config.version |