Skip to content
This repository was archived by the owner on Feb 17, 2026. It is now read-only.

Commit 45de0da

Browse files
vegu authored and Matt Griswold committed
Race condition has a chance of multi task creation off of a schedule
1 parent 1b5602f commit 45de0da

File tree

5 files changed

+198
-2
lines changed

5 files changed

+198
-2
lines changed

src/fullctl/django/admin.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
TaskClaim,
2222
TaskHeartbeat,
2323
TaskSchedule,
24+
TaskScheduleClaim,
2425
UserSettings,
2526
)
2627
from fullctl.django.models.concrete.service_bridge import (
@@ -188,6 +189,9 @@ def recent_tasks(self, obj):
188189
tasks = obj.tasks.all()[:5]
189190
return f"{tasks}"
190191

192+
@admin.register(TaskScheduleClaim)
class TaskScheduleClaimAdmin(BaseAdmin):
    """Admin registration for ``TaskScheduleClaim`` rows."""

    # Columns shown in the admin changelist, in display order.
    list_display = (
        "task_schedule",
        "worker_id",
        "schedule_date",
        "created",
        "updated",
    )
191195

192196
@admin.register(AuditLog)
193197
class AuditLogAdmin(admin.ModelAdmin):
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Generated by Django 4.2.13 on 2025-05-14 18:15
2+
3+
from django.db import migrations, models
4+
import django.db.models.deletion
5+
import django.db.models.manager
6+
import django_handleref.models
7+
8+
9+
class Migration(migrations.Migration):
    """Create the ``fullctl_task_schedule_claim`` table for ``TaskScheduleClaim``."""

    dependencies = [
        ("django_fullctl", "0038_taskheartbeat_fullctl_tas_timesta_1a9e0d_idx"),
    ]

    operations = [
        migrations.CreateModel(
            name="TaskScheduleClaim",
            fields=[
                ("id", models.AutoField(primary_key=True, serialize=False)),
                # Timestamp / version / status columns, declared with the
                # django_handleref field types.
                (
                    "created",
                    django_handleref.models.CreatedDateTimeField(
                        auto_now_add=True,
                        verbose_name="Created",
                    ),
                ),
                (
                    "updated",
                    django_handleref.models.UpdatedDateTimeField(
                        auto_now=True,
                        verbose_name="Updated",
                    ),
                ),
                ("version", models.IntegerField(default=0)),
                (
                    "status",
                    models.CharField(
                        choices=[
                            ("ok", "Ok"),
                            ("pending", "Pending"),
                            ("deactivated", "Deactivated"),
                            ("failed", "Failed"),
                            ("expired", "Expired"),
                        ],
                        default="ok",
                        max_length=12,
                    ),
                ),
                # Claim-specific columns.
                ("worker_id", models.CharField(max_length=255)),
                ("schedule_date", models.DateTimeField()),
                (
                    "task_schedule",
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        to="django_fullctl.taskschedule",
                    ),
                ),
            ],
            options={
                "verbose_name": "Task Schedule Claim",
                "verbose_name_plural": "Task Schedule Claims",
                "db_table": "fullctl_task_schedule_claim",
                # At most one claim per schedule and schedule date.
                "unique_together": {("task_schedule", "schedule_date")},
            },
            managers=[
                ("handleref", django.db.models.manager.Manager()),
            ],
        ),
    ]

src/fullctl/django/models/concrete/tasks.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from django.contrib.auth import get_user_model
1414
from django.core.exceptions import ValidationError
1515
from django.core.management import call_command
16-
from django.db import models
16+
from django.db import IntegrityError, models
1717
from django.utils import timezone
1818
from django.utils.translation import gettext_lazy as _
1919

@@ -36,6 +36,7 @@
3636
"TaskClaim",
3737
"TaskHeartbeat",
3838
"TaskSchedule",
39+
"TaskScheduleClaim",
3940
"CallCommand",
4041
]
4142

@@ -51,6 +52,9 @@ class TaskClaimed(IOError):
5152
def __init__(self, task):
5253
super().__init__(f"Task already claimed by another worker: {task}")
5354

55+
class TaskScheduleClaimed(IOError):
    """Raised when a claim for a task schedule's run already exists."""

    def __init__(self, task_schedule):
        message = f"Task schedule already claimed by another worker: {task_schedule}"
        super().__init__(message)
5458

5559
class WorkerUnqualified(IOError):
5660
def __init__(self, task, qualifier):
@@ -759,6 +763,17 @@ def spawn_tasks(self):
759763

760764
if self.are_limited_tasks_pending():
761765
return []
766+
767+
# try to create a claim for the schedule
768+
try:
769+
TaskScheduleClaim.objects.create(
770+
task_schedule=self,
771+
worker_id=worker_id(),
772+
schedule_date=self.schedule,
773+
)
774+
except IntegrityError:
775+
raise TaskScheduleClaimed(self)
776+
762777

763778
for task in self.tasks.all():
764779
if task.status in ["pending", "running"]:
@@ -781,6 +796,19 @@ def spawn_tasks(self):
781796

782797
return tasks
783798

799+
class TaskScheduleClaim(HandleRefModel):
    """
    Records that a worker claimed one scheduled run of a ``TaskSchedule``.

    ``TaskSchedule.spawn_tasks`` creates one row per schedule and schedule
    date before spawning tasks. The uniqueness constraint below makes a
    second insert for the same pair fail with ``IntegrityError``, which
    ``spawn_tasks`` turns into ``TaskScheduleClaimed``.
    """

    # Schedule this claim belongs to; claims are removed with the schedule.
    task_schedule = models.ForeignKey(TaskSchedule, on_delete=models.CASCADE)
    # Identifier of the worker that created the claim.
    worker_id = models.CharField(max_length=255)
    # The schedule's ``schedule`` value at the time the claim was created.
    schedule_date = models.DateTimeField()

    class Meta:
        db_table = "fullctl_task_schedule_claim"
        verbose_name = _("Task Schedule Claim")
        verbose_name_plural = _("Task Schedule Claims")
        # Tuple of tuples (note the trailing comma): one claim per schedule
        # and schedule date.
        unique_together = (("task_schedule", "schedule_date"),)

    class HandleRef:
        tag = "task_schedule_claim"
784812

785813
class Monitor(HandleRefModel):
786814
email = models.EmailField(

src/fullctl/django/tasks/orm.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
TaskSchedule,
1818
WorkerUnqualified,
1919
)
20-
from fullctl.django.models.concrete.tasks import TaskClaim, TaskLimitError
20+
from fullctl.django.models.concrete.tasks import TaskClaim, TaskLimitError, TaskScheduleClaimed
2121
from fullctl.django.tasks import requeue as requeue_task
2222
from fullctl.django.tasks import specify_task
2323
from fullctl.django.tasks.util import worker_id
@@ -231,5 +231,7 @@ def progress_schedules(**filters):
231231

232232
try:
233233
schedule.spawn_tasks()
234+
except TaskScheduleClaimed as exc:
235+
log.info(f"Task schedule already claimed by another worker: {exc}")
234236
except (TaskAlreadyStarted, TaskLimitError):
235237
schedule.reschedule()

tests/django_tests/test_tasks.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import pytest
22
from django.conf import settings
3+
from django.db import IntegrityError, transaction
34
from django.utils import timezone
5+
from unittest.mock import patch
46

57
import fullctl.django.tasks.orm as orm
68
import tests.django_tests.testapp.models as models
@@ -9,12 +11,17 @@
911
health_check_task_stack_queue,
1012
)
1113
from fullctl.django.models.concrete.tasks import (
14+
TaskClaim,
1215
TaskClaimed,
1316
TaskHeartbeatError,
1417
TaskLimitError,
1518
TaskMaxAgeError,
1619
TaskSchedule,
20+
TaskScheduleClaim,
21+
TaskScheduleClaimed,
22+
TaskAlreadyStarted,
1723
)
24+
from fullctl.django.tasks.util import worker_id
1825

1926

2027
@pytest.mark.django_db
@@ -194,6 +201,41 @@ def test_schedule_limited_task_manually(dj_account_objects):
194201
task_schedule.spawn_tasks()
195202

196203

204+
@pytest.mark.django_db
def test_task_schedule_cannot_be_claimed_twice(dj_account_objects):
    """Test that a task schedule cannot be claimed twice."""

    # NO tasks so it is forced to raise a TaskScheduleClaimed and not a
    # TaskAlreadyStarted
    empty_config = {"tasks": []}

    task_schedule = TaskSchedule.objects.create(
        org=dj_account_objects.org,
        task_config=empty_config,
        description="test claiming",
        repeat=True,
        interval=3600,
        schedule=timezone.now(),
    )

    # Remember the date the first run is claimed for.
    claimed_for = task_schedule.schedule

    # First claim should succeed
    task_schedule.spawn_tasks()

    # Put the schedule back on the date that was just claimed.
    task_schedule.schedule = claimed_for
    task_schedule.save()

    # Second claim should fail with TaskScheduleClaimed exception
    with pytest.raises(TaskScheduleClaimed) as exc_info:
        task_schedule.spawn_tasks()

    expected_prefix = "Task schedule already claimed by another worker:"
    assert expected_prefix in str(exc_info.value)
237+
238+
197239
@pytest.mark.django_db
198240
def test_task_stack_queue_for_maximum_pending_tasks():
199241
settings.MAX_PENDING_TASKS = 10
@@ -244,3 +286,56 @@ def test_task_heartbeat_timeout():
244286

245287
with pytest.raises(TaskHeartbeatError):
246288
health_check_task_heartbeat()
289+
290+
291+
@pytest.mark.django_db
def test_task_schedule_unique_claim_constraint(dj_account_objects):
    """Test that the TaskScheduleClaim enforces the unique constraint."""
    # Imported locally: ``timezone.timedelta`` only works because
    # django.utils.timezone happens to import it from ``datetime``.
    from datetime import timedelta

    org = dj_account_objects.org

    # Create a task schedule
    task_schedule = TaskSchedule.objects.create(
        org=org,
        task_config={
            "tasks": [],
        },
        description="test claiming constraint",
        repeat=True,
        interval=3600,
        schedule=timezone.now(),
    )

    # Create first claim
    schedule_date = timezone.now()
    claim1 = TaskScheduleClaim.objects.create(
        task_schedule=task_schedule,
        worker_id=worker_id(),
        schedule_date=schedule_date,
    )
    assert claim1.id is not None

    # A second claim with the same task_schedule and schedule_date must raise
    # IntegrityError. The inner atomic block contains the failed insert so
    # the test's own transaction stays usable for the queries below.
    with pytest.raises(IntegrityError):
        with transaction.atomic():
            TaskScheduleClaim.objects.create(
                task_schedule=task_schedule,
                worker_id=worker_id(),
                schedule_date=schedule_date,
            )

    # Creating a claim with a different schedule_date should succeed
    different_date = schedule_date + timedelta(hours=1)
    claim2 = TaskScheduleClaim.objects.create(
        task_schedule=task_schedule,
        worker_id=worker_id(),
        schedule_date=different_date,
    )
    assert claim2.id is not None

0 commit comments

Comments
 (0)