 logger = logging.getLogger(__name__)


+class JobDispatchMode(models.TextChoices):
+    """
+    How a job dispatches its workload.
+
+    Jobs are configured and launched by users in the UI, then dispatched to
+    Celery workers. This enum describes what the worker does with the work:
+
+    - INTERNAL: All work happens within the platform (Celery worker handles it directly).
+    - SYNC_API: Worker calls an external processing service API and waits for each response.
+    - ASYNC_API: Worker queues items to a message broker (NATS) for external processing
+      service workers to pick up and process independently.
+    """
+
+    # Work is handled entirely within the platform, no external service calls.
+    # e.g. DataStorageSyncJob, DataExportJob, SourceImageCollectionPopulateJob
+    INTERNAL = "internal", "Internal"
+
+    # Worker loops over items, sends each to an external processing service
+    # endpoint synchronously, and waits for the response before continuing.
+    SYNC_API = "sync_api", "Sync API"
+
+    # Worker publishes all items to a message broker (NATS). External processing
+    # service workers consume and process them independently, reporting results back.
+    ASYNC_API = "async_api", "Async API"
+
+
 class JobState(str, OrderedEnum):
     """
     These come from Celery, except for CREATED, which is a custom state.
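For reference, Django `TextChoices` members carry a `(value, label)` pair, so the new enum can serve both as stored database values and as human-readable labels. The behavior below is standard Django; only the import path is assumed from this diff's context:

```python
from ami.jobs.models import JobDispatchMode  # module path assumed, not confirmed by the diff

JobDispatchMode.ASYNC_API.value  # "async_api" (the value stored in the database)
JobDispatchMode.ASYNC_API.label  # "Async API" (shown in the admin and in forms)
JobDispatchMode.choices
# [("internal", "Internal"), ("sync_api", "Sync API"), ("async_api", "Async API")]
```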
@@ -83,7 +109,7 @@ def python_slugify(value: str) -> str:


 class JobProgressSummary(pydantic.BaseModel):
-    """Summary of all stages of a job"""
+    """Top-level status and progress for a job, shown in the UI."""

     status: JobState = JobState.CREATED
     progress: float = 0
@@ -106,7 +132,17 @@ class JobProgressStageDetail(ConfigurableStage, JobProgressSummary):


 class JobProgress(pydantic.BaseModel):
-    """The full progress of a job and its stages."""
+    """
+    The user-facing progress of a job, stored as JSONB on the Job model.
+
+    This is what the UI displays and what external APIs read. Contains named
+    stages ("process", "results") with per-stage params (progress percentage,
+    detections/classifications/captures counts, failed count).
+
+    For async (NATS) jobs, updated by _update_job_progress() in ami/jobs/tasks.py,
+    which copies snapshots from the internal Redis-backed AsyncJobStateManager.
+    For sync jobs, updated directly in MLJob.process_images().
+    """

     summary: JobProgressSummary
     stages: list[JobProgressStageDetail]
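A hedged sketch of what that serialized JSONB might look like for an ML job mid-run. The key names are inferred from the docstring above and from `ConfigurableStage`/`JobProgressStageDetail`; treat the exact shape as illustrative only:

```python
# Illustrative only: field names inferred from the models above, not verified
# against real data.
example_progress = {
    "summary": {"status": "STARTED", "progress": 0.45},
    "stages": [
        {
            "key": "process",
            "status": "STARTED",
            "progress": 0.45,
            "params": [
                {"key": "detections", "value": 120},
                {"key": "classifications", "value": 90},
                {"key": "failed", "value": 2},
            ],
        },
        {"key": "results", "status": "CREATED", "progress": 0.0, "params": []},
    ],
}
```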
@@ -196,6 +232,34 @@ def reset(self, status: JobState = JobState.CREATED):
         for stage in self.stages:
             stage.progress = 0
             stage.status = status
+            # Reset numeric param values to 0
+            for param in stage.params:
+                if isinstance(param.value, (int, float)):
+                    param.value = 0
+
+    def is_complete(self) -> bool:
+        """
+        Check if all stages have finished processing.
+
+        A job is considered complete when ALL of its stages have:
+        - progress >= 1.0 (fully processed)
+        - status in a final state (SUCCESS, FAILURE, or REVOKED)
+
+        This method works for any job type regardless of which stages it has.
+        It's used by the Celery task_postrun signal to determine whether to
+        set the job's final SUCCESS status, or defer to async progress handlers.
+
+        Related: Job.update_progress() calculates the aggregate progress
+        percentage across all stages for display purposes. This method is a
+        binary check for completion that considers both progress AND status.
+
+        Returns:
+            True if all stages are complete, False otherwise.
+            Returns False if the job has no stages (shouldn't happen in practice).
+        """
+        if not self.stages:
+            return False
+        return all(stage.progress >= 1.0 and stage.status in JobState.final_states() for stage in self.stages)

     class Config:
         use_enum_values = True
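The docstring says `is_complete()` is consulted from Celery's `task_postrun` signal. A minimal sketch of that usage, assuming the handler lives in `ami/jobs/tasks.py`; the `finalize_job` name and the job lookup are hypothetical, only the signal API and `is_complete()` semantics come from the source:

```python
from celery.signals import task_postrun

@task_postrun.connect
def finalize_job(sender=None, kwargs=None, **extra):  # hypothetical handler name
    job = Job.objects.get(pk=(kwargs or {}).get("job_id"))  # lookup is assumed
    if job.progress.is_complete():
        # Every stage reports progress >= 1.0 and a final status.
        job.status = JobState.SUCCESS
        job.save()
    # Otherwise defer: async (NATS) progress handlers will finish the job.
```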
@@ -398,6 +462,8 @@ def run(cls, job: "Job"):
         job.save()

         if job.project.feature_flags.async_pipeline_workers:
+            job.dispatch_mode = JobDispatchMode.ASYNC_API
+            job.save(update_fields=["dispatch_mode"])
             queued = queue_images_to_nats(job, images)
             if not queued:
                 job.logger.error("Aborting job %s because images could not be queued to NATS", job.pk)
@@ -407,6 +473,8 @@ def run(cls, job: "Job"):
                 job.save()
                 return
         else:
+            job.dispatch_mode = JobDispatchMode.SYNC_API
+            job.save(update_fields=["dispatch_mode"])
             cls.process_images(job, images)

     @classmethod
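A note on the two new `save()` calls above: `update_fields=["dispatch_mode"]` makes Django issue an `UPDATE` touching only that column, so concurrent writers updating other fields (e.g. `progress`) are not clobbered. This is standard Django behavior; the table name in the comment below is an assumption:

```python
# Equivalent SQL effect of job.save(update_fields=["dispatch_mode"]):
#   UPDATE jobs_job SET dispatch_mode = 'sync_api' WHERE id = <job.pk>;
# ("jobs_job" is the conventional Django table name, assumed here.)
job.dispatch_mode = JobDispatchMode.SYNC_API
job.save(update_fields=["dispatch_mode"])
```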
@@ -507,7 +575,8 @@ def process_images(cls, job, images):

         job.logger.info(f"All tasks completed for job {job.pk}")

-        FAILURE_THRESHOLD = 0.5
+        from ami.jobs.tasks import FAILURE_THRESHOLD
+
         if image_count and (percent_successful < FAILURE_THRESHOLD):
             job.progress.update_stage("process", status=JobState.FAILURE)
             job.save()
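With the threshold now imported from `ami/jobs/tasks.py` instead of being redefined locally, the check works on a 0-1 success fraction. A worked example, assuming `FAILURE_THRESHOLD` keeps its previous value of 0.5:

```python
FAILURE_THRESHOLD = 0.5  # previous in-module value; the real constant is imported

image_count = 200
successful_images = 80
percent_successful = successful_images / image_count  # 0.4

# 0.4 < 0.5, so this job's "process" stage would be marked FAILURE.
assert image_count and (percent_successful < FAILURE_THRESHOLD)
```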
@@ -798,6 +867,12 @@ class Job(BaseModel):
         blank=True,
         related_name="jobs",
     )
+    dispatch_mode = models.CharField(
+        max_length=32,
+        choices=JobDispatchMode.choices,
+        default=JobDispatchMode.INTERNAL,
+        help_text="How the job dispatches its workload: internal, sync_api, or async_api.",
+    )

     def __str__(self) -> str:
         return f'#{self.pk} "{self.name}" ({self.status})'
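Since `dispatch_mode` is a new column on `Job`, this change also needs a schema migration. A sketch of what `makemigrations` would generate; the migration number, dependency, and app label `jobs` are placeholders inferred from the `ami/jobs/` paths in this diff:

```python
# Sketch only: names and dependency are placeholders, not the real migration file.
from django.db import migrations, models


class Migration(migrations.Migration):
    dependencies = [("jobs", "00XX_previous")]

    operations = [
        migrations.AddField(
            model_name="job",
            name="dispatch_mode",
            field=models.CharField(
                choices=[
                    ("internal", "Internal"),
                    ("sync_api", "Sync API"),
                    ("async_api", "Async API"),
                ],
                default="internal",
                help_text="How the job dispatches its workload: internal, sync_api, or async_api.",
                max_length=32,
            ),
        ),
    ]
```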