Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ dependencies = [
"django>=5",
"croniter>=2.0",
"click~=8.2",
"fakeredis",
]

[project.optional-dependencies]
yaml = ["pyyaml~=6.0"]
yaml = ["pyyaml~=6.0", "types-PyYAML>=6.0.12.20250516"]
valkey = ["valkey>=6.0.2,<7"]
sentry = ["sentry-sdk~=2.19"]

Expand All @@ -61,6 +62,8 @@ dev = [
"coverage~=7.6",
"fakeredis~=2.28",
"pyyaml>=6,<7",
"mypy>=1.16.0",
"types-croniter>=6.0.0.20250411",
]

[tool.hatch.build.targets.sdist]
Expand All @@ -84,3 +87,19 @@ quote-style = "double"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"


[tool.mypy]
packages = ['scheduler', ]
exclude = ["scheduler/tests/.*\\.py",
"scheduler/migrations/.*\\.py",
"testproject/.*\\.py",
"testproject/tests/.*\\.py"]
strict = true
follow_imports = "silent"
ignore_missing_imports = true
scripts_are_modules = true
check_untyped_defs = true

[tool.uv.sources]
fakeredis = { path = "../fakeredis" }
15 changes: 9 additions & 6 deletions scheduler/admin/ephemeral_models.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
from typing import Any

from django.contrib import admin
from django.http import HttpResponse, HttpRequest

from scheduler import views
from scheduler.models.ephemeral_models import Queue, Worker


class ImmutableAdmin(admin.ModelAdmin):
def has_add_permission(self, request):
def has_add_permission(self, request: HttpRequest) -> bool:
return False # Hide the admin "+ Add" link for Queues

def has_change_permission(self, request, obj=None):
def has_change_permission(self, request: HttpRequest, obj: Any = None) -> bool:
return True

def has_module_permission(self, request):
def has_module_permission(self, request: HttpRequest) -> bool:
"""Returns True if the given request has any permission in the given app label.

Can be overridden by the user in subclasses. In such case, it should return True if the given request has
permission to view the module on the admin index page and access the module's index page. Overriding it does
not restrict access to the add, change or delete views. Use `ModelAdmin.has_(add|change|delete)_permission` for
that.
"""
return request.user.has_module_perms("django-tasks-scheduler")
return request.user.has_module_perms("django-tasks-scheduler") # type: ignore


@admin.register(Queue)
class QueueAdmin(ImmutableAdmin):
"""Admin View for queues"""

def changelist_view(self, request, extra_context=None):
def changelist_view(self, request: HttpRequest, extra_context: Any = None) -> HttpResponse:
"""The 'change list' admin view for this model."""
return views.stats(request)

Expand All @@ -35,6 +38,6 @@ def changelist_view(self, request, extra_context=None):
class WorkerAdmin(ImmutableAdmin):
"""Admin View for workers"""

def changelist_view(self, request, extra_context=None):
def changelist_view(self, request: HttpRequest, extra_context: Any = None) -> HttpResponse:
"""The 'change list' admin view for this model."""
return views.workers_list(request)
2 changes: 1 addition & 1 deletion scheduler/helpers/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, func: Union[str, Callable[..., Any]], timeout: Optional[int]
def name(self) -> str:
return f"{self.func.__module__}.{self.func.__qualname__}"

def __call__(self, *args, **kwargs):
def __call__(self, *args: Any, **kwargs: Any) -> Any:
from scheduler.settings import SCHEDULER_CONFIG

with SCHEDULER_CONFIG.DEATH_PENALTY_CLASS(self.timeout, JobTimeoutException):
Expand Down
11 changes: 5 additions & 6 deletions scheduler/helpers/queues/getters.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from typing import Set

from scheduler.redis_models.worker import WorkerModel
from scheduler.settings import SCHEDULER_CONFIG, get_queue_names, get_queue_configuration, QueueConfiguration, logger
from scheduler.types import ConnectionErrorTypes, BrokerMetaData, Broker
from scheduler.settings import SCHEDULER_CONFIG, get_queue_names, get_queue_configuration, logger
from scheduler.types import ConnectionErrorTypes, BrokerMetaData, Broker, ConnectionType, QueueConfiguration
from .queue_logic import Queue


_BAD_QUEUE_CONFIGURATION = set()


def _get_connection(config: QueueConfiguration, use_strict_broker=False):
def _get_connection(config: QueueConfiguration, use_strict_broker: bool = False) -> ConnectionType:
"""Returns a Broker connection to use based on parameters in SCHEDULER_QUEUES"""
if SCHEDULER_CONFIG.BROKER == Broker.FAKEREDIS:
import fakeredis
Expand All @@ -32,7 +31,7 @@ def _get_connection(config: QueueConfiguration, use_strict_broker=False):
sentinel_kwargs = config.SENTINEL_KWARGS or {}
SentinelClass = BrokerMetaData[(SCHEDULER_CONFIG.BROKER, use_strict_broker)].sentinel_type
sentinel = SentinelClass(config.SENTINELS, sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
return sentinel.master_for(
return sentinel.master_for( # type: ignore
service_name=config.MASTER_NAME,
redis_class=broker_cls,
)
Expand All @@ -47,7 +46,7 @@ def _get_connection(config: QueueConfiguration, use_strict_broker=False):
)


def get_queue(name="default") -> Queue:
def get_queue(name: str = "default") -> Queue:
"""Returns an DjangoQueue using parameters defined in `SCHEDULER_QUEUES`"""
queue_settings = get_queue_configuration(name)
is_async = queue_settings.ASYNC
Expand Down
51 changes: 25 additions & 26 deletions scheduler/helpers/queues/queue_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from scheduler.redis_models import JobStatus, SchedulerLock, Result, ResultType, JobModel
from scheduler.settings import logger, SCHEDULER_CONFIG
from scheduler.types import ConnectionType, FunctionReferenceType, Self
from scheduler.types import ConnectionType, FunctionReferenceType, Self, PipelineType


class InvalidJobOperation(Exception):
Expand All @@ -30,6 +30,10 @@ class NoSuchJobError(Exception):
pass


class NoSuchRegistryError(Exception):
pass


def perform_job(job_model: JobModel, connection: ConnectionType) -> Any: # noqa
"""The main execution method. Invokes the job function with the job arguments.

Expand All @@ -45,17 +49,17 @@ def perform_job(job_model: JobModel, connection: ConnectionType) -> Any: # noqa
coro_result = loop.run_until_complete(result)
result = coro_result
if job_model.success_callback:
job_model.success_callback(job_model, connection, result) # type: ignore
job_model.success_callback(job_model, connection, result)
return result
except:
if job_model.failure_callback:
job_model.failure_callback(job_model, connection, *sys.exc_info()) # type: ignore
job_model.failure_callback(job_model, connection, *sys.exc_info())
raise
finally:
assert job_model is _job_stack.pop()


_job_stack = []
_job_stack: List[JobModel] = []


class Queue:
Expand All @@ -68,14 +72,14 @@ class Queue:
queued="queued_job_registry",
)

def __init__(self, connection: Optional[ConnectionType], name: str, is_async: bool = True) -> None:
def __init__(self, connection: ConnectionType, name: str, is_async: bool = True) -> None:
"""Initializes a Queue object.

:param name: The queue name
:param connection: Broker connection
:param is_async: Whether jobs should run "async" (using the worker).
"""
self.connection = connection
self.connection: ConnectionType = connection
self.name = name
self._is_async = is_async
self.queued_job_registry = QueuedJobRegistry(connection=self.connection, name=self.name)
Expand All @@ -85,11 +89,11 @@ def __init__(self, connection: Optional[ConnectionType], name: str, is_async: bo
self.scheduled_job_registry = ScheduledJobRegistry(connection=self.connection, name=self.name)
self.canceled_job_registry = CanceledJobRegistry(connection=self.connection, name=self.name)

def __len__(self):
def __len__(self) -> int:
return self.count

@property
def scheduler_pid(self) -> int:
def scheduler_pid(self) -> Optional[int]:
lock = SchedulerLock(self.name)
pid = lock.value(self.connection)
return int(pid.decode()) if pid is not None else None
Expand Down Expand Up @@ -155,11 +159,11 @@ def count(self) -> int:
res += getattr(self, registry).count(connection=self.connection)
return res

def get_registry(self, name: str) -> Union[None, JobNamesRegistry]:
def get_registry(self, name: str) -> JobNamesRegistry:
name = name.lower()
if name in Queue.REGISTRIES:
return getattr(self, Queue.REGISTRIES[name])
return None
return getattr(self, Queue.REGISTRIES[name]) # type: ignore
raise NoSuchRegistryError(f"Unknown registry name {name}")

def get_all_job_names(self) -> List[str]:
res = list()
Expand All @@ -178,22 +182,21 @@ def get_all_jobs(self) -> List[JobModel]:
def create_and_enqueue_job(
self,
func: FunctionReferenceType,
args: Union[Tuple, List, None] = None,
kwargs: Optional[Dict] = None,
args: Union[Tuple[Any, ...], List[Any], None] = None,
kwargs: Optional[Dict[str, Any]] = None,
when: Optional[datetime] = None,
timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
job_info_ttl: Optional[int] = None,
description: Optional[str] = None,
name: Optional[str] = None,
at_front: bool = False,
meta: Optional[Dict] = None,
meta: Optional[Dict[str, Any]] = None,
on_success: Optional[Callback] = None,
on_failure: Optional[Callback] = None,
on_stopped: Optional[Callback] = None,
task_type: Optional[str] = None,
scheduled_task_id: Optional[int] = None,
pipeline: Optional[ConnectionType] = None,
) -> JobModel:
"""Creates a job to represent the delayed function call and enqueues it.
:param when: When to schedule the job (None to enqueue immediately)
Expand All @@ -212,7 +215,6 @@ def create_and_enqueue_job(
:param on_stopped: Callback for on stopped
:param task_type: The task type
:param scheduled_task_id: The scheduled task id
:param pipeline: The Broker Pipeline
:returns: The enqueued Job
"""
status = JobStatus.QUEUED if when is None else JobStatus.SCHEDULED
Expand All @@ -236,7 +238,7 @@ def create_and_enqueue_job(
scheduled_task_id=scheduled_task_id,
)
if when is None:
job_model = self.enqueue_job(job_model, connection=pipeline, at_front=at_front)
job_model = self.enqueue_job(job_model, at_front=at_front)
elif isinstance(when, datetime):
job_model.save(connection=self.connection)
self.scheduled_job_registry.schedule(self.connection, job_model.name, when)
Expand All @@ -246,7 +248,7 @@ def create_and_enqueue_job(

def job_handle_success(
self, job: JobModel, result: Any, job_info_ttl: int, result_ttl: int, connection: ConnectionType
):
) -> None:
"""Saves and cleanup job after successful execution"""
job.after_execution(
job_info_ttl,
Expand All @@ -264,7 +266,7 @@ def job_handle_success(
ttl=result_ttl,
)

def job_handle_failure(self, status: JobStatus, job: JobModel, exc_string: str, connection: ConnectionType):
def job_handle_failure(self, status: JobStatus, job: JobModel, exc_string: str, connection: ConnectionType) -> None:
# Does not set job status since the job might be stopped
job.after_execution(
SCHEDULER_CONFIG.DEFAULT_FAILURE_TTL,
Expand Down Expand Up @@ -304,10 +306,7 @@ def run_sync(self, job: JobModel) -> JobModel:

@classmethod
def dequeue_any(
cls,
queues: List[Self],
timeout: Optional[int],
connection: Optional[ConnectionType] = None,
cls, queues: List[Self], timeout: Optional[int], connection: ConnectionType
) -> Tuple[Optional[JobModel], Optional[Self]]:
"""Class method returning a Job instance at the front of the given set of Queues, where the order of the queues
is important.
Expand Down Expand Up @@ -410,19 +409,19 @@ def delete_job(self, job_name: str, expire_job_model: bool = True) -> None:
pass

def enqueue_job(
self, job_model: JobModel, connection: Optional[ConnectionType] = None, at_front: bool = False
self, job_model: JobModel, pipeline: Optional[PipelineType] = None, at_front: bool = False
) -> JobModel:
"""Enqueues a job for delayed execution without checking dependencies.

If Queue is instantiated with is_async=False, job is executed immediately.
:param job_model: The job redis model
:param connection: The Redis Pipeline
:param pipeline: The Broker Pipeline
:param at_front: Whether to enqueue the job at the front

:returns: The enqueued JobModel
"""

pipe = connection if connection is not None else self.connection.pipeline()
pipe: PipelineType = pipeline if pipeline is not None else self.connection.pipeline()
job_model.started_at = None
job_model.ended_at = None
job_model.status = JobStatus.QUEUED
Expand Down
Loading
Loading