Skip to content

Commit 0a61af4

Browse files
committed
job => task
1 parent b369281 commit 0a61af4

File tree

6 files changed

+120
-16
lines changed

6 files changed

+120
-16
lines changed

scheduler/admin/old_task_models.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from scheduler import tools
1111
from scheduler.broker_types import ConnectionErrorTypes
12-
from scheduler.models import CronTask, TaskArg, TaskKwarg, RepeatableTask, ScheduledTask, BaseTask
12+
from scheduler.models import CronTask, TaskArg, TaskKwarg, RepeatableTask, ScheduledTask, BaseTask, migrate_util
1313
from scheduler.settings import SCHEDULER_CONFIG, logger
1414
from scheduler.tools import get_job_executions_for_task
1515

@@ -113,6 +113,11 @@ class JobKwargInline(HiddenMixin, GenericStackedInline):
113113
)
114114

115115

116+
def get_message_bit(rows_updated: int) -> str:
117+
message_bit = "1 task was" if rows_updated == 1 else f"{rows_updated} tasks were"
118+
return message_bit
119+
120+
116121
@admin.register(CronTask, ScheduledTask, RepeatableTask)
117122
class TaskAdmin(admin.ModelAdmin):
118123
"""TaskAdmin admin view for all task models.
@@ -230,6 +235,16 @@ def delete_model(self, request, obj):
230235
obj.unschedule()
231236
super(TaskAdmin, self).delete_model(request, obj)
232237

238+
@admin.action(description=_("Migrate to new Task model(s)"), permissions=("change",))
239+
def migrate(self, request, queryset):
240+
rows_updated = 0
241+
for obj in queryset.filter(enabled=True).iterator():
242+
migrate_util.migrate(obj)
243+
rows_updated += 1
244+
245+
level = messages.WARNING if not rows_updated else messages.INFO
246+
self.message_user(request, f"{get_message_bit(rows_updated)} successfully migrated to new model.", level=level)
247+
233248
@admin.action(description=_("Disable selected %(verbose_name_plural)s"), permissions=("change",))
234249
def disable_selected(self, request, queryset):
235250
rows_updated = 0
@@ -238,10 +253,9 @@ def disable_selected(self, request, queryset):
238253
obj.unschedule()
239254
rows_updated += 1
240255

241-
message_bit = "1 job was" if rows_updated == 1 else f"{rows_updated} jobs were"
242-
243256
level = messages.WARNING if not rows_updated else messages.INFO
244-
self.message_user(request, f"{message_bit} successfully disabled and unscheduled.", level=level)
257+
self.message_user(request, f"{get_message_bit(rows_updated)} successfully disabled and unscheduled.",
258+
level=level)
245259

246260
@admin.action(description=_("Enable selected %(verbose_name_plural)s"), permissions=("change",))
247261
def enable_selected(self, request, queryset):
@@ -251,9 +265,8 @@ def enable_selected(self, request, queryset):
251265
obj.save()
252266
rows_updated += 1
253267

254-
message_bit = "1 job was" if rows_updated == 1 else f"{rows_updated} jobs were"
255268
level = messages.WARNING if not rows_updated else messages.INFO
256-
self.message_user(request, f"{message_bit} successfully enabled and scheduled.", level=level)
269+
self.message_user(request, f"{get_message_bit(rows_updated)} successfully enabled and scheduled.", level=level)
257270

258271
@admin.action(description="Enqueue now", permissions=("change",))
259272
def enqueue_job_now(self, request, queryset):

scheduler/admin/task_admin.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ class JobKwargInline(GenericStackedInline):
2121
fieldsets = ((None, dict(fields=("key", ("arg_type", "val")))),)
2222

2323

24+
def get_message_bit(rows_updated: int) -> str:
25+
message_bit = "1 task was" if rows_updated == 1 else f"{rows_updated} tasks were"
26+
return message_bit
27+
28+
2429
@admin.register(Task)
2530
class TaskAdmin(admin.ModelAdmin):
2631
"""TaskAdmin admin view for all task models."""
@@ -153,10 +158,9 @@ def disable_selected(self, request, queryset):
153158
obj.unschedule()
154159
rows_updated += 1
155160

156-
message_bit = "1 job was" if rows_updated == 1 else f"{rows_updated} jobs were"
157-
158161
level = messages.WARNING if not rows_updated else messages.INFO
159-
self.message_user(request, f"{message_bit} successfully disabled and unscheduled.", level=level)
162+
self.message_user(request, f"{get_message_bit(rows_updated)} successfully disabled and unscheduled.",
163+
level=level)
160164

161165
@admin.action(description=_("Enable selected %(verbose_name_plural)s"), permissions=("change",))
162166
def enable_selected(self, request, queryset):
@@ -166,9 +170,8 @@ def enable_selected(self, request, queryset):
166170
obj.save()
167171
rows_updated += 1
168172

169-
message_bit = "1 job was" if rows_updated == 1 else f"{rows_updated} jobs were"
170173
level = messages.WARNING if not rows_updated else messages.INFO
171-
self.message_user(request, f"{message_bit} successfully enabled and scheduled.", level=level)
174+
self.message_user(request, f"{get_message_bit(rows_updated)} successfully enabled and scheduled.", level=level)
172175

173176
@admin.action(description="Enqueue now", permissions=("change",))
174177
def enqueue_job_now(self, request, queryset):

scheduler/models/migrate_util.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
from datetime import datetime
2+
from typing import Dict, Any, Optional
3+
4+
from django.conf import settings
5+
from django.contrib.contenttypes.models import ContentType
6+
from django.utils import timezone
7+
8+
from scheduler.models.old_scheduled_task import BaseTask
9+
from scheduler.models.task import Task, TaskArg, TaskKwarg
10+
from scheduler.settings import logger
11+
from scheduler.tools import TaskType
12+
13+
14+
def job_model_str(model_str: str) -> str:
15+
if model_str.find("Job") == len(model_str) - 3:
16+
return model_str[:-3] + "Task"
17+
return model_str
18+
19+
20+
def get_task_type(model_str: str) -> TaskType:
21+
model_str = job_model_str(model_str)
22+
if TaskType(model_str):
23+
return TaskType(model_str)
24+
if model_str == "CronTask":
25+
return TaskType.CRON
26+
elif model_str == "RepeatableTask":
27+
return TaskType.REPEATABLE
28+
elif model_str in {"ScheduledTask", "OnceTask"}:
29+
return TaskType.ONCE
30+
raise ValueError(f"Invalid model {model_str}")
31+
32+
33+
def create_task_from_dict(task_dict: Dict[str, Any], recreate: bool) -> Optional[Task]:
34+
if "new_task_id" in task_dict:
35+
existing_task = Task.objects.filter(id=task_dict["new_task_id"]).first()
36+
else:
37+
existing_task = Task.objects.filter(name=task_dict["name"]).first()
38+
task_type = get_task_type(task_dict["model"])
39+
if existing_task:
40+
if recreate:
41+
logger.info(f'Found existing job "{existing_task}, removing it to be reinserted"')
42+
existing_task.delete()
43+
else:
44+
logger.info(f'Found existing job "{existing_task}", skipping')
45+
return None
46+
kwargs = dict(task_dict)
47+
kwargs["task_type"] = task_type
48+
del kwargs["model"]
49+
del kwargs["callable_args"]
50+
del kwargs["callable_kwargs"]
51+
del kwargs["new_task_id"]
52+
if kwargs.get("scheduled_time", None):
53+
target = datetime.fromisoformat(kwargs["scheduled_time"])
54+
if not settings.USE_TZ and not timezone.is_naive(target):
55+
target = timezone.make_naive(target)
56+
kwargs["scheduled_time"] = target
57+
model_fields = filter(lambda field: hasattr(field, 'attname'), Task._meta.get_fields())
58+
model_fields = set(map(lambda field: field.attname, model_fields))
59+
keys_to_ignore = list(filter(lambda _k: _k not in model_fields, kwargs.keys()))
60+
for k in keys_to_ignore:
61+
del kwargs[k]
62+
task = Task.objects.create(**kwargs)
63+
logger.info(f"Created task {task}")
64+
content_type = ContentType.objects.get_for_model(task)
65+
66+
for arg in task_dict["callable_args"]:
67+
TaskArg.objects.create(
68+
content_type=content_type,
69+
object_id=task.id,
70+
**arg,
71+
)
72+
for arg in task_dict["callable_kwargs"]:
73+
TaskKwarg.objects.create(
74+
content_type=content_type,
75+
object_id=task.id,
76+
**arg,
77+
)
78+
return task
79+
80+
81+
def migrate(old: BaseTask) -> Optional[Task]:
82+
old_task_dict = old.to_dict()
83+
new_task = create_task_from_dict(old_task_dict, old_task_dict.get("new_task_id") is not None)
84+
old.new_task_id = new_task.id
85+
old.enabled = False
86+
old.save()
87+
return new_task

scheduler/models/old_scheduled_task.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ def to_dict(self) -> Dict:
299299
failed_runs=getattr(self, "failed_runs", 0),
300300
last_successful_run=getattr(self, "last_successful_run", None),
301301
last_failed_run=getattr(self, "last_failed_run", None),
302+
new_task_id=getattr(self, "new_task_id", None),
302303
)
303304
return res
304305

@@ -396,7 +397,7 @@ class ScheduledTask(ScheduledTimeMixin, BaseTask):
396397

397398
def ready_for_schedule(self) -> bool:
398399
return super(ScheduledTask, self).ready_for_schedule() and (
399-
self.scheduled_time is None or self.scheduled_time >= timezone.now()
400+
self.scheduled_time is None or self.scheduled_time >= timezone.now()
400401
)
401402

402403
class Meta:

scheduler/tests/test_old_models/test_old_models.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ def test_admin_enable_job(self):
426426
task.refresh_from_db()
427427
self.assertTrue(task.enabled)
428428
self.assertTrue(task.is_scheduled())
429-
assert_response_has_msg(res, "1 job was successfully enabled and scheduled.")
429+
assert_response_has_msg(res, "1 task was successfully enabled and scheduled.")
430430

431431
def test_admin_disable_job(self):
432432
# arrange
@@ -449,7 +449,7 @@ def test_admin_disable_job(self):
449449
task.refresh_from_db()
450450
self.assertFalse(task.is_scheduled())
451451
self.assertFalse(task.enabled)
452-
assert_response_has_msg(res, "1 job was successfully disabled and unscheduled.")
452+
assert_response_has_msg(res, "1 task was successfully disabled and unscheduled.")
453453

454454
def test_admin_single_delete(self):
455455
# arrange

scheduler/tests/test_old_models/test_old_task_model.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ def test_admin_enable_job(self):
428428
task.refresh_from_db()
429429
self.assertTrue(task.enabled)
430430
self.assertTrue(task.is_scheduled())
431-
assert_response_has_msg(res, "1 job was successfully enabled and scheduled.")
431+
assert_response_has_msg(res, "1 task was successfully enabled and scheduled.")
432432

433433
def test_admin_disable_job(self):
434434
# arrange
@@ -451,7 +451,7 @@ def test_admin_disable_job(self):
451451
task.refresh_from_db()
452452
self.assertFalse(task.is_scheduled())
453453
self.assertFalse(task.enabled)
454-
assert_response_has_msg(res, "1 job was successfully disabled and unscheduled.")
454+
assert_response_has_msg(res, "1 task was successfully disabled and unscheduled.")
455455

456456
def test_admin_single_delete(self):
457457
# arrange

0 commit comments

Comments
 (0)