diff --git a/dynatrace_extension/__about__.py b/dynatrace_extension/__about__.py index b04502b..31f13df 100644 --- a/dynatrace_extension/__about__.py +++ b/dynatrace_extension/__about__.py @@ -3,4 +3,4 @@ # SPDX-License-Identifier: MIT -__version__ = "1.7.2" +__version__ = "1.7.3" diff --git a/dynatrace_extension/sdk/callback.py b/dynatrace_extension/sdk/callback.py index 14d0bad..610f423 100644 --- a/dynatrace_extension/sdk/callback.py +++ b/dynatrace_extension/sdk/callback.py @@ -33,7 +33,7 @@ def __init__( self.interval: timedelta = interval self.logger = logger self.running: bool = False - self.status = Status(StatusValue.OK) + self.status = IgnoreStatus() self.executions_total = 0 # global counter self.executions_per_interval = 0 # counter per interval = 1 min by default self.duration = 0 # global counter @@ -45,7 +45,6 @@ def __init__( self.ok_count = 0 # counter per interval = 1 min by default self.timeouts_count = 0 # counter per interval = 1 min by default self.exception_count = 0 # counter per interval = 1 min by default - self.iterations = 0 # how many times we ran the callback iterator for this callback def get_current_time_with_cluster_diff(self): return datetime.now() + timedelta(milliseconds=self.cluster_time_diff) @@ -142,13 +141,3 @@ def clear_sfm_metrics(self): self.duration_interval_total = 0 self.exception_count = 0 self.executions_per_interval = 0 - - def get_next_execution_timestamp(self) -> float: - """ - Get the timestamp for the next execution of the callback - This is done using execution total, the interval and the start timestamp - :return: datetime - """ - return ( - self.start_timestamp + timedelta(seconds=self.interval.total_seconds() * (self.iterations or 1)) - ).timestamp() diff --git a/dynatrace_extension/sdk/extension.py b/dynatrace_extension/sdk/extension.py index 36fbe86..1de185f 100644 --- a/dynatrace_extension/sdk/extension.py +++ b/dynatrace_extension/sdk/extension.py @@ -3,14 +3,12 @@ # SPDX-License-Identifier: MIT import logging -import sched import signal import sys import threading import time from argparse import ArgumentParser from collections.abc import Callable -from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta, timezone from enum import Enum from itertools import chain @@ -18,6 +16,10 @@ from threading import Lock, RLock, active_count from typing import Any, ClassVar, NamedTuple +from apscheduler.executors.pool import ThreadPoolExecutor # type: ignore +from apscheduler.schedulers.background import BackgroundScheduler # type: ignore +from apscheduler.triggers.interval import IntervalTrigger # type: ignore + from .activation import ActivationConfig, ActivationType from .callback import WrappedCallback from .communication import CommunicationClient, DebugClient, HttpClient @@ -172,6 +174,12 @@ def _add_sfm_metric(metric: Metric, sfm_metrics: list[Metric] | None = None): sfm_metrics.append(metric) +class ExecutorType(str, Enum): + CALLBACKS = "callbacks" + INTERNAL = "internal" + HEARTBEAT = "heartbeat" + + class Extension: """Base class for Python extensions. @@ -240,21 +248,14 @@ def __init__(self, name: str = "") -> None: self._running_callbacks: dict[int, WrappedCallback] = {} self._running_callbacks_lock: Lock = Lock() - self._scheduler = sched.scheduler(time.time, time.sleep) - - # Timestamps for scheduling of internal callbacks - self._next_internal_callbacks_timestamps: dict[str, datetime] = { - "timediff": datetime.now() + TIME_DIFF_INTERVAL, - "heartbeat": datetime.now() + HEARTBEAT_INTERVAL, - "metrics": datetime.now() + METRIC_SENDING_INTERVAL, - "events": datetime.now() + METRIC_SENDING_INTERVAL, - "sfm_metrics": datetime.now() + SFM_METRIC_SENDING_INTERVAL, - } - - # Executors for the callbacks and internal methods - self._callbacks_executor = ThreadPoolExecutor(max_workers=CALLBACKS_THREAD_POOL_SIZE) - self._internal_executor = ThreadPoolExecutor(max_workers=INTERNAL_THREAD_POOL_SIZE) - self._heartbeat_executor = ThreadPoolExecutor(max_workers=HEARTBEAT_THREAD_POOL_SIZE) + # Scheduler and executors for the callbacks and internal methods + self._scheduler = BackgroundScheduler( + executors={ + ExecutorType.CALLBACKS: ThreadPoolExecutor(max_workers=CALLBACKS_THREAD_POOL_SIZE), + ExecutorType.INTERNAL: ThreadPoolExecutor(max_workers=INTERNAL_THREAD_POOL_SIZE), + ExecutorType.HEARTBEAT: ThreadPoolExecutor(max_workers=HEARTBEAT_THREAD_POOL_SIZE), + } + ) # Extension metrics self._metrics_lock = RLock() @@ -376,7 +377,14 @@ def _schedule_callback(self, callback: WrappedCallback): callback.cluster_time_diff = self._cluster_time_diff callback.running_in_sim = self._running_in_sim self._scheduled_callbacks.append(callback) - self._scheduler.enter(callback.initial_wait_time(), 1, self._callback_iteration, (callback,)) + + self._scheduler.add_job( + self._run_callback, + args=[callback], + executor=ExecutorType.CALLBACKS, + trigger=IntervalTrigger(seconds=callback.interval.total_seconds()), + next_run_time=datetime.now() + timedelta(seconds=callback.initial_wait_time()), + ) def schedule( self, @@ -809,7 +817,15 @@ def _parse_args(self): if not self._is_fastcheck: try: - self._heartbeat_iteration() + # TODO: is it surely okay to schedule hearbeat this way? Originally it was scheduled in the very same scheduler, + # which would starve heartbeat if any callback took too long + # On the other hand, those callbacks inserted specific potentially risky jobs to different executors, so it should be okay? + # Why did heartbeat have a different priority (higher or lower?) + self._scheduler.add_job( + self._heartbeat, + executor=ExecutorType.HEARTBEAT, + trigger=IntervalTrigger(seconds=HEARTBEAT_INTERVAL.total_seconds()), + ) self.initialize() if not self.is_helper: self.schedule(self.query, timedelta(minutes=1)) @@ -863,48 +879,48 @@ def _run_callback(self, callback: WrappedCallback): with self._running_callbacks_lock: self._running_callbacks.pop(current_thread_id, None) - def _callback_iteration(self, callback: WrappedCallback): - self._callbacks_executor.submit(self._run_callback, callback) - callback.iterations += 1 - next_timestamp = callback.get_next_execution_timestamp() - self._scheduler.enterabs(next_timestamp, 1, self._callback_iteration, (callback,)) - def _start_extension_loop(self): api_logger.debug(f"Starting main loop for monitoring configuration: '{self.monitoring_config_name}'") # These were scheduled before the extension started, schedule them now for callback in self._scheduled_callbacks_before_run: self._schedule_callback(callback) - self._metrics_iteration() - self._events_iteration() - self._sfm_metrics_iteration() - self._timediff_iteration() - self._scheduler.run() - - def _timediff_iteration(self): - self._internal_executor.submit(self._update_cluster_time_diff) - next_timestamp = self._get_and_set_next_internal_callback_timestamp("timediff", TIME_DIFF_INTERVAL) - self._scheduler.enterabs(next_timestamp, 1, self._timediff_iteration) - - def _heartbeat_iteration(self): - self._heartbeat_executor.submit(self._heartbeat) - next_timestamp = self._get_and_set_next_internal_callback_timestamp("heartbeat", HEARTBEAT_INTERVAL) - self._scheduler.enterabs(next_timestamp, 2, self._heartbeat_iteration) - - def _metrics_iteration(self): - self._internal_executor.submit(self._send_metrics) - next_timestamp = self._get_and_set_next_internal_callback_timestamp("metrics", METRIC_SENDING_INTERVAL) - self._scheduler.enterabs(next_timestamp, 1, self._metrics_iteration) - - def _events_iteration(self): - self._internal_executor.submit(self._send_buffered_events) - next_timestamp = self._get_and_set_next_internal_callback_timestamp("events", METRIC_SENDING_INTERVAL) - self._scheduler.enterabs(next_timestamp, 1, self._events_iteration) - - def _sfm_metrics_iteration(self): - self._internal_executor.submit(self._send_sfm_metrics) - next_timestamp = self._get_and_set_next_internal_callback_timestamp("sfm_metrics", SFM_METRIC_SENDING_INTERVAL) - self._scheduler.enterabs(next_timestamp, 1, self._sfm_metrics_iteration) + + self._scheduler.add_job( + self._send_metrics, + executor=ExecutorType.INTERNAL, + trigger=IntervalTrigger(seconds=METRIC_SENDING_INTERVAL.total_seconds()), + next_run_time=datetime.now(), + ) + + self._scheduler.add_job( + self._send_buffered_events, + executor=ExecutorType.INTERNAL, + trigger=IntervalTrigger(seconds=METRIC_SENDING_INTERVAL.total_seconds()), + next_run_time=datetime.now(), + ) + + self._scheduler.add_job( + self._send_sfm_metrics, + executor=ExecutorType.INTERNAL, + trigger=IntervalTrigger(seconds=SFM_METRIC_SENDING_INTERVAL.total_seconds()), + next_run_time=datetime.now(), + ) + + self._scheduler.add_job( + self._update_cluster_time_diff, + executor=ExecutorType.INTERNAL, + trigger=IntervalTrigger(seconds=TIME_DIFF_INTERVAL.total_seconds()), + next_run_time=datetime.now(), + ) + + self._scheduler.start() + + try: + while self._scheduler.running: + time.sleep(1) + except Exception: + self._scheduler.shutdown() def _send_metrics(self): with self._metrics_lock: @@ -1105,8 +1121,8 @@ def _heartbeat(self): api_logger.error(f"Heartbeat failed because {e}, response {response}", exc_info=True) def __del__(self): - self._callbacks_executor.shutdown() - self._internal_executor.shutdown() + if self._scheduler.running: + self._scheduler.shutdown() def _add_metric(self, metric: Metric): metric.validate() @@ -1150,7 +1166,7 @@ def _send_events_internal(self, events: dict | list[dict]): def _send_events(self, events: dict | list[dict], send_immediately: bool = False): if send_immediately: - self._internal_executor.submit(self._send_events_internal, events) + self._scheduler.add_job(self._send_events_internal, args=[events], executor=ExecutorType.INTERNAL) return with self._logs_lock: if isinstance(events, dict): @@ -1169,11 +1185,6 @@ def _send_buffered_events(self): def _send_dt_event(self, event: dict[str, str | int | dict[str, str]]): self._client.send_dt_event(event) - def _get_and_set_next_internal_callback_timestamp(self, callback_name: str, interval: timedelta): - next_timestamp = self._next_internal_callbacks_timestamps[callback_name] - self._next_internal_callbacks_timestamps[callback_name] += interval - return next_timestamp.timestamp() - def get_version(self) -> str: """Return the extension version.""" return self.activation_config.version @@ -1247,4 +1258,4 @@ def _send_sfm_logs(self, logs: dict | list[dict]): log["dt.extension.config.label"] = self.monitoring_config_name log.pop("monitoring.configuration", None) - self._internal_executor.submit(self._send_sfm_logs_internal, logs) + self._scheduler.add_job(self._send_sfm_logs_internal, args=[logs], executor=ExecutorType.INTERNAL) diff --git a/dynatrace_extension/sdk/status.py b/dynatrace_extension/sdk/status.py index 21c02d9..0a3e31f 100644 --- a/dynatrace_extension/sdk/status.py +++ b/dynatrace_extension/sdk/status.py @@ -159,6 +159,7 @@ def __init__(self, send_sfm_logs_function: Callable) -> None: self._ep_records: dict[str, EndpointStatusRecord] = {} self._send_sfm_logs_function = send_sfm_logs_function self._logs_to_send: list[str] = [] + self._datetime_now = datetime.now # Mockable datetime function def contains_any_status(self) -> bool: return len(self._ep_records) > 0 @@ -190,7 +191,7 @@ def send_ep_logs(self): ep_record.ep_status.message, ) ) - ep_record.last_sent = datetime.now() + ep_record.last_sent = self._datetime_now() ep_record.state = StatusState.ONGOING if logs_to_send: @@ -202,7 +203,7 @@ def _should_be_reported(self, ep_record: EndpointStatusRecord): elif ep_record.state in (StatusState.INITIAL, StatusState.NEW): return True elif ep_record.state == StatusState.ONGOING and ( - ep_record.last_sent is None or datetime.now() - ep_record.last_sent >= self.RESENDING_INTERVAL + ep_record.last_sent is None or self._datetime_now() - ep_record.last_sent >= self.RESENDING_INTERVAL ): return True else: diff --git a/pyproject.toml b/pyproject.toml index 5dd5b21..578e453 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ classifiers = [ "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", ] -dependencies = [] +dependencies = ["apscheduler"] [project.optional-dependencies] cli = [ "dt-cli>=1.6.13", "typer[all]", "pyyaml", "ruff"] @@ -47,8 +47,7 @@ dependencies = [ "pytest", "typer[all]", "pyyaml", - "dt-cli>=1.6.13", - "freezegun" + "dt-cli>=1.6.13" ] [tool.hatch.envs.default.scripts] @@ -74,8 +73,8 @@ dependencies = [ "ruff>=0.9.10", "typer[all]", "pyyaml", - "pytest", - "freezegun" + "pytest", + "apscheduler" ] [tool.hatch.envs.lint.scripts] diff --git a/tests/sdk/test_endpoints_sfm.py b/tests/sdk/test_endpoints_sfm.py index 3f8294c..3e06f82 100644 --- a/tests/sdk/test_endpoints_sfm.py +++ b/tests/sdk/test_endpoints_sfm.py @@ -1,52 +1,42 @@ -import threading import time import unittest from datetime import datetime, timedelta from unittest.mock import MagicMock -from freezegun import freeze_time - from dynatrace_extension import EndpointStatus, EndpointStatuses, Extension, Severity, StatusValue -class KillSchedulerError(Exception): - pass - - class TestSfmPerEndpont(unittest.TestCase): def setUp(self, extension_name=""): self.ext = Extension(name=extension_name) self.ext.logger = MagicMock() self.ext._running_in_sim = True self.ext._client = MagicMock() + self.ext._client.send_sfm_logs = MagicMock() self.ext._is_fastcheck = False self.i = 0 self.test_cases = None self.time_machine_idx = None self.ext.schedule(self.callback, timedelta(seconds=1)) - self.scheduler_thread = threading.Thread(target=self.scheduler_thread_fun) - - def scheduler_thread_fun(self): - with self.assertRaises(KillSchedulerError): - self.ext._scheduler.run() def tearDown(self) -> None: - self.ext._scheduler.enter(delay=0, priority=1, action=lambda: (_ for _ in ()).throw(KillSchedulerError())) - self.scheduler_thread.join() + if Extension._instance and Extension._instance._scheduler.running: + Extension._instance._scheduler.shutdown(wait=True) + Extension._instance = None def run_test(self): - self.scheduler_thread.start() + self.ext._scheduler.start() time.sleep(0.1) for case in self.test_cases[: self.time_machine_idx]: self.single_test_iteration(case) if self.time_machine_idx: - with freeze_time(datetime.now() + timedelta(hours=2), tick=True): - for case in self.test_cases[self.time_machine_idx :]: - self.single_test_iteration(case) + self.ext._ep_statuses._datetime_now = lambda: datetime.now() + timedelta(hours=2) + for case in self.test_cases[self.time_machine_idx :]: + self.single_test_iteration(case) def single_test_iteration(self, case): self.ext._client.send_sfm_logs.reset_mock() @@ -247,7 +237,7 @@ def test_endpoint_independent_ongoing(self): ), }, { - "status": EndpointStatus("1.2.3.4:1", StatusValue.WARNING, "Warning 1"), + "status": None, "expected": self.expected_sfm_dict( device_address="1.2.3.4:1", level="WARN", @@ -257,7 +247,7 @@ def test_endpoint_independent_ongoing(self): ), }, { - "status": EndpointStatus("1.2.3.4:2", StatusValue.WARNING, "Warning 2"), + "status": None, "expected": self.expected_sfm_dict( device_address="1.2.3.4:2", level="WARN", @@ -287,19 +277,21 @@ def test_endpoint_independent_ongoing(self): }, ] - self.scheduler_thread.start() + self.ext._scheduler.start() time.sleep(0.1) self.single_test_iteration(self.test_cases[0]) - with freeze_time(datetime.now() + timedelta(hours=1), tick=True): - self.single_test_iteration(self.test_cases[1]) + self.ext._ep_statuses._datetime_now = lambda: datetime.now() + timedelta(hours=1) + self.single_test_iteration(self.test_cases[1]) + + self.ext._ep_statuses._datetime_now = lambda: datetime.now() + timedelta(hours=2) + self.single_test_iteration(self.test_cases[2]) - with freeze_time(datetime.now() + timedelta(hours=1), tick=True): - self.single_test_iteration(self.test_cases[2]) + self.ext._ep_statuses._datetime_now = lambda: datetime.now() + timedelta(hours=3) + self.single_test_iteration(self.test_cases[3]) - with freeze_time(datetime.now() + timedelta(hours=1), tick=True): - self.single_test_iteration(self.test_cases[3]) + self.ext._ep_statuses._datetime_now = lambda: datetime.now() + timedelta(hours=5) + self.single_test_iteration(self.test_cases[4]) - with freeze_time(datetime.now() + timedelta(hours=2), tick=True): - self.single_test_iteration(self.test_cases[4]) + self.ext._scheduler.shutdown(wait=True) diff --git a/tests/sdk/test_extension.py b/tests/sdk/test_extension.py index 0648479..ea0ca64 100644 --- a/tests/sdk/test_extension.py +++ b/tests/sdk/test_extension.py @@ -1,7 +1,7 @@ import threading import time import unittest -from datetime import datetime, timedelta +from datetime import timedelta from unittest.mock import MagicMock, mock_open, patch import pytest @@ -19,6 +19,9 @@ class TestExtension(unittest.TestCase): def tearDown(self) -> None: + if Extension._instance and Extension._instance._scheduler.running: + Extension._instance._scheduler.shutdown(wait=True) + Extension._instance = None Extension.schedule_decorators = [] @@ -27,8 +30,8 @@ def test_heartbeat_called(self, mock_extension): extension = Extension() extension.logger = MagicMock() extension._running_in_sim = True - extension._next_heartbeat = datetime.now() - extension._heartbeat_iteration() + extension._scheduler.start() + time.sleep(50.1) extension._heartbeat.assert_called() def test_loglevel(self): @@ -63,8 +66,8 @@ def test_metrics_flushed(self): extension.report_metric("my_metric", 1) self.assertEqual(len(extension._metrics), 1) - extension._metrics_iteration() - time.sleep(0.01) + extension._send_metrics() + time.sleep(0.1) with extension._metrics_lock: self.assertEqual(len(extension._metrics), 0) @@ -107,8 +110,8 @@ def test_flush_events(self): extension.report_event("my_event1", "my_description") extension.report_event("my_event1", "my_description") self.assertEqual(len(extension._logs), 2) - extension._events_iteration() - time.sleep(0.01) + extension._send_buffered_events() + time.sleep(0.1) with extension._logs_lock: self.assertEqual(len(extension._logs), 0) @@ -129,9 +132,11 @@ def callback(): extension.schedule(callback, timedelta(seconds=1)) self.assertEqual(len(extension._scheduled_callbacks), 3) - extension._scheduler.run(blocking=False) - time.sleep(0.01) - self.assertEqual(extension._run_callback.call_count, 3) + extension._scheduler.start() + time.sleep(0.1) + + # call_count expected to be 2 because scheduled `query` method used the original, nonmocked _run_callback + self.assertEqual(extension._run_callback.call_count, 2) def test_callback_scheduled_multiple_times(self): extension = Extension() @@ -148,8 +153,8 @@ def callback(): extension.schedule(callback, timedelta(seconds=1)) extension.schedule(callback, timedelta(seconds=1)) - extension._scheduler.run(blocking=False) - time.sleep(1) + extension._scheduler.start() + time.sleep(0.1) self.assertEqual(extension._scheduled_callbacks[0].executions_total, 1) self.assertEqual(extension._scheduled_callbacks[1].executions_total, 1) @@ -169,9 +174,9 @@ def test_big_number_callbacks_scheduled(self): args=(i,), ) # run scheduler once and flush metrics - extension._scheduler.run(blocking=False) + extension._scheduler.start() time.sleep(0.1) - extension._metrics_iteration() + extension._send_metrics() def test_callback_from_init(self): class MyExt(Extension): @@ -188,8 +193,8 @@ def initialize(self): extension._is_fastcheck = False extension._client = MagicMock() - extension._scheduler.run(blocking=False) - time.sleep(1) + extension._scheduler.start() + time.sleep(0.1) self.assertEqual(len(extension._scheduled_callbacks), 2) self.assertEqual(extension._scheduled_callbacks[0].executions_total, 1) @@ -222,22 +227,22 @@ def another_callback(self): extension.schedule(extension.callback_that_schedules_another_callback, timedelta(seconds=1)) self.assertEqual(len(extension._scheduled_callbacks), 2) - extension._scheduler.run(blocking=False) - time.sleep(1) + extension._scheduler.start() + time.sleep(0.1) self.assertEqual(len(extension._scheduled_callbacks), 3) self.assertEqual(extension._scheduled_callbacks[1].executions_total, 1) self.assertEqual(extension.callback_that_schedules_another_callback_call_count, 1) - extension._scheduler.run(blocking=False) + self.assertEqual(extension._scheduled_callbacks[2].executions_total, 1) + self.assertEqual(extension.another_callback_call_count, 1) time.sleep(1) self.assertEqual(len(extension._scheduled_callbacks), 3) self.assertEqual(extension._scheduled_callbacks[1].executions_total, 2) self.assertEqual(extension.callback_that_schedules_another_callback_call_count, 2) - self.assertGreaterEqual(extension._scheduled_callbacks[1].executions_total, 1) - self.assertGreaterEqual(extension.callback_that_schedules_another_callback_call_count, 1) + self.assertEqual(extension._scheduled_callbacks[2].executions_total, 2) + self.assertEqual(extension.another_callback_call_count, 2) - extension._scheduler.run(blocking=False) time.sleep(1) assert len(extension._scheduled_callbacks) == 3 @@ -264,10 +269,8 @@ def callback(): def run_scheduler(): nonlocal extension, callback, callback_wait, callback_call_count extension.schedule(callback, timedelta(seconds=1)) - extension._scheduler.run(blocking=False) - time.sleep(1) - extension._scheduler.run(blocking=False) - time.sleep(1) + extension._scheduler.start() + time.sleep(2) if callback_call_count < 2: callback_wait.notify() @@ -289,8 +292,8 @@ def callback(self): extension = MyExt() - extension._scheduler.run(blocking=False) - time.sleep(0.01) + extension._scheduler.start() + time.sleep(0.1) self.assertEqual(len(extension._scheduled_callbacks), 2) self.assertTrue(extension.called_callback) @@ -304,7 +307,7 @@ def callback(): callback_done = True extension = _HelperExtension() - extension._scheduler.run(blocking=False) + extension._scheduler.start() time.sleep(1) self.assertEqual(len(extension._scheduled_callbacks), 1) @@ -326,10 +329,9 @@ def callback(): raise RuntimeError(msg) extension.schedule(callback, timedelta(seconds=1)) - extension._scheduler.run(blocking=False) - time.sleep(1) + extension._scheduler.start() + time.sleep(0.1) self.assertEqual(extension._build_current_status().status, StatusValue.GENERIC_ERROR) - extension._scheduler.run(blocking=False) time.sleep(1) self.assertEqual(extension._build_current_status().status, StatusValue.OK) @@ -509,11 +511,11 @@ def test_sfm_ok(self): # extension._run_callback = MagicMock() def callback(): - time.sleep(0.01) + time.sleep(0.1) return 1 extension.schedule(callback, timedelta(seconds=1)) - extension._scheduler.run(blocking=False) + extension._scheduler.start() time.sleep(0.1) sfm = extension._prepare_sfm_metrics() expected_values = { @@ -539,8 +541,8 @@ def callback(): return 1 extension.schedule(callback, timedelta(seconds=1)) - extension._scheduler.run(blocking=False) - time.sleep(2) + extension._scheduler.start() + time.sleep(1.5) sfm = extension._prepare_sfm_metrics() expected_values = { "dsfm:datasource.python.threads": 0, @@ -565,8 +567,8 @@ def callback(): raise Exception(msg) extension.schedule(callback, timedelta(seconds=1)) - extension._scheduler.run(blocking=False) - time.sleep(1) + extension._scheduler.start() + time.sleep(0.1) sfm = extension._prepare_sfm_metrics() expected_values = { "dsfm:datasource.python.threads": 0, diff --git a/tests/sdk/test_status.py b/tests/sdk/test_status.py index 2a72d9a..988ba93 100644 --- a/tests/sdk/test_status.py +++ b/tests/sdk/test_status.py @@ -1,4 +1,3 @@ -import threading import time import unittest from datetime import timedelta @@ -10,6 +9,8 @@ class TestStatus(unittest.TestCase): def tearDown(self) -> None: + if Extension._instance and Extension._instance._scheduler.running: + Extension._instance._scheduler.shutdown(wait=True) Extension._instance = None def test_status(self): @@ -28,6 +29,8 @@ def callback(): ext._client = DebugClient("", "", MagicMock()) ext._is_fastcheck = False ext.schedule(callback, timedelta(seconds=1)) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.OK) @@ -45,8 +48,8 @@ def bad_method(): raise Exception(msg) ext.schedule(bad_method, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.GENERIC_ERROR) @@ -69,7 +72,7 @@ def bad_method_2(): ext.schedule(bad_method_1, timedelta(seconds=1)) ext.schedule(bad_method_2, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) + ext._scheduler.start() time.sleep(1) status = ext._build_current_status() @@ -88,7 +91,7 @@ def callback(): time.sleep(1) extension.schedule(callback, timedelta(seconds=1)) - extension._scheduler.run(blocking=False) + extension._scheduler.start() time.sleep(2) self.assertTrue(extension._scheduled_callbacks[1].status.is_error()) @@ -105,8 +108,8 @@ def callback(): return Status(StatusValue.OK, "foo1") ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.OK) @@ -127,8 +130,8 @@ def custom_query(): ext.schedule(callback, timedelta(seconds=1)) ext.schedule(custom_query, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.OK) @@ -147,7 +150,7 @@ def callback(): return ret ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) + ext._scheduler.start() time.sleep(1) status = ext._build_current_status() @@ -167,7 +170,7 @@ def callback(): return ret ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) + ext._scheduler.start() time.sleep(1) status = ext._build_current_status() @@ -189,7 +192,7 @@ def callback(): return ret ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) + ext._scheduler.start() time.sleep(1) status = ext._build_current_status() @@ -211,7 +214,7 @@ def callback(): return ret ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) + ext._scheduler.start() time.sleep(1) status = ext._build_current_status() @@ -233,7 +236,7 @@ def callback(): return ret ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) + ext._scheduler.start() time.sleep(1) status = ext._build_current_status() @@ -254,7 +257,7 @@ def callback(): return ret ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) + ext._scheduler.start() time.sleep(1) status = ext._build_current_status() @@ -275,7 +278,7 @@ def callback(): return ret ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) + ext._scheduler.start() time.sleep(1) status = ext._build_current_status() @@ -300,8 +303,8 @@ def callback(): return statuses ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.OK) @@ -328,8 +331,8 @@ def callback(): return statuses ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.WARNING) @@ -362,8 +365,8 @@ def callback(): return statuses ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.GENERIC_ERROR) @@ -385,8 +388,8 @@ def callback(): return statuses ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.OK) @@ -407,8 +410,8 @@ def callback(): return statuses ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.WARNING) @@ -432,8 +435,8 @@ def callback(): return statuses ext.schedule(callback, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.GENERIC_ERROR) @@ -462,8 +465,8 @@ def callback_ep_status_2(): ext.schedule(callback_ep_status_1, timedelta(seconds=1)) ext.schedule(callback_ep_status_2, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.OK) @@ -488,8 +491,8 @@ def callback_ep_status_2(): ext.schedule(callback_ep_status_1, timedelta(seconds=1)) ext.schedule(callback_ep_status_2, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.GENERIC_ERROR) @@ -517,8 +520,8 @@ def callback_ep_status_2(): ext.schedule(callback_ep_status_1, timedelta(seconds=1)) ext.schedule(callback_ep_status_2, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.WARNING) @@ -548,8 +551,8 @@ def callback_status(): ext.schedule(callback_ep_status, timedelta(seconds=1)) ext.schedule(callback_multistatus, timedelta(seconds=1)) ext.schedule(callback_status, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.GENERIC_ERROR) @@ -583,8 +586,8 @@ def callback_status(): ext.schedule(callback_ep_status, timedelta(seconds=1)) ext.schedule(callback_multistatus, timedelta(seconds=1)) ext.schedule(callback_status, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.OK) @@ -614,8 +617,8 @@ def callback_status(): ext.schedule(callback_ep_status, timedelta(seconds=1)) ext.schedule(callback_multistatus, timedelta(seconds=1)) ext.schedule(callback_status, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.WARNING) @@ -648,8 +651,8 @@ def callback_status(): ext.schedule(callback_ep_status, timedelta(seconds=1)) ext.schedule(callback_multistatus, timedelta(seconds=1)) ext.schedule(callback_status, timedelta(seconds=1)) - ext._scheduler.run(blocking=False) - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) status = ext._build_current_status() self.assertEqual(status.status, StatusValue.WARNING) @@ -692,19 +695,8 @@ def regular_callback(): ext.schedule(skipped_callback, timedelta(seconds=10)) # called only once during test ext.schedule(regular_callback, timedelta(seconds=1)) - # Runngin scheduler in another thread as we need it to run in parallel in this test - class KillSchedulerError(Exception): - pass - - def scheduler_thread_impl(ext: Extension): - try: - ext._scheduler.run(blocking=True) - except KillSchedulerError: - pass - - scheduler_thread = threading.Thread(target=scheduler_thread_impl, args=(ext,)) - scheduler_thread.start() - time.sleep(0.01) + ext._scheduler.start() + time.sleep(0.1) # 5 second of test for _ in range(5): @@ -719,9 +711,6 @@ def scheduler_thread_impl(ext: Extension): ) time.sleep(1) - ext._scheduler.enter(delay=0, priority=1, action=lambda: (_ for _ in ()).throw(KillSchedulerError())) - scheduler_thread.join() - # Confirm schedulered called callbacks as requested self.assertEqual(skipped_callback_call_counter, 1) self.assertEqual(regular_callback_call_counter, 6)