Skip to content

Commit d8696ae

Browse files
committed
Index task runs to improve GET performance
1 parent e55e318 commit d8696ae

File tree

4 files changed

+223
-16
lines changed

4 files changed

+223
-16
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Generated by Django 5.2.2 on 2026-03-13 00:00
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
("etl", "0005_fix_celery_task_names"),
10+
]
11+
12+
operations = [
13+
migrations.AddIndex(
14+
model_name="taskrun",
15+
index=models.Index(
16+
fields=["task", "-started_at"],
17+
name="etl_taskrun_task_started_idx",
18+
),
19+
),
20+
]

domains/etl/models/run.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,11 @@ def message(self) -> str | None:
5959
@property
6060
def failure_count(self) -> int | None:
6161
return self.extract_failure_count(self.result)
62+
63+
class Meta:
64+
indexes = [
65+
models.Index(
66+
fields=["task", "-started_at"],
67+
name="etl_taskrun_task_started_idx",
68+
),
69+
]

domains/etl/services/task.py

Lines changed: 143 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,22 @@ def build_task_response(
139139
include_latest_run_result: bool = True,
140140
include_data_connection_settings: bool = True,
141141
) -> dict:
142+
latest_task_run = getattr(task, "latest_task_run", None)
143+
latest_run_message = None
144+
latest_run_failure_count = None
145+
146+
if latest_task_run:
147+
latest_run_message = getattr(latest_task_run, "message_text", None)
148+
latest_run_failure_count = getattr(
149+
latest_task_run, "failure_count_value", None
150+
)
151+
152+
if include_latest_run_result:
153+
if latest_run_message is None:
154+
latest_run_message = latest_task_run.message
155+
if latest_run_failure_count is None:
156+
latest_run_failure_count = latest_task_run.failure_count
157+
142158
response = {
143159
"id": task.id,
144160
"name": task.name,
@@ -155,18 +171,33 @@ def build_task_response(
155171
"intervalPeriod": task.periodic_task.interval.period if task.periodic_task.interval else None,
156172
} if task.periodic_task else None,
157173
"latest_run": {
158-
"id": getattr(task, "latest_run_id", None),
159-
"status": getattr(task, "latest_run_status", None),
160-
"message": getattr(task, "latest_run_message", None),
161-
"failure_count": getattr(task, "latest_run_failure_count", None),
174+
"id": latest_task_run.id if latest_task_run else getattr(task, "latest_run_id", None),
175+
"status": latest_task_run.status if latest_task_run else getattr(task, "latest_run_status", None),
176+
"message": latest_run_message if latest_task_run else getattr(task, "latest_run_message", None),
177+
"failure_count": (
178+
latest_run_failure_count
179+
if latest_task_run
180+
else getattr(task, "latest_run_failure_count", None)
181+
),
162182
"result": (
163-
getattr(task, "latest_run_result", None)
164-
if include_latest_run_result
165-
else None
183+
(
184+
latest_task_run.result
185+
if latest_task_run
186+
else getattr(task, "latest_run_result", None)
187+
)
188+
if include_latest_run_result else None
189+
),
190+
"started_at": (
191+
latest_task_run.started_at
192+
if latest_task_run
193+
else getattr(task, "latest_run_started_at", None)
166194
),
167-
"started_at": getattr(task, "latest_run_started_at", None),
168-
"finished_at": getattr(task, "latest_run_finished_at", None),
169-
} if getattr(task, "latest_run_id", None) else None,
195+
"finished_at": (
196+
latest_task_run.finished_at
197+
if latest_task_run
198+
else getattr(task, "latest_run_finished_at", None)
199+
),
200+
} if latest_task_run or getattr(task, "latest_run_id", None) else None,
170201
"extractor_variables": task.extractor_variables,
171202
"transformer_variables": task.transformer_variables,
172203
"loader_variables": task.loader_variables,
@@ -265,6 +296,72 @@ def annotate_target_identifiers(queryset: QuerySet) -> QuerySet:
265296
)
266297
)
267298

299+
@staticmethod
300+
def annotate_latest_task_run_fields(queryset: QuerySet) -> QuerySet:
301+
task_result_queryset = (
302+
TaskRun.objects
303+
.filter(task_id=OuterRef("pk"))
304+
.order_by("-started_at", "-id")
305+
)
306+
return queryset.annotate(
307+
latest_run_id=Subquery(
308+
task_result_queryset.values("id")[:1]
309+
),
310+
latest_run_status=Subquery(
311+
task_result_queryset.values("status")[:1]
312+
),
313+
latest_run_started_at=Subquery(
314+
task_result_queryset.values("started_at")[:1]
315+
),
316+
latest_run_finished_at=Subquery(
317+
task_result_queryset.values("finished_at")[:1]
318+
),
319+
)
320+
321+
@staticmethod
322+
def get_latest_runs_for_tasks(
323+
task_ids: list[uuid.UUID],
324+
include_result: bool = True,
325+
) -> dict[uuid.UUID, TaskRun]:
326+
if not task_ids:
327+
return {}
328+
329+
latest_runs = (
330+
TaskRun.objects
331+
.filter(task_id__in=task_ids)
332+
.annotate(
333+
message_text=Coalesce(
334+
KeyTextTransform("message", "result"),
335+
KeyTextTransform("summary", "result"),
336+
KeyTextTransform("statusMessage", "result"),
337+
KeyTextTransform("status_message", "result"),
338+
KeyTextTransform("failureReason", "result"),
339+
KeyTextTransform("failure_reason", "result"),
340+
KeyTextTransform("error", "result"),
341+
),
342+
failure_count_value=Coalesce(
343+
Cast(
344+
KeyTextTransform("failure_count", "result"),
345+
IntegerField(),
346+
),
347+
Cast(
348+
KeyTextTransform("failureCount", "result"),
349+
IntegerField(),
350+
),
351+
output_field=IntegerField(),
352+
),
353+
)
354+
.order_by("task_id", "-started_at", "-id")
355+
)
356+
357+
if not include_result:
358+
latest_runs = latest_runs.defer("result")
359+
360+
return {
361+
task_run.task_id: task_run
362+
for task_run in latest_runs.distinct("task_id")
363+
}
364+
268365
@staticmethod
269366
def annotate_latest_task_result(
270367
queryset: QuerySet,
@@ -273,7 +370,7 @@ def annotate_latest_task_result(
273370
task_result_queryset = (
274371
TaskRun.objects
275372
.filter(task_id=OuterRef("pk"))
276-
.order_by("-started_at")
373+
.order_by("-started_at", "-id")
277374
)
278375
annotations = {
279376
"latest_run_id": Subquery(
@@ -349,12 +446,34 @@ def list(
349446
if include_data_connection_settings is None
350447
else include_data_connection_settings
351448
)
449+
filtering = filtering or {}
450+
order_by = order_by or []
352451
queryset = Task.objects
353452

354-
queryset = self.annotate_latest_task_result(
355-
queryset,
356-
include_result=include_latest_run_result,
357-
)
453+
if (
454+
any(
455+
field in filtering
456+
for field in [
457+
"latest_run_status",
458+
"latest_run_started_at__lte",
459+
"latest_run_started_at__gte",
460+
"latest_run_finished_at__lte",
461+
"latest_run_finished_at__gte",
462+
]
463+
)
464+
or any(
465+
field.lstrip("-")
466+
in {
467+
"latestRunStatus",
468+
"latestRunStartedAt",
469+
"latestRunFinishedAt",
470+
}
471+
for field in order_by
472+
)
473+
):
474+
queryset = self.annotate_latest_task_run_fields(
475+
queryset
476+
)
358477

359478
for field in [
360479
"workspace_id",
@@ -425,14 +544,22 @@ def list(
425544
queryset = queryset.visible(principal=principal).distinct() # noqa
426545
queryset, count = self.apply_pagination(queryset, response, page, page_size)
427546

547+
queryset = list(queryset.all())
548+
latest_runs_by_task_id = self.get_latest_runs_for_tasks(
549+
[task.id for task in queryset],
550+
include_result=include_latest_run_result,
551+
)
552+
for task in queryset:
553+
task.latest_task_run = latest_runs_by_task_id.get(task.id)
554+
428555
return [
429556
self.build_task_response(
430557
task,
431558
expand=expand_related,
432559
include_mappings=include_mappings,
433560
include_latest_run_result=include_latest_run_result,
434561
include_data_connection_settings=include_data_connection_settings,
435-
) for task in queryset.all()
562+
) for task in queryset
436563
]
437564

438565
def get(

tests/etl/services/test_task.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import pytest
22
import uuid
3+
from datetime import datetime
34
from collections import Counter
45
from ninja.errors import HttpError
56
from django.http import HttpResponse
@@ -233,6 +234,57 @@ def test_list_task_can_expose_failure_count_without_latest_run_result(get_princi
233234
assert lean_task["latest_run"]["result"] is None
234235

235236

237+
def test_list_task_can_filter_and_order_by_latest_run_fields(get_principal):
238+
task = Task.objects.create(
239+
name="Later Task",
240+
workspace_id=uuid.UUID("b27c51a0-7374-462d-8a53-d97d47176c10"),
241+
data_connection_id=uuid.UUID("019adb5c-da8b-7970-877d-c3b4ca37cc60"),
242+
orchestration_system_id=uuid.UUID("7cb900d2-eb11-4a59-a05b-dd02d95af312"),
243+
extractor_variables={},
244+
transformer_variables={},
245+
loader_variables={},
246+
)
247+
latest_run = TaskRun.objects.create(
248+
task=task,
249+
status="FAILURE",
250+
result={
251+
"summary": "Later failed",
252+
"failure_count": 3,
253+
},
254+
)
255+
TaskRun.objects.filter(pk=latest_run.pk).update(
256+
started_at=timezone.make_aware(datetime(2025, 1, 3, 1, 0, 0)),
257+
finished_at=timezone.make_aware(datetime(2025, 1, 3, 2, 0, 0)),
258+
)
259+
260+
filtered = task_service.list(
261+
principal=get_principal("owner"),
262+
response=HttpResponse(),
263+
page=1,
264+
page_size=100,
265+
order_by=["-latestRunStartedAt"],
266+
filtering={"latest_run_status": ["FAILURE"]},
267+
include_latest_run_result=False,
268+
)
269+
270+
assert [task["name"] for task in filtered] == ["Later Task"]
271+
assert filtered[0]["latest_run"]["status"] == "FAILURE"
272+
assert filtered[0]["latest_run"]["message"] == "Later failed"
273+
assert filtered[0]["latest_run"]["failure_count"] == 3
274+
assert filtered[0]["latest_run"]["result"] is None
275+
276+
ordered = task_service.list(
277+
principal=get_principal("owner"),
278+
response=HttpResponse(),
279+
page=1,
280+
page_size=100,
281+
order_by=["-latestRunStartedAt"],
282+
filtering={},
283+
)
284+
285+
assert [task["name"] for task in ordered[:2]] == ["Later Task", "Test ETL Task"]
286+
287+
236288
def test_run_task_returns_a_new_running_run(get_principal, monkeypatch, settings):
237289
settings.CELERY_ENABLED = True
238290
principal = get_principal("owner")

0 commit comments

Comments
 (0)