|
5 | 5 | from django.utils import timezone
|
6 | 6 |
|
7 | 7 | from scheduler import settings
|
| 8 | +from scheduler.models import TaskType, Task |
8 | 9 | from scheduler.redis_models import JobModel
|
9 | 10 | from scheduler.tests.test_task_types.test_task_model import BaseTestCases
|
10 | 11 | from scheduler.tests.testtools import task_factory, _get_task_scheduled_job_from_registry
|
11 |
| -from scheduler.models import TaskType |
12 | 12 | from scheduler.types import SchedulerConfiguration
|
13 | 13 |
|
14 | 14 |
|
15 | 15 | class TestRepeatableTask(BaseTestCases.TestSchedulableTask):
|
16 | 16 | task_type = TaskType.REPEATABLE
|
17 | 17 | queue_name = settings.get_queue_names()[0]
|
18 | 18 |
|
| 19 | + def test_create_task_error(self): |
| 20 | + scheduled_time = timezone.now() |
| 21 | + |
| 22 | + Task.objects.create( |
| 23 | + name="konichiva_every_2s", |
| 24 | + callable="chat.task_scheduler.konichiva_func", |
| 25 | + task_type="REPEATABLE", |
| 26 | + interval=2, |
| 27 | + interval_unit="seconds", |
| 28 | + queue="default", |
| 29 | + enabled=True, |
| 30 | + scheduled_time=scheduled_time, |
| 31 | + ) |
| 32 | + |
19 | 33 | def test_unschedulable_old_job(self):
|
20 | 34 | job = task_factory(self.task_type, scheduled_time=timezone.now() - timedelta(hours=1), repeat=0)
|
21 | 35 | self.assertFalse(job.is_scheduled())
|
@@ -145,7 +159,7 @@ def test_repeat_none_interval_2_min(self):
|
145 | 159 | task = task_factory(self.task_type, scheduled_time=base_time - timedelta(minutes=29), repeat=None)
|
146 | 160 | task.interval = 120
|
147 | 161 | task.interval_unit = "seconds"
|
148 |
| - task._schedule() |
| 162 | + task.save() |
149 | 163 | self.assertTrue(task.scheduled_time > base_time)
|
150 | 164 | self.assertTrue(task.is_scheduled())
|
151 | 165 |
|
|
0 commit comments