Skip to content

Commit 02c7871

Browse files
committed
wip
1 parent be603fa commit 02c7871

File tree

7 files changed

+68
-35
lines changed

7 files changed

+68
-35
lines changed

scheduler/management/commands/export.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
from django.apps import apps
55
from django.core.management.base import BaseCommand
66

7-
from scheduler.tools import MODEL_NAMES
7+
from scheduler.models import Task
8+
from scheduler.rq_classes import MODEL_NAMES
9+
from scheduler.tools import OLD_MODEL_NAMES
810

911

1012
class Command(BaseCommand):
@@ -43,7 +45,12 @@ def add_arguments(self, parser):
4345
def handle(self, *args, **options):
4446
file = open(options.get("filename"), "w") if options.get("filename") else sys.stdout
4547
res = list()
46-
for model_name in MODEL_NAMES:
48+
jobs = Task.objects.all()
49+
if options.get("enabled"):
50+
jobs = jobs.filter(enabled=True)
51+
for job in jobs:
52+
res.append(job.to_dict())
53+
for model_name in OLD_MODEL_NAMES:
4754
model = apps.get_model(app_label="scheduler", model_name=model_name)
4855
jobs = model.objects.all()
4956
if options.get("enabled"):

scheduler/management/commands/import.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
from scheduler.models import TaskArg, TaskKwarg, Task
1212
from scheduler.models.task import TaskType
13-
from scheduler.tools import MODEL_NAMES
13+
from scheduler.tools import OLD_MODEL_NAMES
1414

1515

1616
def job_model_str(model_str: str) -> str:
@@ -133,7 +133,7 @@ def handle(self, *args, **options):
133133
jobs = yaml.load(file, yaml.SafeLoader)
134134

135135
if options.get("reset"):
136-
for model_name in MODEL_NAMES:
136+
for model_name in OLD_MODEL_NAMES:
137137
model = apps.get_model(app_label="scheduler", model_name=model_name)
138138
model.objects.all().delete()
139139

scheduler/models/task.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,13 @@ def success_callback(job, connection, result, *args, **kwargs):
5151
model_name = job.meta.get("task_type", None)
5252
if model_name is None:
5353
return
54-
model = apps.get_model(app_label="scheduler", model_name=model_name)
55-
task = model.objects.filter(job_id=job.id).first()
54+
55+
task = Task.objects.filter(job_id=job.id).first()
5656
if task is None:
57+
model = apps.get_model(app_label="scheduler", model_name=model_name)
58+
task = model.objects.filter(job_id=job.id).first()
59+
if task is None:
60+
logger.warn(f"Could not find task for job {job.id}")
5761
return
5862
task.job_id = None
5963
task.successful_runs += 1

scheduler/rq_classes.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
from scheduler import settings
2525
from scheduler.broker_types import PipelineType, ConnectionType
2626

27-
MODEL_NAMES = ["ScheduledTask", "RepeatableTask", "CronTask", "Task"]
27+
OLD_MODEL_NAMES = ["ScheduledTask", "RepeatableTask", "CronTask"]
28+
MODEL_NAMES = ["OnceTask", "RepeatableTask", "CronTask"]
2829

2930
rq_job_decorator = job
3031
ExecutionStatus = JobStatus
@@ -63,7 +64,8 @@ def is_scheduled_task(self) -> bool:
6364

6465
def is_execution_of(self, task: "Task") -> bool: # noqa: F821
6566
return (
66-
self.meta.get("task_type", None) == task.task_type and self.meta.get("scheduled_task_id", None) == task.id
67+
self.meta.get("task_type", None) == task.task_type and self.meta.get("scheduled_task_id",
68+
None) == task.id
6769
)
6870

6971
def stop_execution(self, connection: ConnectionType):
@@ -92,11 +94,11 @@ def __str__(self):
9294
return f"{self.name}/{','.join(self.queue_names())}"
9395

9496
def _start_scheduler(
95-
self,
96-
burst: bool = False,
97-
logging_level: str = "INFO",
98-
date_format: str = "%H:%M:%S",
99-
log_format: str = "%(asctime)s %(message)s",
97+
self,
98+
burst: bool = False,
99+
logging_level: str = "INFO",
100+
date_format: str = "%H:%M:%S",
101+
log_format: str = "%(asctime)s %(message)s",
100102
) -> None:
101103
"""Starts the scheduler process.
102104
This is specifically designed to be run by the worker when running the `work()` method.
@@ -262,7 +264,7 @@ def __init__(self, *args, **kwargs) -> None:
262264

263265
@staticmethod
264266
def reschedule_all_jobs():
265-
for model_name in MODEL_NAMES:
267+
for model_name in OLD_MODEL_NAMES:
266268
model = apps.get_model(app_label="scheduler", model_name=model_name)
267269
enabled_jobs = model.objects.filter(enabled=True)
268270
unscheduled_jobs = filter(lambda j: j.ready_for_schedule(), enabled_jobs)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from datetime import timedelta
2+
3+
from django.utils import timezone
4+
5+
from scheduler import settings
6+
from scheduler.models.task import TaskType
7+
from scheduler.tests.test_task_types.test_task_model import BaseTestCases
8+
from scheduler.tests.testtools import task_factory
9+
10+
11+
class TestScheduledTask(BaseTestCases.TestSchedulableTask):
12+
task_type = TaskType.ONCE
13+
14+
def test_clean(self):
15+
job = task_factory(self.task_type)
16+
job.queue = list(settings.QUEUES)[0]
17+
job.callable = "scheduler.tests.jobs.test_job"
18+
self.assertIsNone(job.clean())
19+
20+
def test_unschedulable_old_job(self):
21+
job = task_factory(self.task_type, scheduled_time=timezone.now() - timedelta(hours=1))
22+
self.assertFalse(job.is_scheduled())

scheduler/tests/test_task_types/test_task_model.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -507,16 +507,3 @@ def test_result_ttl_passthrough(self):
507507
entry = _get_task_job_execution_from_registry(job)
508508
self.assertEqual(entry.result_ttl, 500)
509509

510-
511-
class TestScheduledTask(BaseTestCases.TestSchedulableTask):
512-
task_type = TaskType.ONCE
513-
514-
def test_clean(self):
515-
job = task_factory(self.task_type)
516-
job.queue = list(settings.QUEUES)[0]
517-
job.callable = "scheduler.tests.jobs.test_job"
518-
self.assertIsNone(job.clean())
519-
520-
def test_unschedulable_old_job(self):
521-
job = task_factory(self.task_type, scheduled_time=timezone.now() - timedelta(hours=1))
522-
self.assertFalse(job.is_scheduled())

scheduler/tools.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from django.utils.translation import gettext_lazy as _
1111

1212
from scheduler.queues import get_queues, logger, get_queue
13-
from scheduler.rq_classes import DjangoWorker, MODEL_NAMES, JobExecution
13+
from scheduler.rq_classes import DjangoWorker, OLD_MODEL_NAMES, JobExecution, MODEL_NAMES
1414
from scheduler.settings import SCHEDULER_CONFIG, Broker
1515

1616

@@ -39,15 +39,26 @@ def get_next_cron_time(cron_string: Optional[str]) -> Optional[timezone.datetime
3939
return next_itr
4040

4141

42-
def get_scheduled_task(task_model: Union[TaskType, str], task_id: int) -> "BaseTask": # noqa: F821
43-
if isinstance(task_model, TaskType):
44-
model = apps.get_model(app_label="scheduler", model_name="Task")
45-
task = model.objects.filter(task_type=task_model, id=task_id).first()
42+
def get_scheduled_task(task_model: str, task_id: int) -> "BaseTask": # noqa: F821
43+
if isinstance(task_model, str) and task_model not in OLD_MODEL_NAMES and task_model not in MODEL_NAMES:
44+
raise ValueError(f"Job Model `{task_model}` does not exist, choices are {OLD_MODEL_NAMES}")
45+
46+
# Try with new model names
47+
model = apps.get_model(app_label="scheduler", model_name="Task")
48+
if task_model == "OnceTask":
49+
task = model.objects.filter(task_type=TaskType.ONCE, id=task_id).first()
4650
if task is None:
4751
raise ValueError(f"Job {task_model}:{task_id} does not exit")
4852
return task
49-
if isinstance(task_model, str) and task_model not in MODEL_NAMES:
50-
raise ValueError(f"Job Model `{task_model}` does not exist, choices are {MODEL_NAMES}")
53+
elif task_model == "RepeatableTask":
54+
task = model.objects.filter(task_type=TaskType.REPEATABLE, id=task_id).first()
55+
if task is not None:
56+
return task
57+
elif task_model == "CronTask":
58+
task = model.objects.filter(task_type=TaskType.CRON, id=task_id).first()
59+
if task is not None:
60+
return task
61+
5162
model = apps.get_model(app_label="scheduler", model_name=task_model)
5263
task = model.objects.filter(id=task_id).first()
5364
if task is None:
@@ -81,7 +92,7 @@ def create_worker(*queue_names, **kwargs) -> DjangoWorker:
8192
queues = get_queues(*queue_names)
8293
existing_workers = DjangoWorker.all(connection=queues[0].connection)
8394
existing_worker_names = set(map(lambda w: w.name, existing_workers))
84-
kwargs["fork_job_execution"] = SCHEDULER_CONFIG.BROKER != Broker.FAKEREDIS
95+
kwargs.setdefault("fork_job_execution", SCHEDULER_CONFIG.BROKER != Broker.FAKEREDIS)
8596
if kwargs.get("name", None) is None:
8697
kwargs["name"] = _calc_worker_name(existing_worker_names)
8798

0 commit comments

Comments
 (0)