Skip to content

Commit 87fc53b

Browse files
committed
wip
1 parent c823413 commit 87fc53b

File tree

10 files changed

+225
-86
lines changed

10 files changed

+225
-86
lines changed

pyproject.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ dependencies = [
4545
]
4646

4747
[project.optional-dependencies]
48-
yaml = ["pyyaml~=6.0"]
48+
yaml = ["pyyaml~=6.0", "types-PyYAML>=6.0.12.20250516"]
4949
valkey = ["valkey>=6.0.2,<7"]
5050
sentry = ["sentry-sdk~=2.19"]
5151

@@ -90,7 +90,10 @@ line-ending = "auto"
9090

9191
[tool.mypy]
9292
packages = ['scheduler', ]
93-
exclude = "scheduler/tests/.*\\.py"
93+
exclude = ["scheduler/tests/.*\\.py",
94+
"scheduler/migrations/.*\\.py",
95+
"testproject/.*\\.py",
96+
"testproject/tests/.*\\.py"]
9497
strict = true
9598
follow_imports = "silent"
9699
ignore_missing_imports = true

scheduler/helpers/sentry_integration.py

Lines changed: 26 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
import weakref
2+
from typing import Any, Callable
23

34
import sentry_sdk
4-
from sentry_sdk.consts import OP
5+
from sentry_sdk._types import EventProcessor, Event, ExcInfo
56
from sentry_sdk.api import continue_trace
6-
from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration
7+
from sentry_sdk.consts import OP
8+
from sentry_sdk.integrations import _check_minimum_version, Integration
79
from sentry_sdk.integrations.logging import ignore_logger
810
from sentry_sdk.tracing import TransactionSource
911
from sentry_sdk.utils import (
1012
capture_internal_exceptions,
1113
ensure_integration_enabled,
1214
event_from_exception,
13-
format_timestamp,
1415
parse_version,
1516
)
1617

1718
import scheduler
1819
from scheduler.helpers.queues import Queue
20+
from scheduler.redis_models import JobModel
1921
from scheduler.redis_models import JobStatus
2022
from scheduler.timeouts import JobTimeoutException
2123
from scheduler.worker import Worker
@@ -26,36 +28,34 @@ class SentryIntegration(Integration):
2628
origin = f"auto.queue.{identifier}"
2729

2830
@staticmethod
29-
def setup_once():
30-
# type: () -> None
31+
def setup_once() -> None:
3132
version = parse_version(scheduler.__version__)
3233
_check_minimum_version(SentryIntegration, version)
3334

3435
old_perform_job = Worker.perform_job
3536

3637
@ensure_integration_enabled(SentryIntegration, old_perform_job)
37-
def sentry_patched_perform_job(self, job, *args, **kwargs):
38-
# type: (Any, Job, *Queue, **Any) -> bool
38+
def sentry_patched_perform_job(self: Any, job_model: JobModel, *args: Queue, **kwargs: Any) -> bool:
3939
with sentry_sdk.new_scope() as scope:
4040
scope.clear_breadcrumbs()
41-
scope.add_event_processor(_make_event_processor(weakref.ref(job)))
41+
scope.add_event_processor(_make_event_processor(weakref.ref(job_model)))
4242

4343
transaction = continue_trace(
44-
job.meta.get("_sentry_trace_headers") or {},
44+
job_model.meta.get("_sentry_trace_headers") or {},
4545
op=OP.QUEUE_TASK_RQ,
4646
name="unknown RQ task",
4747
source=TransactionSource.TASK,
4848
origin=SentryIntegration.origin,
4949
)
5050

5151
with capture_internal_exceptions():
52-
transaction.name = job.func_name
52+
transaction.name = job_model.func_name
5353

5454
with sentry_sdk.start_transaction(
55-
transaction,
56-
custom_sampling_context={"rq_job": job},
55+
transaction,
56+
custom_sampling_context={"rq_job": job_model},
5757
):
58-
rv = old_perform_job(self, job, *args, **kwargs)
58+
rv = old_perform_job(self, job_model, *args, **kwargs)
5959

6060
if self.is_horse:
6161
# We're inside of a forked process and RQ is
@@ -65,30 +65,28 @@ def sentry_patched_perform_job(self, job, *args, **kwargs):
6565

6666
return rv
6767

68-
Worker.perform_job = sentry_patched_perform_job
68+
Worker.perform_job = sentry_patched_perform_job # type: ignore[method-assign]
6969

7070
old_handle_exception = Worker.handle_exception
7171

72-
def sentry_patched_handle_exception(self, job, *exc_info, **kwargs):
73-
# type: (Worker, Any, *Any, **Any) -> Any
72+
def sentry_patched_handle_exception(self: Worker, job: Any, *exc_info: Any, **kwargs: Any) -> Any:
7473
retry = (
75-
hasattr(job, "retries_left")
76-
and job.retries_left
77-
and job.retries_left > 0
74+
hasattr(job, "retries_left")
75+
and job.retries_left
76+
and job.retries_left > 0
7877
)
7978
failed = job._status == JobStatus.FAILED or job.is_failed
8079
if failed and not retry:
8180
_capture_exception(exc_info)
8281

8382
return old_handle_exception(self, job, *exc_info, **kwargs)
8483

85-
Worker.handle_exception = sentry_patched_handle_exception
84+
Worker.handle_exception = sentry_patched_handle_exception # type: ignore[method-assign]
8685

8786
old_enqueue_job = Queue.enqueue_job
8887

8988
@ensure_integration_enabled(SentryIntegration, old_enqueue_job)
90-
def sentry_patched_enqueue_job(self, job, **kwargs):
91-
# type: (Queue, Any, **Any) -> Any
89+
def sentry_patched_enqueue_job(self: Queue, job: Any, **kwargs: Any) -> Any:
9290
scope = sentry_sdk.get_current_scope()
9391
if scope.span is not None:
9492
job.meta["_sentry_trace_headers"] = dict(
@@ -97,46 +95,30 @@ def sentry_patched_enqueue_job(self, job, **kwargs):
9795

9896
return old_enqueue_job(self, job, **kwargs)
9997

100-
Queue.enqueue_job = sentry_patched_enqueue_job
98+
Queue.enqueue_job = sentry_patched_enqueue_job # type: ignore[method-assign]
10199

102100
ignore_logger("rq.worker")
103101

104102

105-
def _make_event_processor(weak_job):
106-
# type: (Callable[[], Job]) -> EventProcessor
107-
def event_processor(event, hint):
108-
# type: (Event, dict[str, Any]) -> Event
103+
def _make_event_processor(weak_job: Callable[[], JobModel]) -> EventProcessor:
104+
def event_processor(event: Event, hint: dict[str, Any]) -> Event:
109105
job = weak_job()
110106
if job is not None:
111107
with capture_internal_exceptions():
112108
extra = event.setdefault("extra", {})
113-
rq_job = {
114-
"job_id": job.id,
115-
"func": job.func_name,
116-
"args": job.args,
117-
"kwargs": job.kwargs,
118-
"description": job.description,
119-
}
120-
121-
if job.enqueued_at:
122-
rq_job["enqueued_at"] = format_timestamp(job.enqueued_at)
123-
if job.started_at:
124-
rq_job["started_at"] = format_timestamp(job.started_at)
125-
126-
extra["rq-job"] = rq_job
109+
extra["job"] = job.serialize()
127110

128111
if "exc_info" in hint:
129112
with capture_internal_exceptions():
130113
if issubclass(hint["exc_info"][0], JobTimeoutException):
131-
event["fingerprint"] = ["rq", "JobTimeoutException", job.func_name]
114+
event["fingerprint"] = ["django-tasks-scheduler", "JobTimeoutException", job.func_name]
132115

133116
return event
134117

135118
return event_processor
136119

137120

138-
def _capture_exception(exc_info, **kwargs):
139-
# type: (ExcInfo, **Any) -> None
121+
def _capture_exception(exc_info: ExcInfo, **kwargs: Any) -> None:
140122
client = sentry_sdk.get_client()
141123

142124
event, hint = event_from_exception(

scheduler/management/commands/delete_failed_executions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
class Command(BaseCommand):
99
help = "Delete failed jobs from Django queue."
1010

11-
def add_arguments(self, parser):
11+
def add_arguments(self, parser: CommandParser) -> None:
1212
parser.add_argument("--queue", "-q", dest="queue", default="default", help="Specify the queue [default]")
1313
parser.add_argument("-f", "--func", help='optional job function name, e.g. "app.tasks.func"')
1414
parser.add_argument("--dry-run", action="store_true", help="Do not actually delete failed jobs")

scheduler/management/commands/export.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import sys
2+
from typing import Any
23

34
import click
4-
from django.core.management.base import BaseCommand
5+
from django.core.management.base import BaseCommand, CommandParser
56

67
from scheduler.models import Task
78

@@ -11,7 +12,7 @@ class Command(BaseCommand):
1112

1213
help = __doc__
1314

14-
def add_arguments(self, parser):
15+
def add_arguments(self, parser: CommandParser) -> None:
1516
parser.add_argument(
1617
"-o",
1718
"--output",
@@ -37,7 +38,7 @@ def add_arguments(self, parser):
3738
help="File name to load (otherwise writes to standard output)",
3839
)
3940

40-
def handle(self, *args, **options):
41+
def handle(self, *args: Any, **options: Any) -> None:
4142
file = open(options.get("filename"), "w") if options.get("filename") else sys.stdout
4243
res = list()
4344

scheduler/management/commands/import.py

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import click
55
from django.conf import settings
66
from django.contrib.contenttypes.models import ContentType
7-
from django.core.management.base import BaseCommand
7+
from django.core.management.base import BaseCommand, CommandParser
88
from django.utils import timezone
99

1010
from scheduler.models import TaskArg, TaskKwarg, Task
@@ -24,11 +24,11 @@ def get_task_type(model_str: str) -> TaskType:
2424
except ValueError:
2525
pass
2626
if model_str == "CronTask":
27-
return TaskType.CRON
27+
return TaskType(TaskType.CRON)
2828
elif model_str == "RepeatableTask":
29-
return TaskType.REPEATABLE
29+
return TaskType(TaskType.REPEATABLE)
3030
elif model_str in {"ScheduledTask", "OnceTask"}:
31-
return TaskType.ONCE
31+
return TaskType(TaskType.ONCE)
3232
raise ValueError(f"Invalid model {model_str}")
3333

3434

@@ -52,12 +52,17 @@ def create_task_from_dict(task_dict: Dict[str, Any], update: bool) -> Optional[T
5252
if not settings.USE_TZ and not timezone.is_naive(target):
5353
target = timezone.make_naive(target)
5454
kwargs["scheduled_time"] = target
55-
model_fields = filter(lambda field: hasattr(field, "attname"), Task._meta.get_fields())
56-
model_fields = set(map(lambda field: field.attname, model_fields))
55+
model_fields = set(map(
56+
lambda field: field.attname,
57+
filter(
58+
lambda field: hasattr(field, "attname"),
59+
Task._meta.get_fields()
60+
)
61+
))
5762
keys_to_ignore = list(filter(lambda _k: _k not in model_fields, kwargs.keys()))
5863
for k in keys_to_ignore:
5964
del kwargs[k]
60-
task = Task.objects.create(**kwargs)
65+
task: Task = Task.objects.create(**kwargs)
6166
click.echo(f"Created task {task}")
6267
content_type = ContentType.objects.get_for_model(task)
6368

@@ -77,13 +82,11 @@ def create_task_from_dict(task_dict: Dict[str, Any], update: bool) -> Optional[T
7782

7883

7984
class Command(BaseCommand):
80-
"""
81-
Import scheduled jobs
82-
"""
85+
"""Import scheduled jobs"""
8386

8487
help = __doc__
8588

86-
def add_arguments(self, parser):
89+
def add_arguments(self, parser: CommandParser) -> None:
8790
parser.add_argument(
8891
"-f",
8992
"--format",
@@ -115,8 +118,8 @@ def add_arguments(self, parser):
115118
help="Update existing records",
116119
)
117120

118-
def handle(self, *args, **options):
119-
file = open(options.get("filename")) if options.get("filename") else sys.stdin
121+
def handle(self, *args: Any, **options: Any) -> None:
122+
file = open(options.get("filename")) if options.get("filename") else sys.stdin # type: ignore[arg-type]
120123
jobs = list()
121124
if options.get("format") == "json":
122125
import json
@@ -133,11 +136,11 @@ def handle(self, *args, **options):
133136
click.echo("Aborting. LibYAML is not installed.")
134137
exit(1)
135138
# Disable YAML alias
136-
yaml.Dumper.ignore_aliases = lambda *x: True
139+
yaml.Dumper.ignore_aliases = lambda *x: True # type: ignore[method-assign]
137140
jobs = yaml.load(file, yaml.SafeLoader)
138141

139142
if options.get("reset"):
140143
Task.objects.all().delete()
141144

142145
for job in jobs:
143-
create_task_from_dict(job, update=options.get("update"))
146+
create_task_from_dict(job, update=options.get("update")) # type: ignore[arg-type]

scheduler/management/commands/run_job.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
from typing import Any
2+
13
import click
2-
from django.core.management.base import BaseCommand
4+
from django.core.management.base import BaseCommand, CommandParser
35

46
from scheduler.helpers.queues import get_queue
57

@@ -13,7 +15,7 @@ class Command(BaseCommand):
1315
help = __doc__
1416
args = "<function arg arg ...>"
1517

16-
def add_arguments(self, parser):
18+
def add_arguments(self, parser: CommandParser) -> None:
1719
parser.add_argument("--queue", "-q", dest="queue", default="default", help="Specify the queue [default]")
1820
parser.add_argument("--timeout", "-t", type=int, dest="timeout", help="A timeout in seconds")
1921
parser.add_argument(
@@ -25,7 +27,7 @@ def add_arguments(self, parser):
2527
)
2628
parser.add_argument("args", nargs="*", help="Args for callable")
2729

28-
def handle(self, **options):
30+
def handle(self, **options:Any)-> None:
2931
verbosity = int(options.get("verbosity", 1))
3032
timeout = options.get("timeout")
3133
result_ttl = options.get("result_ttl")

scheduler/management/commands/scheduler_stats.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import Any, Dict, List, Optional
44

55
import click
6-
from django.core.management.base import BaseCommand
6+
from django.core.management.base import BaseCommand, CommandParser
77

88
from scheduler.views import get_statistics
99

@@ -24,7 +24,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
2424
self.table_width = 80
2525
self.interval = None
2626

27-
def add_arguments(self, parser: ArgumentParser) -> None:
27+
def add_arguments(self, parser: CommandParser) -> None:
2828
parser.add_argument(
2929
"-j",
3030
"--json",

scheduler/views/job_views.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ class JobDetailAction(str, Enum):
2121
CANCEL = "cancel"
2222

2323

24-
@never_cache
25-
@staff_member_required
24+
@never_cache # type: ignore
25+
@staff_member_required # type: ignore
2626
def job_detail(request: HttpRequest, job_name: str) -> HttpResponse:
2727
queue, job = _find_job(job_name)
2828
if job is None or queue is None:
@@ -50,8 +50,8 @@ def job_detail(request: HttpRequest, job_name: str) -> HttpResponse:
5050
return render(request, "admin/scheduler/job_detail.html", context_data)
5151

5252

53-
@never_cache
54-
@staff_member_required
53+
@never_cache # type: ignore
54+
@staff_member_required # type: ignore
5555
def job_action(request: HttpRequest, job_name: str, action: str) -> HttpResponse:
5656
queue, job = _find_job(job_name)
5757
if job is None or queue is None:
@@ -80,6 +80,9 @@ def job_action(request: HttpRequest, job_name: str, action: str) -> HttpResponse
8080
messages.info(request, f"You have successfully enqueued {job.name}")
8181
return redirect("job_details", job_name)
8282
elif action == JobDetailAction.CANCEL:
83+
if job.worker_name is None:
84+
messages.warning(request, "You cannot cancel a job that has no worker assigned")
85+
return redirect("job_details", job_name)
8386
send_command(
8487
connection=queue.connection, command=StopJobCommand(job_name=job.name, worker_name=job.worker_name)
8588
)

0 commit comments

Comments
 (0)