Skip to content

Commit 49c0cda

Browse files
committed
wip
1 parent 02c7871 commit 49c0cda

File tree

6 files changed

+36
-44
lines changed

6 files changed

+36
-44
lines changed

scheduler/management/commands/export.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
from django.core.management.base import BaseCommand
66

77
from scheduler.models import Task
8-
from scheduler.rq_classes import MODEL_NAMES
9-
from scheduler.tools import OLD_MODEL_NAMES
8+
from scheduler.tools import MODEL_NAMES
109

1110

1211
class Command(BaseCommand):
@@ -45,12 +44,7 @@ def add_arguments(self, parser):
4544
def handle(self, *args, **options):
4645
file = open(options.get("filename"), "w") if options.get("filename") else sys.stdout
4746
res = list()
48-
jobs = Task.objects.all()
49-
if options.get("enabled"):
50-
jobs = jobs.filter(enabled=True)
51-
for job in jobs:
52-
res.append(job.to_dict())
53-
for model_name in OLD_MODEL_NAMES:
47+
for model_name in MODEL_NAMES:
5448
model = apps.get_model(app_label="scheduler", model_name=model_name)
5549
jobs = model.objects.all()
5650
if options.get("enabled"):

scheduler/management/commands/import.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
from scheduler.models import TaskArg, TaskKwarg, Task
1212
from scheduler.models.task import TaskType
13-
from scheduler.tools import OLD_MODEL_NAMES
13+
from scheduler.tools import MODEL_NAMES
1414

1515

1616
def job_model_str(model_str: str) -> str:
@@ -21,6 +21,8 @@ def job_model_str(model_str: str) -> str:
2121

2222
def get_task_type(model_str: str) -> TaskType:
2323
model_str = job_model_str(model_str)
24+
if TaskType(model_str):
25+
return TaskType(model_str)
2426
if model_str == "CronTask":
2527
return TaskType.CRON
2628
elif model_str == "RepeatableTask":
@@ -133,7 +135,7 @@ def handle(self, *args, **options):
133135
jobs = yaml.load(file, yaml.SafeLoader)
134136

135137
if options.get("reset"):
136-
for model_name in OLD_MODEL_NAMES:
138+
for model_name in MODEL_NAMES:
137139
model = apps.get_model(app_label="scheduler", model_name=model_name)
138140
model.objects.all().delete()
139141

scheduler/models/task.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ def failure_callback(job, connection, result, *args, **kwargs):
4848

4949

5050
def success_callback(job, connection, result, *args, **kwargs):
51-
model_name = job.meta.get("task_type", None)
52-
if model_name is None:
51+
task_type = job.meta.get("task_type", None)
52+
if task_type is None:
5353
return
5454

5555
task = Task.objects.filter(job_id=job.id).first()
5656
if task is None:
57-
model = apps.get_model(app_label="scheduler", model_name=model_name)
57+
model = apps.get_model(app_label="scheduler", model_name=task_type)
5858
task = model.objects.filter(job_id=job.id).first()
5959
if task is None:
6060
logger.warn(f"Could not find task for job {job.id}")
@@ -267,7 +267,7 @@ def ready_for_schedule(self) -> bool:
267267
if not self.enabled:
268268
logger.debug(f"Task {str(self)} disabled, enable task before scheduling")
269269
return False
270-
if self.task_type == TaskType.REPEATABLE and self._schedule_time() < timezone.now():
270+
if self.task_type in {TaskType.REPEATABLE, TaskType.ONCE} and self._schedule_time() < timezone.now():
271271
return False
272272
return True
273273

scheduler/rq_classes.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
from scheduler import settings
2525
from scheduler.broker_types import PipelineType, ConnectionType
2626

27-
OLD_MODEL_NAMES = ["ScheduledTask", "RepeatableTask", "CronTask"]
28-
MODEL_NAMES = ["OnceTask", "RepeatableTask", "CronTask"]
27+
MODEL_NAMES = ["ScheduledTask", "RepeatableTask", "CronTask", "Task"]
28+
TASK_TYPES = ["OnceTaskType", "RepeatableTaskType", "CronTaskType"]
2929

3030
rq_job_decorator = job
3131
ExecutionStatus = JobStatus
@@ -264,7 +264,7 @@ def __init__(self, *args, **kwargs) -> None:
264264

265265
@staticmethod
266266
def reschedule_all_jobs():
267-
for model_name in OLD_MODEL_NAMES:
267+
for model_name in MODEL_NAMES:
268268
model = apps.get_model(app_label="scheduler", model_name=model_name)
269269
enabled_jobs = model.objects.filter(enabled=True)
270270
unscheduled_jobs = filter(lambda j: j.ready_for_schedule(), enabled_jobs)

scheduler/tests/test_mgmt_commands/test_export.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,18 @@
77
from django.core.management import call_command
88
from django.test import TestCase
99

10+
from scheduler.tests import test_settings # noqa
1011
from scheduler.tests.testtools import task_factory
1112
from scheduler.tools import TaskType
12-
from scheduler.tests import test_settings # noqa
1313

1414

1515
class ExportTest(TestCase):
1616
def setUp(self) -> None:
17+
super().setUp()
1718
self.tmpfile = tempfile.NamedTemporaryFile()
1819

1920
def tearDown(self) -> None:
21+
super().tearDown()
2022
os.remove(self.tmpfile.name)
2123

2224
def test_export__should_export_job(self):

scheduler/tools.py

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import importlib
22
import os
3-
from typing import List, Any, Callable, Optional, Union
3+
from typing import List, Any, Callable, Optional
44

55
import croniter
66
from django.apps import apps
@@ -10,14 +10,14 @@
1010
from django.utils.translation import gettext_lazy as _
1111

1212
from scheduler.queues import get_queues, logger, get_queue
13-
from scheduler.rq_classes import DjangoWorker, OLD_MODEL_NAMES, JobExecution, MODEL_NAMES
13+
from scheduler.rq_classes import DjangoWorker, JobExecution, TASK_TYPES, MODEL_NAMES
1414
from scheduler.settings import SCHEDULER_CONFIG, Broker
1515

1616

1717
class TaskType(models.TextChoices):
18-
CRON = "CronTask", _("Cron Task")
19-
REPEATABLE = "RepeatableTask", _("Repeatable Task")
20-
ONCE = "OnceTask", _("Run once")
18+
CRON = "CronTaskType", _("Cron Task")
19+
REPEATABLE = "RepeatableTaskType", _("Repeatable Task")
20+
ONCE = "OnceTaskType", _("Run once")
2121

2222

2323
def callable_func(callable_str: str) -> Callable:
@@ -39,31 +39,25 @@ def get_next_cron_time(cron_string: Optional[str]) -> Optional[timezone.datetime
3939
return next_itr
4040

4141

42-
def get_scheduled_task(task_model: str, task_id: int) -> "BaseTask": # noqa: F821
43-
if isinstance(task_model, str) and task_model not in OLD_MODEL_NAMES and task_model not in MODEL_NAMES:
44-
raise ValueError(f"Job Model `{task_model}` does not exist, choices are {OLD_MODEL_NAMES}")
45-
42+
def get_scheduled_task(task_type_str: str, task_id: int) -> "BaseTask": # noqa: F821
4643
# Try with new model names
4744
model = apps.get_model(app_label="scheduler", model_name="Task")
48-
if task_model == "OnceTask":
49-
task = model.objects.filter(task_type=TaskType.ONCE, id=task_id).first()
45+
if task_type_str in TASK_TYPES:
46+
try:
47+
task_type = TaskType(task_type_str)
48+
task = model.objects.filter(task_type=TaskType.ONCE, id=task_id).first()
49+
if task is None:
50+
raise ValueError(f"Job {task_type}:{task_id} does not exit")
51+
return task
52+
except ValueError:
53+
raise ValueError(f"Invalid task type {task_type_str}")
54+
elif task_type_str in MODEL_NAMES:
55+
model = apps.get_model(app_label="scheduler", model_name=task_type_str)
56+
task = model.objects.filter(id=task_id).first()
5057
if task is None:
51-
raise ValueError(f"Job {task_model}:{task_id} does not exit")
58+
raise ValueError(f"Job {task_type_str}:{task_id} does not exit")
5259
return task
53-
elif task_model == "RepeatableTask":
54-
task = model.objects.filter(task_type=TaskType.REPEATABLE, id=task_id).first()
55-
if task is not None:
56-
return task
57-
elif task_model == "CronTask":
58-
task = model.objects.filter(task_type=TaskType.CRON, id=task_id).first()
59-
if task is not None:
60-
return task
61-
62-
model = apps.get_model(app_label="scheduler", model_name=task_model)
63-
task = model.objects.filter(id=task_id).first()
64-
if task is None:
65-
raise ValueError(f"Job {task_model}:{task_id} does not exit")
66-
return task
60+
raise ValueError(f"Job Model {task_type_str} does not exist, choices are {TASK_TYPES}")
6761

6862

6963
def run_task(task_model: str, task_id: int) -> Any:

0 commit comments

Comments
 (0)