From c995023bed9a85ffd06fde43c7df0527230b3bd5 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Tue, 1 Jul 2025 14:33:49 -0400 Subject: [PATCH] fix:signal on non-main thread #284 --- scheduler/decorators.py | 6 ++--- scheduler/helpers/callback.py | 2 +- scheduler/{ => helpers}/timeouts.py | 17 ++++++++++---- scheduler/settings.py | 1 - scheduler/tests/test_job_decorator.py | 33 ++++++++++++++++++++++----- scheduler/types/settings_types.py | 6 ++--- scheduler/worker/worker.py | 2 +- 7 files changed, 48 insertions(+), 19 deletions(-) rename scheduler/{ => helpers}/timeouts.py (85%) diff --git a/scheduler/decorators.py b/scheduler/decorators.py index 72e15cb..08472c1 100644 --- a/scheduler/decorators.py +++ b/scheduler/decorators.py @@ -18,9 +18,9 @@ def __init__( at_front: bool = False, meta: Optional[Dict[Any, Any]] = None, description: Optional[str] = None, - on_failure: Optional[Union["Callback", Callable[..., Any]]] = None, - on_success: Optional[Union["Callback", Callable[..., Any]]] = None, - on_stopped: Optional[Union["Callback", Callable[..., Any]]] = None, + on_failure: Optional[Union[Callback, Callable[..., Any]]] = None, + on_success: Optional[Union[Callback, Callable[..., Any]]] = None, + on_stopped: Optional[Union[Callback, Callable[..., Any]]] = None, ): """A decorator that adds a ``delay`` method to the decorated function, which in turn creates a RQ job when called. Accepts a required ``queue`` argument that can be either a ``Queue`` instance or a string diff --git a/scheduler/helpers/callback.py b/scheduler/helpers/callback.py index 6250d4f..cfb85e2 100644 --- a/scheduler/helpers/callback.py +++ b/scheduler/helpers/callback.py @@ -2,7 +2,7 @@ from typing import Union, Callable, Any, Optional from scheduler.helpers.utils import callable_func -from scheduler.timeouts import JobTimeoutException +from scheduler.helpers.timeouts import JobTimeoutException class CallbackSetupError(Exception): diff --git a/scheduler/timeouts.py b/scheduler/helpers/timeouts.py similarity index 85% rename from scheduler/timeouts.py rename to scheduler/helpers/timeouts.py index 1bed3ee..15d4bf3 100644 --- a/scheduler/timeouts.py +++ b/scheduler/helpers/timeouts.py @@ -1,7 +1,10 @@ import ctypes +import logging import signal import threading +logger = logging.getLogger("scheduler") + class BaseTimeoutException(Exception): """Base exception for timeouts.""" @@ -59,13 +62,19 @@ def handle_death_penalty(self, signum, frame) -> None: def setup_death_penalty(self) -> None: """Sets up an alarm signal and a signal handler that raises an exception after the timeout amount (expressed in seconds).""" - signal.signal(signal.SIGALRM, self.handle_death_penalty) - signal.alarm(self._timeout) + if threading.current_thread() is threading.main_thread(): + signal.signal(signal.SIGALRM, self.handle_death_penalty) + signal.alarm(self._timeout) + else: + logger.warning(f"Ignoring death penalty setup in non-main thread `{threading.current_thread().name}`.") def cancel_death_penalty(self) -> None: """Removes the death penalty alarm and puts back the system into default signal handling.""" - signal.alarm(0) - signal.signal(signal.SIGALRM, signal.SIG_DFL) + if threading.current_thread() is threading.main_thread(): + signal.alarm(0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) + else: + logger.warning(f"Ignoring death penalty cancel in non-main thread `{threading.current_thread().name}`.") class TimerDeathPenalty(BaseDeathPenalty): diff --git a/scheduler/settings.py b/scheduler/settings.py index 405c5ab..0e80768 100644 --- a/scheduler/settings.py +++ b/scheduler/settings.py @@ -7,7 +7,6 @@ from scheduler.types import SchedulerConfiguration, QueueConfiguration logger = logging.getLogger("scheduler") -logging.basicConfig(level=logging.DEBUG) _QUEUES: Dict[str, QueueConfiguration] = dict() SCHEDULER_CONFIG: SchedulerConfiguration = SchedulerConfiguration() diff --git a/scheduler/tests/test_job_decorator.py b/scheduler/tests/test_job_decorator.py index bd437f5..7175118 100644 --- a/scheduler/tests/test_job_decorator.py +++ b/scheduler/tests/test_job_decorator.py @@ -1,3 +1,4 @@ +import threading import time from django.test import TestCase @@ -45,16 +46,21 @@ def __eq__(self, other): @job() -def long_running_func(x): +def func_with_param(x): x.run() +@job(timeout=1) +def long_running_func(): + time.sleep(1000) + + class JobDecoratorTest(TestCase): def setUp(self) -> None: get_queue("default").connection.flushall() def test_all_job_methods_registered(self): - self.assertEqual(5, len(JOB_METHODS_LIST)) + self.assertEqual(6, len(JOB_METHODS_LIST)) def test_job_decorator_no_params(self): test_job.delay() @@ -104,15 +110,13 @@ def _assert_job_with_func_and_props(self, queue_name, expected_func, expected_re def test_job_decorator_bad_queue(self): with self.assertRaises(settings.QueueNotFoundError): - @job("bad-queue") def test_job_bad_queue(): - time.sleep(1) return 1 + 1 def test_job_decorator_delay_with_param(self): queue_name = "default" - long_running_func.delay(MyClass()) + func_with_param.delay(MyClass()) worker = create_worker(queue_name, burst=True) worker.work() @@ -120,7 +124,24 @@ def test_job_decorator_delay_with_param(self): jobs_list = worker.queues[0].get_all_jobs() self.assertEqual(1, len(jobs_list)) job = jobs_list[0] - self.assertEqual(job.func, long_running_func) + self.assertEqual(job.func, func_with_param) self.assertEqual(job.kwargs, {}) self.assertEqual(job.status, JobStatus.FINISHED) self.assertEqual(job.args, (MyClass(),)) + + def test_job_decorator_delay_with_param_worker_thread(self): + queue_name = "default" + + long_running_func.delay() + + worker = create_worker(queue_name, burst=True) + t = threading.Thread(target=worker.work) + t.start() + t.join() + + jobs_list = get_queue(queue_name).get_all_jobs() + self.assertEqual(1, len(jobs_list)) + j = jobs_list[0] + self.assertEqual(j.func, long_running_func) + self.assertEqual(j.kwargs, {}) + self.assertEqual(j.status, JobStatus.FAILED) diff --git a/scheduler/types/settings_types.py b/scheduler/types/settings_types.py index 82adc54..40eb761 100644 --- a/scheduler/types/settings_types.py +++ b/scheduler/types/settings_types.py @@ -1,9 +1,9 @@ import sys from dataclasses import dataclass from enum import Enum -from typing import Callable, Dict, Optional, List, Tuple, Any, Type +from typing import Callable, Dict, Optional, List, Tuple, Any, Type, ClassVar -from scheduler.timeouts import BaseDeathPenalty, UnixSignalDeathPenalty +from scheduler.helpers.timeouts import BaseDeathPenalty, UnixSignalDeathPenalty if sys.version_info >= (3, 11): from typing import Self @@ -45,7 +45,7 @@ class SchedulerConfiguration: @dataclass(slots=True, frozen=True, kw_only=True) class QueueConfiguration: - __CONNECTION_FIELDS__ = { + __CONNECTION_FIELDS__: ClassVar[Dict] = { "URL", "DB", "UNIX_SOCKET_PATH", diff --git a/scheduler/worker/worker.py b/scheduler/worker/worker.py index ed3e5cd..77f0395 100644 --- a/scheduler/worker/worker.py +++ b/scheduler/worker/worker.py @@ -35,7 +35,7 @@ from contextlib import suppress from scheduler.helpers.queues import Queue, perform_job -from scheduler.timeouts import JobExecutionMonitorTimeoutException, JobTimeoutException +from scheduler.helpers.timeouts import JobExecutionMonitorTimeoutException, JobTimeoutException from scheduler.helpers.utils import utcnow, current_timestamp try: