Skip to content

Commit cb9b344

Browse files
committed
wip
1 parent 87fc53b commit cb9b344

File tree

19 files changed

+132
-131
lines changed

19 files changed

+132
-131
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ dev = [
6363
"fakeredis~=2.28",
6464
"pyyaml>=6,<7",
6565
"mypy>=1.16.0",
66+
"types-croniter>=6.0.0.20250411",
6667
]
6768

6869
[tool.hatch.build.targets.sdist]

scheduler/helpers/queues/queue_logic.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ class NoSuchJobError(Exception):
3030
pass
3131

3232

33+
class NoSuchRegistryError(Exception):
34+
pass
35+
36+
3337
def perform_job(job_model: JobModel, connection: ConnectionType) -> Any: # noqa
3438
"""The main execution method. Invokes the job function with the job arguments.
3539
@@ -155,11 +159,11 @@ def count(self) -> int:
155159
res += getattr(self, registry).count(connection=self.connection)
156160
return res
157161

158-
def get_registry(self, name: str) -> Union[None, JobNamesRegistry]:
162+
def get_registry(self, name: str) -> JobNamesRegistry:
159163
name = name.lower()
160164
if name in Queue.REGISTRIES:
161165
return getattr(self, Queue.REGISTRIES[name])
162-
return None
166+
raise NoSuchRegistryError(f"Unknown registry name {name}")
163167

164168
def get_all_job_names(self) -> List[str]:
165169
res = list()

scheduler/helpers/sentry_integration.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ def sentry_patched_perform_job(self: Any, job_model: JobModel, *args: Queue, **k
5252
transaction.name = job_model.func_name
5353

5454
with sentry_sdk.start_transaction(
55-
transaction,
56-
custom_sampling_context={"rq_job": job_model},
55+
transaction,
56+
custom_sampling_context={"rq_job": job_model},
5757
):
5858
rv = old_perform_job(self, job_model, *args, **kwargs)
5959

@@ -70,11 +70,7 @@ def sentry_patched_perform_job(self: Any, job_model: JobModel, *args: Queue, **k
7070
old_handle_exception = Worker.handle_exception
7171

7272
def sentry_patched_handle_exception(self: Worker, job: Any, *exc_info: Any, **kwargs: Any) -> Any:
73-
retry = (
74-
hasattr(job, "retries_left")
75-
and job.retries_left
76-
and job.retries_left > 0
77-
)
73+
retry = hasattr(job, "retries_left") and job.retries_left and job.retries_left > 0
7874
failed = job._status == JobStatus.FAILED or job.is_failed
7975
if failed and not retry:
8076
_capture_exception(exc_info)
@@ -89,9 +85,7 @@ def sentry_patched_handle_exception(self: Worker, job: Any, *exc_info: Any, **kw
8985
def sentry_patched_enqueue_job(self: Queue, job: Any, **kwargs: Any) -> Any:
9086
scope = sentry_sdk.get_current_scope()
9187
if scope.span is not None:
92-
job.meta["_sentry_trace_headers"] = dict(
93-
scope.iter_trace_propagation_headers()
94-
)
88+
job.meta["_sentry_trace_headers"] = dict(scope.iter_trace_propagation_headers())
9589

9690
return old_enqueue_job(self, job, **kwargs)
9791

scheduler/management/commands/delete_failed_executions.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import click
2+
from django.core.management import CommandParser
23
from django.core.management.base import BaseCommand
34

45
from scheduler.helpers.queues import get_queue

scheduler/management/commands/import.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,9 @@ def create_task_from_dict(task_dict: Dict[str, Any], update: bool) -> Optional[T
5252
if not settings.USE_TZ and not timezone.is_naive(target):
5353
target = timezone.make_naive(target)
5454
kwargs["scheduled_time"] = target
55-
model_fields = set(map(
56-
lambda field: field.attname,
57-
filter(
58-
lambda field: hasattr(field, "attname"),
59-
Task._meta.get_fields()
60-
)
61-
))
55+
model_fields = set(
56+
map(lambda field: field.attname, filter(lambda field: hasattr(field, "attname"), Task._meta.get_fields()))
57+
)
6258
keys_to_ignore = list(filter(lambda _k: _k not in model_fields, kwargs.keys()))
6359
for k in keys_to_ignore:
6460
del kwargs[k]
@@ -119,7 +115,7 @@ def add_arguments(self, parser: CommandParser) -> None:
119115
)
120116

121117
def handle(self, *args: Any, **options: Any) -> None:
122-
file = open(options.get("filename")) if options.get("filename") else sys.stdin # type: ignore[arg-type]
118+
file = open(options.get("filename")) if options.get("filename") else sys.stdin # type: ignore[arg-type]
123119
jobs = list()
124120
if options.get("format") == "json":
125121
import json
@@ -143,4 +139,4 @@ def handle(self, *args: Any, **options: Any) -> None:
143139
Task.objects.all().delete()
144140

145141
for job in jobs:
146-
create_task_from_dict(job, update=options.get("update")) # type: ignore[arg-type]
142+
create_task_from_dict(job, update=options.get("update")) # type: ignore[arg-type]

scheduler/management/commands/run_job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def add_arguments(self, parser: CommandParser) -> None:
2727
)
2828
parser.add_argument("args", nargs="*", help="Args for callable")
2929

30-
def handle(self, **options:Any)-> None:
30+
def handle(self, **options: Any) -> None:
3131
verbosity = int(options.get("verbosity", 1))
3232
timeout = options.get("timeout")
3333
result_ttl = options.get("result_ttl")

scheduler/models/args.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from datetime import datetime
2-
from typing import Callable
2+
from typing import Callable, Any, Tuple, Dict, Type
33

44
from django.contrib.contenttypes.fields import GenericForeignKey
55
from django.contrib.contenttypes.models import ContentType
@@ -9,7 +9,7 @@
99

1010
from scheduler.helpers import utils
1111

12-
ARG_TYPE_TYPES_DICT = {
12+
ARG_TYPE_TYPES_DICT:Dict[str,Type] = {
1313
"str": str,
1414
"int": int,
1515
"bool": bool,
@@ -37,7 +37,7 @@ class ArgType(models.TextChoices):
3737
object_id = models.PositiveIntegerField()
3838
content_object = GenericForeignKey()
3939

40-
def clean(self):
40+
def clean(self) -> None:
4141
if self.arg_type not in ARG_TYPE_TYPES_DICT:
4242
raise ValidationError(
4343
{
@@ -61,15 +61,15 @@ def clean(self):
6161
{"arg_type": ValidationError(_(f"Could not parse {self.val} as {self.arg_type}"), code="invalid")}
6262
)
6363

64-
def save(self, **kwargs):
64+
def save(self, **kwargs: Any) -> None:
6565
super(BaseTaskArg, self).save(**kwargs)
6666
self.content_object.save()
6767

68-
def delete(self, **kwargs):
68+
def delete(self, **kwargs: Any) -> None:
6969
super(BaseTaskArg, self).delete(**kwargs)
7070
self.content_object.save()
7171

72-
def value(self):
72+
def value(self) -> Any:
7373
if self.arg_type == "callable":
7474
res = utils.callable_func(self.val)()
7575
elif self.arg_type == "datetime":
@@ -86,16 +86,16 @@ class Meta:
8686

8787

8888
class TaskArg(BaseTaskArg):
89-
def __str__(self):
89+
def __str__(self) -> str:
9090
return f"TaskArg[arg_type={self.arg_type},value={self.value()}]"
9191

9292

9393
class TaskKwarg(BaseTaskArg):
9494
key = models.CharField(max_length=255)
9595

96-
def __str__(self):
96+
def __str__(self) -> str:
9797
key, value = self.value()
9898
return f"TaskKwarg[key={key},arg_type={self.arg_type},value={self.val}]"
9999

100-
def value(self):
100+
def value(self) -> Tuple[str, Any]:
101101
return self.key, super(TaskKwarg, self).value()

scheduler/models/task.py

Lines changed: 29 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import math
22
from datetime import timedelta, datetime
3-
from typing import Dict, Any, Optional
3+
from typing import Dict, Any, Optional, List, Tuple, Callable
44

55
import croniter
66
from django.conf import settings as django_settings
@@ -29,11 +29,11 @@
2929
def _get_task_for_job(job: JobModel) -> Optional["Task"]:
3030
if job.task_type is None or job.scheduled_task_id is None:
3131
return None
32-
task = Task.objects.filter(id=job.scheduled_task_id).first()
32+
task: Task = Task.objects.filter(id=job.scheduled_task_id).first()
3333
return task
3434

3535

36-
def failure_callback(job: JobModel, connection, result, *args, **kwargs):
36+
def failure_callback(job: JobModel, connection:ConnectionType, result: Any, *args: Any, **kwargs: Any) -> None:
3737
task = _get_task_for_job(job)
3838
if task is None:
3939
logger.warn(f"Could not find task for job {job.name}")
@@ -48,7 +48,7 @@ def failure_callback(job: JobModel, connection, result, *args, **kwargs):
4848
task.save(schedule_job=True, clean=False)
4949

5050

51-
def success_callback(job: JobModel, connection: ConnectionType, result: Any, *args, **kwargs):
51+
def success_callback(job: JobModel, connection: ConnectionType, result: Any, *args: Any, **kwargs: Any) -> None:
5252
task = _get_task_for_job(job)
5353
if task is None:
5454
logger.warn(f"Could not find task for job {job.name}")
@@ -59,7 +59,7 @@ def success_callback(job: JobModel, connection: ConnectionType, result: Any, *ar
5959
task.save(schedule_job=True, clean=False)
6060

6161

62-
def get_queue_choices():
62+
def get_queue_choices() -> List[Tuple[str, str]]:
6363
queue_names = get_queue_names()
6464
return [(queue, queue) for queue in queue_names]
6565

@@ -176,20 +176,20 @@ class TimeUnits(models.TextChoices):
176176
),
177177
)
178178

179-
def callable_func(self):
179+
def callable_func(self)->Callable:
180180
"""Translate callable string to callable"""
181181
return utils.callable_func(self.callable)
182182

183-
@admin.display(boolean=True, description=_("is scheduled?"))
183+
@admin.display(boolean=True, description=_("is scheduled?")) # type: ignore[misc]
184184
def is_scheduled(self) -> bool:
185185
"""Check whether a next job for this task is queued/scheduled to be executed"""
186186
if self.job_name is None: # no job_id => is not scheduled
187187
return False
188188
# check whether job_id is in scheduled/queued/active jobs
189189
res = (
190-
(self.job_name in self.rqueue.scheduled_job_registry.all())
191-
or (self.job_name in self.rqueue.queued_job_registry.all())
192-
or (self.job_name in self.rqueue.active_job_registry.all())
190+
(self.job_name in self.rqueue.scheduled_job_registry.all())
191+
or (self.job_name in self.rqueue.queued_job_registry.all())
192+
or (self.job_name in self.rqueue.active_job_registry.all())
193193
)
194194
# If the job_id is not scheduled/queued/started,
195195
# update the job_id to None. (The job_id belongs to a previous run which is completed)
@@ -198,29 +198,29 @@ def is_scheduled(self) -> bool:
198198
super(Task, self).save()
199199
return res
200200

201-
@admin.display(description="Callable")
201+
@admin.display(description="Callable") # type: ignore[misc]
202202
def function_string(self) -> str:
203203
args = self.parse_args()
204204
args_list = [repr(arg) for arg in args]
205205
kwargs = self.parse_kwargs()
206206
kwargs_list = [k + "=" + repr(v) for (k, v) in kwargs.items()]
207207
return self.callable + f"({', '.join(args_list + kwargs_list)})"
208208

209-
def parse_args(self):
209+
def parse_args(self) -> List[Any]:
210210
"""Parse args for running the job"""
211211
args = self.callable_args.all()
212212
return [arg.value() for arg in args]
213213

214-
def parse_kwargs(self):
214+
def parse_kwargs(self) -> Dict[str, Any]:
215215
"""Parse kwargs for running the job"""
216216
kwargs = self.callable_kwargs.all()
217217
return dict([kwarg.value() for kwarg in kwargs])
218218

219-
def _next_job_id(self):
219+
def _next_job_id(self) -> str:
220220
addition = timezone.now().strftime("%Y%m%d%H%M%S%f")
221221
return f"{self.queue}:{self.id}:{addition}"
222222

223-
def _enqueue_args(self) -> Dict:
223+
def _enqueue_args(self) -> Dict[str, Any]:
224224
"""Args for Queue.enqueue_call.
225225
Set all arguments for Queue.enqueue. Particularly:
226226
- set job timeout and ttl
@@ -282,7 +282,7 @@ def _schedule_time(self) -> datetime:
282282
self.repeat = (self.repeat - gap) if self.repeat is not None else None
283283
return utc(self.scheduled_time) if django_settings.USE_TZ else self.scheduled_time
284284

285-
def to_dict(self) -> Dict:
285+
def to_dict(self) -> Dict[str, Any]:
286286
"""Export model to dictionary, so it can be saved as external file backup"""
287287
interval_unit = str(self.interval_unit) if self.interval_unit else None
288288
res = dict(
@@ -321,16 +321,11 @@ def to_dict(self) -> Dict:
321321
)
322322
return res
323323

324-
def get_absolute_url(self):
324+
def get_absolute_url(self) -> str:
325325
model = self._meta.model.__name__.lower()
326-
return reverse(
327-
f"admin:scheduler_{model}_change",
328-
args=[
329-
self.id,
330-
],
331-
)
326+
return reverse(f"admin:scheduler_{model}_change", args=[self.id])
332327

333-
def __str__(self):
328+
def __str__(self) -> str:
334329
func = self.function_string()
335330
return f"{self.task_type}[{self.name}={func}]"
336331

@@ -359,7 +354,7 @@ def _schedule(self) -> bool:
359354
self.job_name = job.name
360355
return True
361356

362-
def save(self, **kwargs):
357+
def save(self, **kwargs: Any) -> None:
363358
should_clean = kwargs.pop("clean", True)
364359
if should_clean:
365360
self.clean()
@@ -372,25 +367,25 @@ def save(self, **kwargs):
372367
self._schedule()
373368
super(Task, self).save()
374369

375-
def delete(self, **kwargs):
370+
def delete(self, **kwargs: Any) -> None:
376371
self.unschedule()
377372
super(Task, self).delete(**kwargs)
378373

379-
def interval_seconds(self):
374+
def interval_seconds(self) -> float:
380375
kwargs = {
381376
self.interval_unit: self.interval,
382377
}
383378
return timedelta(**kwargs).total_seconds()
384379

385-
def clean_callable(self):
380+
def clean_callable(self) -> None:
386381
try:
387382
utils.callable_func(self.callable)
388383
except Exception:
389384
raise ValidationError(
390385
{"callable": ValidationError(_("Invalid callable, must be importable"), code="invalid")}
391386
)
392387

393-
def clean_queue(self):
388+
def clean_queue(self) -> None:
394389
queue_names = get_queue_names()
395390
if self.queue not in queue_names:
396391
raise ValidationError(
@@ -401,7 +396,7 @@ def clean_queue(self):
401396
}
402397
)
403398

404-
def clean_interval_unit(self):
399+
def clean_interval_unit(self) -> None:
405400
config = settings.SCHEDULER_CONFIG
406401
if config.SCHEDULER_INTERVAL > self.interval_seconds():
407402
raise ValidationError(
@@ -424,13 +419,13 @@ def clean_result_ttl(self) -> None:
424419
params={"interval": self.interval_seconds()},
425420
)
426421

427-
def clean_cron_string(self):
422+
def clean_cron_string(self) -> None:
428423
try:
429424
croniter.croniter(self.cron_string)
430425
except ValueError as e:
431426
raise ValidationError({"cron_string": ValidationError(_(str(e)), code="invalid")})
432427

433-
def clean(self):
428+
def clean(self) -> None:
434429
if self.task_type not in TaskType.values:
435430
raise ValidationError(
436431
{"task_type": ValidationError(_("Invalid task type"), code="invalid")},
@@ -470,7 +465,7 @@ def get_scheduled_task(task_type_str: str, task_id: int) -> Task:
470465
task = Task.objects.filter(task_type=task_type, id=task_id).first()
471466
if task is None:
472467
raise ValueError(f"Job {task_type}:{task_id} does not exit")
473-
return task
468+
return task # type: ignore[no-any-return]
474469
except ValueError:
475470
raise ValueError(f"Invalid task type {task_type_str}")
476471
raise ValueError(f"Job Model {task_type_str} does not exist, choices are {TASK_TYPES}")
@@ -484,5 +479,5 @@ def run_task(task_model: str, task_id: int) -> Any:
484479
logger.debug(f"Running task {str(scheduled_task)}")
485480
args = scheduled_task.parse_args()
486481
kwargs = scheduled_task.parse_kwargs()
487-
res = scheduled_task.callable_func()(*args, **kwargs)
482+
res = scheduled_task.callable_func()(*args, **kwargs) # type: ignore[no-untyped-call]
488483
return res

0 commit comments

Comments
 (0)