Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ options:
--max-jobs MAX_JOBS Maximum number of jobs to execute before terminating worker
--fork-job-execution FORK_JOB_EXECUTION
Fork job execution to another process
--job-class JOB_CLASS
Jobs class to use
--sentry-dsn SENTRY_DSN
Sentry DSN to use
--sentry-debug Enable Sentry debug mode
Expand Down
165 changes: 59 additions & 106 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ python = "^3.10"
django = ">=5"
croniter = ">=2.0"
click = "^8.1"
rq = "^1.16"
pyyaml = { version = "^6.0", optional = true }
valkey = { version = "^6.0.2", optional = true}
sentry-sdk = { version = "^2.19", optional = true }

[tool.poetry.dev-dependencies]
poetry = "^2.0.1"
Expand All @@ -61,6 +61,7 @@ freezegun = "^1.5"
[tool.poetry.extras]
yaml = ["pyyaml"]
valkey = ["valkey"]
sentry = ["sentry-sdk"]

[tool.flake8]
max-line-length = 120
Expand Down
81 changes: 81 additions & 0 deletions scheduler/_config_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, Optional, List, Tuple, Any, Self, Type

from scheduler.helpers.timeouts import BaseDeathPenalty, UnixSignalDeathPenalty


@dataclass
class QueueConfiguration:
__CONNECTION_FIELDS__ = {
"URL",
"DB",
"UNIX_SOCKET_PATH",
"HOST",
"PORT",
"PASSWORD",
"SENTINELS",
"MASTER_NAME",
"SOCKET_TIMEOUT",
"SSL",
"CONNECTION_KWARGS",
}
DB: Optional[int] = None
CLIENT_KWARGS: Optional[Dict[str, Any]] = None

# Redis connection parameters, either UNIX_SOCKET_PATH/URL/separate params (HOST, PORT, PASSWORD) should be provided
UNIX_SOCKET_PATH: Optional[str] = None

URL: Optional[str] = None

HOST: Optional[str] = None
PORT: Optional[int] = None
USERNAME: Optional[str] = None
PASSWORD: Optional[str] = None

SSL: Optional[bool] = False
SSL_CERT_REQS: Optional[str] = "required"

DEFAULT_TIMEOUT: Optional[int] = None
ASYNC: Optional[bool] = True

SENTINELS: Optional[List[Tuple[str, int]]] = None
SENTINEL_KWARGS: Optional[Dict[str, str]] = None
SOCKET_TIMEOUT: Optional[int] = None
MASTER_NAME: Optional[str] = None
CONNECTION_KWARGS: Optional[Dict[str, Any]] = None

def same_connection_params(self, other: Self) -> bool:
for field in self.__CONNECTION_FIELDS__:
if getattr(self, field) != getattr(other, field):
return False
return True


class Broker(Enum):
REDIS = "redis"
FAKEREDIS = "fakeredis"
VALKEY = "valkey"


def _token_validation(token: str) -> bool:
return False


@dataclass
class SchedulerConfig:
EXECUTIONS_IN_PAGE: int = 20
SCHEDULER_INTERVAL: int = 10
BROKER: Broker = Broker.REDIS
TOKEN_VALIDATION_METHOD: Callable[[str], bool] = _token_validation
CALLBACK_TIMEOUT = 60 # Callback timeout in seconds (success/failure)
# Default values, can be override per task
DEFAULT_RESULT_TTL: int = 500 # Time To Live (TTL) in seconds to keep job results
DEFAULT_FAILURE_TTL: int = 31536000 # Time To Live (TTL) in seconds to keep job failure information
DEFAULT_JOB_TIMEOUT: int = 300 # timeout (seconds) for a job)
# General configuration values
DEFAULT_WORKER_TTL = 420 # Time To Live (TTL) in seconds to keep worker information after last heartbeat
DEFAULT_MAINTENANCE_TASK_INTERVAL = 10 * 60 # The interval to run maintenance tasks in seconds. 10 minutes.
DEFAULT_JOB_MONITORING_INTERVAL = 30 # The interval to monitor jobs in seconds.
SCHEDULER_FALLBACK_PERIOD_SECS: int = 120 # Period (secs) to wait before requiring to reacquire locks
DEATH_PENALTY_CLASS : Type[BaseDeathPenalty] = UnixSignalDeathPenalty
25 changes: 18 additions & 7 deletions scheduler/admin/task_admin.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
from typing import List

from django.contrib import admin, messages
from django.contrib.contenttypes.admin import GenericStackedInline
from django.utils.translation import gettext_lazy as _

from scheduler import tools
from scheduler.helpers import tools
from scheduler.broker_types import ConnectionErrorTypes
from scheduler.models.args import TaskArg, TaskKwarg
from scheduler.models.task import Task
from scheduler.helpers.queues import get_queue
from scheduler.redis_models import JobModel
from scheduler.settings import SCHEDULER_CONFIG, logger
from scheduler.tools import get_job_executions_for_task, TaskType


def get_job_executions_for_task(queue_name, scheduled_task) -> List[JobModel]:
queue = get_queue(queue_name)
job_list: List[JobModel] = JobModel.get_many(queue.queued_job_registry.all(), connection=queue.connection)
res = list(filter(lambda j: j.is_execution_of(scheduled_task), job_list))
return res


class JobArgInline(GenericStackedInline):
Expand Down Expand Up @@ -105,11 +115,11 @@ class Media:

@admin.display(description="Schedule")
def task_schedule(self, o: Task) -> str:
if o.task_type == TaskType.ONCE.value:
if o.task_type == tools.TaskType.ONCE.value:
return f"Run once: {o.scheduled_time:%Y-%m-%d %H:%M:%S}"
elif o.task_type == TaskType.CRON.value:
elif o.task_type == tools.TaskType.CRON.value:
return f"Cron: {o.cron_string}"
elif o.task_type == TaskType.REPEATABLE.value:
elif o.task_type == tools.TaskType.REPEATABLE.value:
if o.interval is None or o.interval_unit is None:
return ""
return "Repeatable: {} {}".format(o.interval, o.get_interval_unit_display())
Expand Down Expand Up @@ -160,8 +170,9 @@ def disable_selected(self, request, queryset):
rows_updated += 1

level = messages.WARNING if not rows_updated else messages.INFO
self.message_user(request, f"{get_message_bit(rows_updated)} successfully disabled and unscheduled.",
level=level)
self.message_user(
request, f"{get_message_bit(rows_updated)} successfully disabled and unscheduled.", level=level
)

@admin.action(description=_("Enable selected %(verbose_name_plural)s"), permissions=("change",))
def enable_selected(self, request, queryset):
Expand Down
15 changes: 13 additions & 2 deletions scheduler/broker_types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# This is a helper module to obfuscate types used by different broker implementations.
from collections import namedtuple
from typing import Union, Dict, Tuple, Type
from typing import Any, Callable, TypeVar, Union
from typing import Dict, Tuple

import redis

Expand All @@ -11,13 +12,16 @@
valkey.Valkey = redis.Redis
valkey.StrictValkey = redis.StrictRedis

from scheduler.settings import Broker
from scheduler._config_types import Broker

ConnectionErrorTypes = (redis.ConnectionError, valkey.ConnectionError)
ResponseErrorTypes = (redis.ResponseError, valkey.ResponseError)
TimeoutErrorType = Union[redis.TimeoutError, valkey.TimeoutError]
WatchErrorType = Union[redis.WatchError, valkey.WatchError]
ConnectionType = Union[redis.Redis, valkey.Valkey]
PipelineType = Union[redis.client.Pipeline, valkey.client.Pipeline]
SentinelType = Union[redis.sentinel.Sentinel, valkey.sentinel.Sentinel]
FunctionReferenceType = TypeVar("FunctionReferenceType", str, Callable[..., Any])

BrokerMetaDataType = namedtuple("BrokerMetaDataType", ["connection_type", "sentinel_type", "ssl_prefix"])

Expand All @@ -28,3 +32,10 @@
(Broker.REDIS, True): BrokerMetaDataType(redis.StrictRedis, redis.sentinel.Sentinel, "rediss"),
(Broker.VALKEY, True): BrokerMetaDataType(valkey.StrictValkey, valkey.sentinel.Sentinel, "valkeys"),
}

MODEL_NAMES = ["Task", ]
TASK_TYPES = ["OnceTaskType", "RepeatableTaskType", "CronTaskType"]


def is_pipeline(conn: ConnectionType) -> bool:
return isinstance(conn, redis.client.Pipeline) or isinstance(conn, valkey.client.Pipeline)
135 changes: 100 additions & 35 deletions scheduler/decorators.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,108 @@
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, Union

from scheduler import settings
from .queues import get_queue, QueueNotFoundError
from .rq_classes import rq_job_decorator
from scheduler.helpers.queues import Queue, get_queue
from .broker_types import ConnectionType
from .redis_models import Callback

JOB_METHODS_LIST = list()


def job(*args, **kwargs):
"""
The same as rq package's job decorator, but it automatically works out
the ``connection`` argument from SCHEDULER_QUEUES.
class job:
queue_class = Queue

def __init__(
self,
queue: Union["Queue", str, None] = None,
connection: Optional[ConnectionType] = None,
timeout: Optional[int] = settings.SCHEDULER_CONFIG.DEFAULT_JOB_TIMEOUT,
result_ttl: int = settings.SCHEDULER_CONFIG.DEFAULT_RESULT_TTL,
ttl: Optional[int] = None,
at_front: bool = False,
meta: Optional[Dict[Any, Any]] = None,
description: Optional[str] = None,
retries_left: Optional[int] = None,
retry_intervals: Union[int, List[int], None] = None,
on_failure: Optional[Union[Callback, Callable[..., Any]]] = None,
on_success: Optional[Union[Callback, Callable[..., Any]]] = None,
on_stopped: Optional[Union[Callback, Callable[..., Any]]] = None,
):
"""A decorator that adds a ``delay`` method to the decorated function, which in turn creates a RQ job when
called. Accepts a required ``queue`` argument that can be either a ``Queue`` instance or a string
denoting the queue name. For example::

And also, it allows simplified ``@job`` syntax to put a job into the default queue.

"""
if len(args) == 0:
func = None
queue = "default"
else:
if callable(args[0]):
func = args[0]
>>> @job(queue='default')
>>> def simple_add(x, y):
>>> return x + y
>>> ...
>>> # Puts `simple_add` function into queue
>>> simple_add.delay(1, 2)

:param queue: The queue to use, can be the Queue class itself, or the queue name (str)
:type queue: Union['Queue', str]
:param connection: Broker Connection
:param timeout: Job timeout
:param result_ttl: Result time to live
:param ttl: Time to live for job execution
:param at_front: Whether to enqueue the job at front of the queue
:param meta: Arbitraty metadata about the job
:param description: Job description
:param retries_left: Number of retries left
:param retry_intervals: Retry intervals
:param on_failure: Callable to run on failure
:param on_success: Callable to run on success
:param on_stopped: Callable to run when stopped
"""
if queue is None:
queue = "default"
else:
func = None
queue = args[0]
args = args[1:]

if isinstance(queue, str):
try:
queue = get_queue(queue)
if "connection" not in kwargs:
kwargs["connection"] = queue.connection
except KeyError:
raise QueueNotFoundError(f"Queue {queue} does not exist")

kwargs.setdefault("result_ttl", settings.SCHEDULER_CONFIG.DEFAULT_RESULT_TTL)
kwargs.setdefault("timeout", settings.SCHEDULER_CONFIG.DEFAULT_TIMEOUT)

decorator = rq_job_decorator(queue, *args, **kwargs)
if func:
JOB_METHODS_LIST.append(f"{func.__module__}.{func.__name__}")
return decorator(func)
return decorator
self.queue = get_queue(queue) if isinstance(queue, str) else queue
self.connection = connection
self.timeout = timeout
self.result_ttl = result_ttl
self.ttl = ttl
self.meta = meta
self.at_front = at_front
self.description = description
self.retries_left = retries_left
self.retry_intervals = retry_intervals
self.on_success = on_success
self.on_failure = on_failure
self.on_stopped = on_stopped

def __call__(self, f):
@wraps(f)
def delay(*args, **kwargs):
if isinstance(self.queue, str):
queue = Queue(name=self.queue, connection=self.connection)
else:
queue = self.queue

job_id = kwargs.pop("job_id", None)
at_front = kwargs.pop("at_front", False)

if not at_front:
at_front = self.at_front

return queue.enqueue_call(
f,
args=args,
kwargs=kwargs,
timeout=self.timeout,
result_ttl=self.result_ttl,
ttl=self.ttl,
name=job_id,
at_front=at_front,
meta=self.meta,
description=self.description,
retries_left=self.retries_left,
retry_intervals=self.retry_intervals,
on_failure=self.on_failure,
on_success=self.on_success,
on_stopped=self.on_stopped,
)

JOB_METHODS_LIST.append(f"{f.__module__}.{f.__name__}")
f.delay = delay
return f
Empty file added scheduler/helpers/__init__.py
Empty file.
Loading
Loading