Skip to content

Commit 9fce850

Browse files
committed
Add support for scheduling jobs to run in the future
1 parent 0254ac1 commit 9fce850

File tree

4 files changed

+48
-1
lines changed

4 files changed

+48
-1
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,15 @@ Job.objects.create(name='critical_job', priority=2)
129129

130130
Jobs will be ordered by their `priority` (highest to lowest) and then the time which they were created (oldest to newest) and processed in that order.
131131

132+
### Scheduling jobs
133+
If you'd like to create a job but have it run at some time in the future, you can use the `run_after` field on the Job model:
134+
135+
```python
136+
Job.objects.create(name='scheduled_job', run_after=timezone.now() + timedelta(minutes=10))
137+
```
138+
139+
Of course, the job will only be run if your `python manage.py worker` process is running at the time when the job is scheduled to run. Otherwise, it will run the next time you start your worker process after that time has passed.
140+
132141
## Terminology
133142

134143
### Job
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Generated by Django 3.2rc1 on 2021-11-04 03:32
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
('django_dbq', '0004_auto_20210818_0247'),
10+
]
11+
12+
operations = [
13+
migrations.AddField(
14+
model_name='job',
15+
name='run_after',
16+
field=models.DateTimeField(db_index=True, null=True),
17+
),
18+
]

django_dbq/models.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ def delete_old(self):
6868

6969
def to_process(self, queue_name):
7070
return self.select_for_update().filter(
71-
queue_name=queue_name, state__in=(Job.STATES.READY, Job.STATES.NEW)
71+
models.Q(queue_name=queue_name) &
72+
models.Q(state__in=(Job.STATES.READY, Job.STATES.NEW)) &
73+
models.Q(
74+
models.Q(run_after__isnull=True) |
75+
models.Q(run_after__lte=timezone.now())
76+
)
7277
)
7378

7479

@@ -91,6 +96,7 @@ class STATES(TextChoices):
9196
workspace = JSONField(null=True)
9297
queue_name = models.CharField(max_length=20, default="default", db_index=True)
9398
priority = models.SmallIntegerField(default=0, db_index=True)
99+
run_after = models.DateTimeField(null=True, db_index=True)
94100

95101
class Meta:
96102
ordering = ["-priority", "created"]

django_dbq/tests.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,20 @@ def test_gets_jobs_in_priority_and_date_order(self):
210210
self.assertEqual(Job.objects.get_ready_or_none("default"), job_1)
211211
self.assertFalse(Job.objects.to_process("default").filter(id=job_2.id).exists())
212212

213+
def test_ignores_jobs_until_run_after_is_in_the_past(self):
214+
job_1 = Job.objects.create(name="testjob")
215+
job_2 = Job.objects.create(name="testjob", run_after=datetime(2021, 11, 4, 8))
216+
217+
with freezegun.freeze_time(datetime(2021, 11, 4, 7)):
218+
self.assertEqual(
219+
{job for job in Job.objects.to_process("default")}, {job_1}
220+
)
221+
222+
with freezegun.freeze_time(datetime(2021, 11, 4, 9)):
223+
self.assertEqual(
224+
{job for job in Job.objects.to_process("default")}, {job_1, job_2}
225+
)
226+
213227
def test_get_next_ready_job_created(self):
214228
"""
215229
Created jobs should be picked too.

0 commit comments

Comments
 (0)