Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions scheduler/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ def __init__(
at_front: bool = False,
meta: Optional[Dict[Any, Any]] = None,
description: Optional[str] = None,
on_failure: Optional[Union["Callback", Callable[..., Any]]] = None,
on_success: Optional[Union["Callback", Callable[..., Any]]] = None,
on_stopped: Optional[Union["Callback", Callable[..., Any]]] = None,
on_failure: Optional[Union[Callback, Callable[..., Any]]] = None,
on_success: Optional[Union[Callback, Callable[..., Any]]] = None,
on_stopped: Optional[Union[Callback, Callable[..., Any]]] = None,
):
"""A decorator that adds a ``delay`` method to the decorated function, which in turn creates a RQ job when
called. Accepts a required ``queue`` argument that can be either a ``Queue`` instance or a string
Expand Down
2 changes: 1 addition & 1 deletion scheduler/helpers/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Union, Callable, Any, Optional

from scheduler.helpers.utils import callable_func
from scheduler.timeouts import JobTimeoutException
from scheduler.helpers.timeouts import JobTimeoutException


class CallbackSetupError(Exception):
Expand Down
17 changes: 13 additions & 4 deletions scheduler/timeouts.py → scheduler/helpers/timeouts.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import ctypes
import logging
import signal
import threading

logger = logging.getLogger("scheduler")


class BaseTimeoutException(Exception):
"""Base exception for timeouts."""
Expand Down Expand Up @@ -59,13 +62,19 @@ def handle_death_penalty(self, signum, frame) -> None:
def setup_death_penalty(self) -> None:
"""Sets up an alarm signal and a signal handler that raises an exception after the timeout amount
(expressed in seconds)."""
signal.signal(signal.SIGALRM, self.handle_death_penalty)
signal.alarm(self._timeout)
if threading.current_thread() is threading.main_thread():
signal.signal(signal.SIGALRM, self.handle_death_penalty)
signal.alarm(self._timeout)
else:
logger.warning(f"Ignoring death penalty setup in non-main thread `{threading.current_thread().name}`.")

def cancel_death_penalty(self) -> None:
"""Removes the death penalty alarm and puts back the system into default signal handling."""
signal.alarm(0)
signal.signal(signal.SIGALRM, signal.SIG_DFL)
if threading.current_thread() is threading.main_thread():
signal.alarm(0)
signal.signal(signal.SIGALRM, signal.SIG_DFL)
else:
logger.warning(f"Ignoring death penalty cancel in non-main thread `{threading.current_thread().name}`.")


class TimerDeathPenalty(BaseDeathPenalty):
Expand Down
1 change: 0 additions & 1 deletion scheduler/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from scheduler.types import SchedulerConfiguration, QueueConfiguration

logger = logging.getLogger("scheduler")
logging.basicConfig(level=logging.DEBUG)

_QUEUES: Dict[str, QueueConfiguration] = dict()
SCHEDULER_CONFIG: SchedulerConfiguration = SchedulerConfiguration()
Expand Down
33 changes: 27 additions & 6 deletions scheduler/tests/test_job_decorator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
import time

from django.test import TestCase
Expand Down Expand Up @@ -45,16 +46,21 @@ def __eq__(self, other):


@job()
def long_running_func(x):
def func_with_param(x):
x.run()


@job(timeout=1)
def long_running_func():
time.sleep(1000)


class JobDecoratorTest(TestCase):
def setUp(self) -> None:
get_queue("default").connection.flushall()

def test_all_job_methods_registered(self):
self.assertEqual(5, len(JOB_METHODS_LIST))
self.assertEqual(6, len(JOB_METHODS_LIST))

def test_job_decorator_no_params(self):
test_job.delay()
Expand Down Expand Up @@ -104,23 +110,38 @@ def _assert_job_with_func_and_props(self, queue_name, expected_func, expected_re

def test_job_decorator_bad_queue(self):
with self.assertRaises(settings.QueueNotFoundError):

@job("bad-queue")
def test_job_bad_queue():
time.sleep(1)
return 1 + 1

def test_job_decorator_delay_with_param(self):
queue_name = "default"
long_running_func.delay(MyClass())
func_with_param.delay(MyClass())

worker = create_worker(queue_name, burst=True)
worker.work()

jobs_list = worker.queues[0].get_all_jobs()
self.assertEqual(1, len(jobs_list))
job = jobs_list[0]
self.assertEqual(job.func, long_running_func)
self.assertEqual(job.func, func_with_param)
self.assertEqual(job.kwargs, {})
self.assertEqual(job.status, JobStatus.FINISHED)
self.assertEqual(job.args, (MyClass(),))

def test_job_decorator_delay_with_param_worker_thread(self):
queue_name = "default"

long_running_func.delay()

worker = create_worker(queue_name, burst=True)
t = threading.Thread(target=worker.work)
t.start()
t.join()

jobs_list = get_queue(queue_name).get_all_jobs()
self.assertEqual(1, len(jobs_list))
j = jobs_list[0]
self.assertEqual(j.func, long_running_func)
self.assertEqual(j.kwargs, {})
self.assertEqual(j.status, JobStatus.FAILED)
6 changes: 3 additions & 3 deletions scheduler/types/settings_types.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import sys
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, Optional, List, Tuple, Any, Type
from typing import Callable, Dict, Optional, List, Tuple, Any, Type, ClassVar

from scheduler.timeouts import BaseDeathPenalty, UnixSignalDeathPenalty
from scheduler.helpers.timeouts import BaseDeathPenalty, UnixSignalDeathPenalty

if sys.version_info >= (3, 11):
from typing import Self
Expand Down Expand Up @@ -45,7 +45,7 @@ class SchedulerConfiguration:

@dataclass(slots=True, frozen=True, kw_only=True)
class QueueConfiguration:
__CONNECTION_FIELDS__ = {
__CONNECTION_FIELDS__: ClassVar[Dict] = {
"URL",
"DB",
"UNIX_SOCKET_PATH",
Expand Down
2 changes: 1 addition & 1 deletion scheduler/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from contextlib import suppress

from scheduler.helpers.queues import Queue, perform_job
from scheduler.timeouts import JobExecutionMonitorTimeoutException, JobTimeoutException
from scheduler.helpers.timeouts import JobExecutionMonitorTimeoutException, JobTimeoutException
from scheduler.helpers.utils import utcnow, current_timestamp

try:
Expand Down
Loading