|
8 | 8 | import sys |
9 | 9 | import threading |
10 | 10 | import time |
| 11 | +from apscheduler.schedulers.background import BackgroundScheduler |
| 12 | +from apscheduler.executors.pool import ThreadPoolExecutor |
| 13 | +from apscheduler.triggers.interval import IntervalTrigger |
11 | 14 | from argparse import ArgumentParser |
12 | 15 | from collections.abc import Callable |
13 | | -from concurrent.futures import ThreadPoolExecutor |
14 | 16 | from datetime import datetime, timedelta, timezone |
15 | 17 | from enum import Enum |
16 | 18 | from itertools import chain |
@@ -172,6 +174,12 @@ def _add_sfm_metric(metric: Metric, sfm_metrics: list[Metric] | None = None): |
172 | 174 | sfm_metrics.append(metric) |
173 | 175 |
|
174 | 176 |
|
| 177 | +class ExecutorType(str, Enum):  # Names of the APScheduler executors configured in Extension.__init__; str mixin so members can be passed directly as the `executor` argument of add_job() |
| 178 | +    CALLBACKS = 'callbacks'  # pool for user-scheduled callback jobs (CALLBACKS_THREAD_POOL_SIZE workers) |
| 179 | +    INTERNAL = 'internal'  # pool for internal housekeeping jobs: metrics, events, SFM, time diff (INTERNAL_THREAD_POOL_SIZE workers) |
| 180 | +    HEARTBEAT = 'heartbeat'  # dedicated pool so the heartbeat cannot be starved by other jobs (HEARTBEAT_THREAD_POOL_SIZE workers) |
| 181 | + |
| 182 | + |
175 | 183 | class Extension: |
176 | 184 | """Base class for Python extensions. |
177 | 185 |
|
@@ -240,21 +248,12 @@ def __init__(self, name: str = "") -> None: |
240 | 248 | self._running_callbacks: dict[int, WrappedCallback] = {} |
241 | 249 | self._running_callbacks_lock: Lock = Lock() |
242 | 250 |
|
243 | | - self._scheduler = sched.scheduler(time.time, time.sleep) |
244 | | - |
245 | | - # Timestamps for scheduling of internal callbacks |
246 | | - self._next_internal_callbacks_timestamps: dict[str, datetime] = { |
247 | | - "timediff": datetime.now() + TIME_DIFF_INTERVAL, |
248 | | - "heartbeat": datetime.now() + HEARTBEAT_INTERVAL, |
249 | | - "metrics": datetime.now() + METRIC_SENDING_INTERVAL, |
250 | | - "events": datetime.now() + METRIC_SENDING_INTERVAL, |
251 | | - "sfm_metrics": datetime.now() + SFM_METRIC_SENDING_INTERVAL, |
252 | | - } |
253 | | - |
254 | | - # Executors for the callbacks and internal methods |
255 | | - self._callbacks_executor = ThreadPoolExecutor(max_workers=CALLBACKS_THREAD_POOL_SIZE) |
256 | | - self._internal_executor = ThreadPoolExecutor(max_workers=INTERNAL_THREAD_POOL_SIZE) |
257 | | - self._heartbeat_executor = ThreadPoolExecutor(max_workers=HEARTBEAT_THREAD_POOL_SIZE) |
| 251 | + # Scheduler and executors for the callbacks and internal methods |
| 252 | + self._scheduler = BackgroundScheduler(executors={ |
| 253 | + ExecutorType.CALLBACKS: ThreadPoolExecutor(max_workers=CALLBACKS_THREAD_POOL_SIZE), |
| 254 | + ExecutorType.INTERNAL: ThreadPoolExecutor(max_workers=INTERNAL_THREAD_POOL_SIZE), |
| 255 | + ExecutorType.HEARTBEAT: ThreadPoolExecutor(max_workers=HEARTBEAT_THREAD_POOL_SIZE) |
| 256 | + }) |
258 | 257 |
|
259 | 258 | # Extension metrics |
260 | 259 | self._metrics_lock = RLock() |
@@ -376,7 +375,12 @@ def _schedule_callback(self, callback: WrappedCallback): |
376 | 375 | callback.cluster_time_diff = self._cluster_time_diff |
377 | 376 | callback.running_in_sim = self._running_in_sim |
378 | 377 | self._scheduled_callbacks.append(callback) |
379 | | - self._scheduler.enter(callback.initial_wait_time(), 1, self._callback_iteration, (callback,)) |
| 378 | + |
| 379 | + self._scheduler.add_job(self._run_callback, args=[callback], |
| 380 | + executor=ExecutorType.CALLBACKS, |
| 381 | + trigger=IntervalTrigger(seconds=callback.interval.total_seconds()), |
| 382 | + next_run_time=datetime.now() + timedelta(seconds=callback.initial_wait_time()) |
| 383 | + ) |
380 | 384 |
|
381 | 385 | def schedule( |
382 | 386 | self, |
@@ -809,7 +813,10 @@ def _parse_args(self): |
809 | 813 |
|
810 | 814 | if not self._is_fastcheck: |
811 | 815 | try: |
812 | | - self._heartbeat_iteration() |
| 816 | +            # TODO: confirm this heartbeat scheduling is safe. In the old sched-based loop a long-running job could delay the heartbeat; |
| 817 | +            # here the heartbeat runs on its own dedicated HEARTBEAT executor, so callback/internal jobs cannot block it. |
| 818 | +            # NOTE(review): the old sched code used priority 2 for the heartbeat (higher number = LOWER priority than the 1 used elsewhere). Also, without next_run_time=datetime.now() the first heartbeat now waits a full HEARTBEAT_INTERVAL instead of firing immediately as _heartbeat_iteration() did — consider adding it. |
| 819 | + self._scheduler.add_job(self._heartbeat, executor=ExecutorType.HEARTBEAT, trigger=IntervalTrigger(seconds=HEARTBEAT_INTERVAL.total_seconds())) |
813 | 820 | self.initialize() |
814 | 821 | if not self.is_helper: |
815 | 822 | self.schedule(self.query, timedelta(minutes=1)) |
@@ -863,48 +870,39 @@ def _run_callback(self, callback: WrappedCallback): |
863 | 870 | with self._running_callbacks_lock: |
864 | 871 | self._running_callbacks.pop(current_thread_id, None) |
865 | 872 |
|
866 | | - def _callback_iteration(self, callback: WrappedCallback): |
867 | | - self._callbacks_executor.submit(self._run_callback, callback) |
868 | | - callback.iterations += 1 |
869 | | - next_timestamp = callback.get_next_execution_timestamp() |
870 | | - self._scheduler.enterabs(next_timestamp, 1, self._callback_iteration, (callback,)) |
871 | 873 |
|
872 | 874 | def _start_extension_loop(self): |
873 | 875 | api_logger.debug(f"Starting main loop for monitoring configuration: '{self.monitoring_config_name}'") |
874 | 876 |
|
875 | 877 | # These were scheduled before the extension started, schedule them now |
876 | 878 | for callback in self._scheduled_callbacks_before_run: |
877 | 879 | self._schedule_callback(callback) |
878 | | - self._metrics_iteration() |
879 | | - self._events_iteration() |
880 | | - self._sfm_metrics_iteration() |
881 | | - self._timediff_iteration() |
882 | | - self._scheduler.run() |
883 | | - |
884 | | - def _timediff_iteration(self): |
885 | | - self._internal_executor.submit(self._update_cluster_time_diff) |
886 | | - next_timestamp = self._get_and_set_next_internal_callback_timestamp("timediff", TIME_DIFF_INTERVAL) |
887 | | - self._scheduler.enterabs(next_timestamp, 1, self._timediff_iteration) |
888 | | - |
889 | | - def _heartbeat_iteration(self): |
890 | | - self._heartbeat_executor.submit(self._heartbeat) |
891 | | - next_timestamp = self._get_and_set_next_internal_callback_timestamp("heartbeat", HEARTBEAT_INTERVAL) |
892 | | - self._scheduler.enterabs(next_timestamp, 2, self._heartbeat_iteration) |
893 | | - |
894 | | - def _metrics_iteration(self): |
895 | | - self._internal_executor.submit(self._send_metrics) |
896 | | - next_timestamp = self._get_and_set_next_internal_callback_timestamp("metrics", METRIC_SENDING_INTERVAL) |
897 | | - self._scheduler.enterabs(next_timestamp, 1, self._metrics_iteration) |
898 | | - |
899 | | - def _events_iteration(self): |
900 | | - self._internal_executor.submit(self._send_buffered_events) |
901 | | - next_timestamp = self._get_and_set_next_internal_callback_timestamp("events", METRIC_SENDING_INTERVAL) |
902 | | - self._scheduler.enterabs(next_timestamp, 1, self._events_iteration) |
903 | | - |
904 | | - def _sfm_metrics_iteration(self): |
905 | | - self._internal_executor.submit(self._send_sfm_metrics) |
906 | | - next_timestamp = self._get_and_set_next_internal_callback_timestamp("sfm_metrics", SFM_METRIC_SENDING_INTERVAL) |
907 | | - self._scheduler.enterabs(next_timestamp, 1, self._sfm_metrics_iteration) |
| 880 | + |
| 881 | + |
| 882 | + self._scheduler.add_job(self._send_metrics, executor=ExecutorType.INTERNAL, |
| 883 | + trigger=IntervalTrigger(seconds=METRIC_SENDING_INTERVAL.total_seconds()), |
| 884 | + next_run_time=datetime.now()) |
| 885 | + |
| 886 | + self._scheduler.add_job(self._send_buffered_events, executor=ExecutorType.INTERNAL, |
| 887 | + trigger=IntervalTrigger(seconds=METRIC_SENDING_INTERVAL.total_seconds()), |
| 888 | + next_run_time=datetime.now()) |
| 889 | + |
| 890 | + self._scheduler.add_job(self._send_sfm_metrics, executor=ExecutorType.INTERNAL, |
| 891 | + trigger=IntervalTrigger(seconds=SFM_METRIC_SENDING_INTERVAL.total_seconds()), |
| 892 | + next_run_time=datetime.now()) |
| 893 | + |
| 894 | + self._scheduler.add_job(self._update_cluster_time_diff, executor=ExecutorType.INTERNAL, |
| 895 | + trigger=IntervalTrigger(seconds=TIME_DIFF_INTERVAL.total_seconds()), |
| 896 | + next_run_time=datetime.now()) |
| 897 | + |
| 898 | + self._scheduler.start() |
| 899 | + |
| 900 | + try: |
| 901 | + while self._scheduler.running: |
| 902 | + time.sleep(1) |
| 903 | + except Exception: |
| 904 | + self._scheduler.shutdown() |
| 905 | + |
908 | 906 |
|
909 | 907 | def _send_metrics(self): |
910 | 908 | with self._metrics_lock: |
@@ -1105,8 +1103,8 @@ def _heartbeat(self): |
1105 | 1103 | api_logger.error(f"Heartbeat failed because {e}, response {response}", exc_info=True) |
1106 | 1104 |
|
1107 | 1105 | def __del__(self): |
1108 | | -        self._callbacks_executor.shutdown() |
1109 | | -        self._internal_executor.shutdown() |
| 1106 | +        if self._scheduler.running:  # guard: APScheduler raises SchedulerNotRunningError if shutdown() is called on a scheduler that was never started (e.g. fastcheck-only runs) |
| 1107 | +            self._scheduler.shutdown()  # stops all three executor pools, replacing the two explicit executor shutdowns above |
1110 | 1108 |
|
1111 | 1109 | def _add_metric(self, metric: Metric): |
1112 | 1110 | metric.validate() |
@@ -1150,7 +1148,7 @@ def _send_events_internal(self, events: dict | list[dict]): |
1150 | 1148 |
|
1151 | 1149 | def _send_events(self, events: dict | list[dict], send_immediately: bool = False): |
1152 | 1150 | if send_immediately: |
1153 | | - self._internal_executor.submit(self._send_events_internal, events) |
| 1151 | + self._scheduler.add_job(self._send_events_internal, args=[events], executor=ExecutorType.INTERNAL) |
1154 | 1152 | return |
1155 | 1153 | with self._logs_lock: |
1156 | 1154 | if isinstance(events, dict): |
@@ -1247,4 +1245,4 @@ def _send_sfm_logs(self, logs: dict | list[dict]): |
1247 | 1245 | log["dt.extension.config.label"] = self.monitoring_config_name |
1248 | 1246 | log.pop("monitoring.configuration", None) |
1249 | 1247 |
|
1250 | | - self._internal_executor.submit(self._send_sfm_logs_internal, logs) |
| 1248 | + self._scheduler.add_job(self._send_sfm_logs_internal, args=[logs], executor=ExecutorType.INTERNAL) |
0 commit comments