Skip to content

Commit 0b6eb9c

Browse files
carlosgjsCopilotmihow
authored
New jobs API for accessing the ML task queue and posting back results (#1046)
Defines an initial set of API endpoints for an external processing service to read & pull ML tasks from the job queue and post back results (detections & predictions). - Add tasks() endpoint (stubbed) for NATS JetStream integration - Add result() endpoint (stubbed) for pipeline result processing - Add ids_only and incomplete_only query parameters to job list endpoint - Add JobFilterSet with pipeline__slug filtering and search fields - Add utility OpenApiParameter definitions for new query params The tasks() and result() endpoints are documented and stubbed out, awaiting NATS and Celery task dependencies from PR #987. * test: Add comprehensive unit tests for new JobViewSet features - test_list_jobs_with_ids_only: Verifies ids_only parameter returns job IDs - test_list_jobs_with_incomplete_only: Tests incomplete_only filtering by results stage - test_filter_by_pipeline_slug: Tests JobFilterSet pipeline__slug filtering - test_search_jobs: Tests search functionality by name and pipeline name - test_tasks_endpoint_stub: Tests stubbed tasks() endpoint - test_tasks_endpoint_with_batch: Tests batch parameter handling - test_tasks_endpoint_without_pipeline: Tests validation for missing pipeline - test_result_endpoint_stub: Tests stubbed result() endpoint - test_result_endpoint_validation: Tests request validation for result() All tests pass successfully, verifying the new API features work correctly. 
* refactor: Use helper methods in JobViewSet tests for cleaner code * Update ami/jobs/views.py CR feedback Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update ami/jobs/views.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update ami/jobs/views.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Use self.get_object() * Lean out implementation * Use pydantic types for tasks/results * Update tests * coderabbit feedback * Better batch validation * better tests, fix formatting settings * Fix typo * Move filtering to filter_queryset * Undo settings change * Make jobs/tasks?batch required * CR nits * Refactor * Update tests * chore: move openAPI param definitions to job/schemas * docs: add inline todos * chore: move project openAPI param to main/schemas * test: fix periodically failing test due to pagination. update asserts. * fix: add missing new schema files --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Michael Bunsen <notbot@gmail.com>
1 parent f899482 commit 0b6eb9c

File tree

10 files changed

+454
-38
lines changed

10 files changed

+454
-38
lines changed

ami/jobs/schemas.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from drf_spectacular.utils import OpenApiParameter

# Reusable OpenAPI query-parameter definitions for the jobs API endpoints.

# ?ids_only=<bool> — collapse the job list response to bare identifiers.
ids_only_param = OpenApiParameter(
    name="ids_only",
    type=bool,
    required=False,
    description="Return only job IDs instead of full objects",
)

# ?incomplete_only=<bool> — hide jobs whose 'results' stage reached a final state.
incomplete_only_param = OpenApiParameter(
    name="incomplete_only",
    type=bool,
    required=False,
    description="Filter to only incomplete jobs (excludes jobs with final state in 'results' stage)",
)

# ?batch=<int> — how many queued ML tasks to hand out per request.
batch_param = OpenApiParameter(
    name="batch",
    type=int,
    required=False,
    description="Number of tasks to retrieve",
)

ami/jobs/serializers.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,12 @@ class Meta(JobListSerializer.Meta):
151151
fields = JobListSerializer.Meta.fields + [
152152
"result",
153153
]
154+
155+
156+
class MinimalJobSerializer(DefaultSerializer):
    """Minimal serializer returning only essential job fields.

    Emits just the primary key for each job — presumably backing the
    ``ids_only`` job-list responses; verify against the view that uses it.
    """

    class Meta:
        model = Job
        # Add other fields when needed, e.g: "name", "status", "created_at"
        fields = ["id"]

ami/jobs/tests.py

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# from rich import print
22
import logging
3+
from typing import Any
34

45
from django.test import TestCase
56
from guardian.shortcuts import assign_perm
@@ -88,6 +89,7 @@ def setUp(self):
8889
is_superuser=True,
8990
)
9091
self.factory = APIRequestFactory()
92+
self.pipeline = None # type: Pipeline | None
9193

9294
def test_get_job(self):
9395
self.client.force_authenticate(user=self.user)
@@ -134,6 +136,30 @@ def _create_job(self, name: str, start_now: bool = True):
134136
self.assertEqual(resp.status_code, 201)
135137
return resp.json()
136138

139+
def _create_pipeline(self, name: str = "Test Pipeline", slug: str = "test-pipeline") -> Pipeline:
    """Return a project-linked pipeline, reusing the cached one when it matches.

    The most recently created pipeline is memoized on ``self.pipeline``; a
    repeat call with the same name and slug returns it instead of creating
    a duplicate.
    """
    cached = self.pipeline
    if cached and (cached.slug, cached.name) == (slug, name):
        return cached

    created = Pipeline.objects.create(
        name=name,
        slug=slug,
        description=f"{name} description",
    )
    created.projects.add(self.project)
    self.pipeline = created
    return created
152+
153+
def _create_ml_job(self, name: str, pipeline: Pipeline) -> Job:
    """Create and return an ML-type job bound to *pipeline* and the test project."""
    job_fields = dict(
        job_type_key=MLJob.key,
        project=self.project,
        name=name,
        pipeline=pipeline,
        source_image_collection=self.source_image_collection,
    )
    return Job.objects.create(**job_fields)
162+
137163
def test_create_job(self):
138164
job_name = "Test job - Start but don't run"
139165
data = self._create_job(job_name, start_now=False)
@@ -201,3 +227,190 @@ def test_cancel_job(self):
201227
# This cannot be tested until we have a way to cancel jobs
202228
# and a way to run async tasks in tests.
203229
pass
230+
231+
def test_list_jobs_with_ids_only(self):
    """The ids_only query parameter collapses results to bare job IDs."""
    # Two extra jobs alongside the one created in setUp.
    for extra_name in ("Test job 2", "Test job 3"):
        self._create_job(extra_name, start_now=False)

    self.client.force_authenticate(user=self.user)
    url = reverse_with_params("api:job-list", params={"project_id": self.project.pk, "ids_only": True})
    resp = self.client.get(url)
    self.assertEqual(resp.status_code, 200)

    data = resp.json()
    self.assertIn("count", data)
    self.assertEqual(data["count"], 3)  # setUp job + the two created above
    self.assertEqual(len(data["results"]), 3)
    # Every entry is a bare integer ID payload, not a full serialized job.
    for entry in data["results"]:
        self.assertIsInstance(entry["id"], int)
    self.assertNotIn("details", data["results"][0])
250+
251+
def test_list_jobs_with_incomplete_only(self):
    """incomplete_only excludes jobs whose 'results' stage reached a final state."""

    def job_with_results_stage(name: str, progress: float, status) -> Job:
        # Create a job via the API, then push its "results" stage to the given state.
        created = self._create_job(name, start_now=False)
        job = Job.objects.get(pk=created["id"])
        job.progress.add_stage("results")
        job.progress.update_stage("results", progress=progress, status=status)
        job.save()
        return job

    done_job = job_with_results_stage("Completed job", 1.0, JobState.SUCCESS)
    pending_job = job_with_results_stage("Incomplete job", 0.5, JobState.STARTED)

    self.client.force_authenticate(user=self.user)
    url = reverse_with_params(
        "api:job-list", params={"project_id": self.project.pk, "incomplete_only": True}
    )
    resp = self.client.get(url)
    self.assertEqual(resp.status_code, 200)

    listed_ids = {job["id"] for job in resp.json()["results"]}
    self.assertIn(pending_job.pk, listed_ids)
    # The setUp job never got a "results" stage, so it still counts as incomplete.
    self.assertIn(self.job.pk, listed_ids)
    self.assertNotIn(done_job.pk, listed_ids)
282+
283+
def test_filter_by_pipeline_slug(self):
    """The pipeline__slug filter narrows the list to jobs on that pipeline."""
    target_job = self._create_ml_job(
        "Job with pipeline", self._create_pipeline("Test Pipeline", "test-pipeline")
    )

    self.client.force_authenticate(user=self.user)
    url = reverse_with_params(
        "api:job-list", params={"project_id": self.project.pk, "pipeline__slug": "test-pipeline"}
    )
    resp = self.client.get(url)
    self.assertEqual(resp.status_code, 200)

    body = resp.json()
    # Only the ML job carries the pipeline; the setUp job is filtered out.
    self.assertEqual(body["count"], 1)
    self.assertEqual(body["results"][0]["id"], target_job.pk)
298+
299+
def test_search_jobs(self):
    """Free-text search matches on both the job name and the pipeline name."""
    pipeline = self._create_pipeline("SearchablePipeline", "searchable-pipeline")
    self._create_ml_job("Find me job", pipeline)
    self._create_job("Other job", start_now=False)

    self.client.force_authenticate(user=self.user)

    def search_single_hit(term: str) -> dict:
        # Run a search expected to match exactly one job and return that job.
        url = reverse_with_params(
            "api:job-list", params={"project_id": self.project.pk, "search": term}
        )
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 200)
        payload = resp.json()
        self.assertEqual(payload["count"], 1)
        return payload["results"][0]

    # Match on the job name.
    self.assertIn("Find me", search_single_hit("Find")["name"])
    # Match on the pipeline name.
    self.assertEqual(search_single_hit("Searchable")["pipeline"]["name"], "SearchablePipeline")
325+
326+
def _task_batch_helper(self, value: Any, expected_status: int):
    """GET the job's tasks endpoint with ``batch=value`` and assert the status code.

    Returns the decoded JSON body so callers can make further assertions.
    """
    job = self._create_ml_job("Job for batch test", self._create_pipeline())

    self.client.force_authenticate(user=self.user)
    url = reverse_with_params(
        "api:job-tasks", args=[job.pk], params={"project_id": self.project.pk, "batch": value}
    )
    resp = self.client.get(url)
    self.assertEqual(resp.status_code, expected_status)
    return resp.json()
337+
338+
def test_tasks_endpoint_with_batch(self):
    """A valid batch value yields exactly that many tasks."""
    payload = self._task_batch_helper(5, 200)
    self.assertIn("tasks", payload)
    self.assertEqual(len(payload["tasks"]), 5)
343+
344+
def test_tasks_endpoint_with_invalid_batch(self):
    """Non-numeric, missing, empty, and zero batch values are all rejected."""
    for bad_value in ("invalid", None, "", 0):
        with self.subTest(batch=bad_value):
            self._task_batch_helper(bad_value, 400)
350+
351+
def test_tasks_endpoint_without_pipeline(self):
    """Requesting tasks for a job that has no pipeline yields a 400 error."""
    # _create_job builds a plain job, so no pipeline is attached.
    created = self._create_job("Job without pipeline", start_now=False)

    self.client.force_authenticate(user=self.user)
    url = reverse_with_params(
        "api:job-tasks", args=[created["id"]], params={"project_id": self.project.pk, "batch": 1}
    )
    resp = self.client.get(url)

    self.assertEqual(resp.status_code, 400)
    # The error message should point at the missing pipeline.
    self.assertIn("pipeline", resp.json()[0].lower())
364+
365+
def test_result_endpoint_stub(self):
    """The stubbed result endpoint acknowledges a posted pipeline result."""
    job = self._create_ml_job("Job for results test", self._create_pipeline())

    self.client.force_authenticate(user=self.user)
    url = reverse_with_params(
        "api:job-result", args=[job.pk], params={"project_id": self.project.pk, "batch": 1}
    )

    # One result envelope, shaped like what the processing service posts back.
    pipeline_result = {
        "pipeline": "test-pipeline",
        "algorithms": {},
        "total_time": 1.5,
        "source_images": [],
        "detections": [],
        "errors": None,
    }
    payload = [{"reply_subject": "test.reply.1", "result": pipeline_result}]

    resp = self.client.post(url, payload, format="json")
    self.assertEqual(resp.status_code, 200)

    ack = resp.json()
    self.assertEqual(ack["status"], "received")
    self.assertEqual(ack["job_id"], job.pk)
    self.assertEqual(ack["results_received"], 1)
    self.assertIn("message", ack)
397+
398+
def test_result_endpoint_validation(self):
    """Malformed result payloads are rejected with a 400 naming the missing field."""
    job = self._create_ml_job("Job for validation test", self._create_pipeline())

    self.client.force_authenticate(user=self.user)
    url = reverse_with_params("api:job-result", args=[job.pk], params={"project_id": self.project.pk})

    bad_payloads = [
        # Missing reply_subject.
        ([{"result": {"pipeline": "test"}}], "reply_subject"),
        # Missing result.
        ([{"reply_subject": "test.reply"}], "result"),
    ]
    for payload, missing_field in bad_payloads:
        with self.subTest(missing=missing_field):
            resp = self.client.post(url, payload, format="json")
            self.assertEqual(resp.status_code, 400)
            self.assertIn(missing_field, resp.json()[0].lower())

0 commit comments

Comments
 (0)