Skip to content

Commit d7efc5b

Browse files
committed
fix(get_recurringtasks_to_process): Add last_picked_at
Closes: #35
1 parent 3adfffa commit d7efc5b

File tree

4 files changed

+104
-0
lines changed

4 files changed

+104
-0
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Generated by Django 4.2.18 on 2025-04-03 07:34
2+
3+
from django.db import migrations, models
4+
import django.utils.timezone
5+
import os
6+
from common.migrations.helpers import PostgresOnlyRunSQL
7+
8+
9+
class Migration(migrations.Migration):
10+
11+
dependencies = [
12+
("task_processor", "0012_add_locked_at_and_timeout"),
13+
]
14+
15+
operations = [
16+
migrations.AddField(
17+
model_name="recurringtask",
18+
name="last_picked_at",
19+
field=models.DateTimeField(
20+
auto_now_add=True, default=django.utils.timezone.now
21+
),
22+
preserve_default=False,
23+
),
24+
PostgresOnlyRunSQL.from_sql_file(
25+
os.path.join(
26+
os.path.dirname(__file__),
27+
"sql",
28+
"0013_get_recurringtasks_to_process.sql",
29+
),
30+
reverse_sql=os.path.join(
31+
os.path.dirname(__file__),
32+
"sql",
33+
"0012_get_recurringtasks_to_process.sql",
34+
),
35+
),
36+
]
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
CREATE OR REPLACE FUNCTION get_recurringtasks_to_process()
2+
RETURNS SETOF task_processor_recurringtask AS $$
3+
DECLARE
4+
row_to_return task_processor_recurringtask;
5+
BEGIN
6+
-- Select the tasks that needs to be processed
7+
FOR row_to_return IN
8+
SELECT *
9+
FROM task_processor_recurringtask
10+
-- Add one minute to the timeout as a grace period for overhead
11+
WHERE is_locked = FALSE OR (locked_at IS NOT NULL AND locked_at < NOW() - timeout + INTERVAL '1 minute')
12+
ORDER BY last_picked_at
13+
LIMIT 1
14+
-- Select for update to ensure that no other workers can select these tasks while in this transaction block
15+
FOR UPDATE SKIP LOCKED
16+
LOOP
17+
-- Lock every selected task(by updating `is_locked` to true)
18+
UPDATE task_processor_recurringtask
19+
-- Lock this row by setting is_locked True, so that no other workers can select these tasks after this
20+
-- transaction is complete (but the tasks are still being executed by the current worker)
21+
SET is_locked = TRUE, locked_at = NOW(), last_picked_at = NOW()
22+
WHERE id = row_to_return.id;
23+
-- If we don't explicitly update the columns here, the client will receive a row
24+
-- that is locked but still shows `is_locked` as `False` and `locked_at` as `None`.
25+
row_to_return.is_locked := TRUE;
26+
row_to_return.locked_at := NOW();
27+
RETURN NEXT row_to_return;
28+
END LOOP;
29+
30+
RETURN;
31+
END;
32+
$$ LANGUAGE plpgsql
33+

src/task_processor/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ class RecurringTask(AbstractBaseTask):
163163
locked_at = models.DateTimeField(blank=True, null=True)
164164
timeout = models.DurationField(default=timedelta(minutes=30))
165165

166+
last_picked_at = models.DateTimeField(auto_now_add=True)
166167
objects: RecurringTaskManager = RecurringTaskManager()
167168

168169
class Meta:

tests/unit/task_processor/test_unit_task_processor_processor.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,40 @@ def _dummy_recurring_task() -> None:
303303
assert task_run.error_details is None
304304

305305

306+
@pytest.mark.django_db(transaction=True)
307+
def test_run_recurring_tasks_loops_over_all_tasks(
308+
db: None,
309+
run_by_processor: None,
310+
) -> None:
311+
# Given, Three recurring tasks
312+
@register_recurring_task(run_every=timedelta(milliseconds=200))
313+
def _dummy_recurring_task_1() -> None:
314+
pass
315+
316+
@register_recurring_task(run_every=timedelta(milliseconds=200))
317+
def _dummy_recurring_task_2() -> None:
318+
pass
319+
320+
@register_recurring_task(run_every=timedelta(milliseconds=200))
321+
def _dummy_recurring_task_3() -> None:
322+
pass
323+
324+
initialise()
325+
task_runs = []
326+
327+
# When, we call run_recurring_tasks in a loop few times
328+
for _ in range(4):
329+
task_runs.append(run_recurring_tasks())
330+
331+
# Then, we should have exactly one RecurringTaskRun for each task
332+
for i in range(1, 4):
333+
task = RecurringTask.objects.get(
334+
task_identifier=f"test_unit_task_processor_processor._dummy_recurring_task_{i}",
335+
)
336+
337+
assert RecurringTaskRun.objects.filter(task=task).count() == 1
338+
339+
306340
def test_run_recurring_tasks_only_executes_tasks_after_interval_set_by_run_every(
307341
db: None,
308342
run_by_processor: None,

0 commit comments

Comments
 (0)