62 changes: 56 additions & 6 deletions .dockerignore
@@ -1,20 +1,70 @@
.editorconfig
.gitattributes
.github
.gitignore
.gitlab-ci.yml
.idea
.pre-commit-config.yaml
.readthedocs.yml
.travis.yml
.git
ui
ami/media
backups
venv
.venv
.env
.envs
venv/
.venv/
.env/
.envs/
.envs/*
node_modules
data

# Python cache / bytecode
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.pdb
*.egg-info/
*.egg
*.whl


# Django / runtime artifacts
*.log
*.pot
*.pyc
db.sqlite3
media/
# collected static files (run collectstatic inside the container)
staticfiles/

# Node / UI dependencies (if using React/Vue in your UI service)
npm-debug.log
yarn-error.log
.pnpm-debug.log

# Docs build artifacts
/docs/_build/

# Git / VCS
.git/
.gitignore
.gitattributes
*.swp
*.swo

# IDE / editor
.vscode/
*.iml

# OS cruft
.DS_Store
Thumbs.db

# Docker itself
.dockerignore
Dockerfile
docker-compose*.yml

# Build / dist
build/
dist/
.eggs/
2 changes: 2 additions & 0 deletions .gitignore
@@ -277,5 +277,7 @@ sandbox/
# Other
flower

# huggingface cache
huggingface_cache/
# Temporary files in current work session
.agents/todo
24 changes: 23 additions & 1 deletion .vscode/launch.json
@@ -2,7 +2,29 @@
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Remote Attach",
"name": "Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Django attach",
"type": "debugpy",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5679
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}",
"remoteRoot": "."
}
]
},
{
"name": "Celery worker attach",
"type": "debugpy",
"request": "attach",
"connect": {
5 changes: 3 additions & 2 deletions ami/base/views.py
@@ -37,8 +37,9 @@ def get_active_project(
# If not in URL, try query parameters
if not project_id:
# Look for project_id in GET query parameters or POST data
# POST data returns a list of ints, but QueryDict.get() returns a single value
project_id = request.query_params.get(param) or request.data.get(param)
# request.data may not always be a dict (e.g., for non-POST requests), so we check its type
post_data = request.data if isinstance(request.data, dict) else {}
project_id = request.query_params.get(param) or post_data.get(param)

project_id = SingleParamSerializer[int].clean(
param_name=param,
11 changes: 11 additions & 0 deletions ami/jobs/management/commands/README.md
@@ -0,0 +1,11 @@
# Process Single Image Command

A Django management command for processing a single image through a pipeline. Useful for testing, debugging, and reprocessing individual images.

## Usage

### With Wait Flag (Monitor Progress)

```bash
docker compose run --rm web python manage.py process_single_image 12345 --pipeline 1 --wait
```
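
### Without the Wait Flag

A minimal sketch of submitting the same job asynchronously (the image and pipeline IDs are placeholders); the command prints the job ID, which can be used to check status later:

```bash
docker compose run --rm web python manage.py process_single_image 12345 --pipeline 1
```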
167 changes: 167 additions & 0 deletions ami/jobs/management/commands/process_single_image.py
@@ -0,0 +1,167 @@
"""Management command to process a single image through a pipeline for testing/debugging."""

import logging
import time

from django.core.management.base import BaseCommand, CommandError

from ami.jobs.utils import submit_single_image_job
from ami.main.models import Detection, SourceImage
from ami.ml.models import Pipeline

logger = logging.getLogger(__name__)


class Command(BaseCommand):
help = "Submit a job to process a single image through a pipeline (for testing/debugging)"

def add_arguments(self, parser):
parser.add_argument("image_id", type=int, help="SourceImage ID to process")
parser.add_argument(
"--pipeline",
type=int,
required=True,
help="Pipeline ID to use for processing",
)
parser.add_argument(
"--name",
type=str,
default=None,
help="Custom job name (optional)",
)
parser.add_argument(
"--wait",
action="store_true",
help="Wait for the job to complete and show results",
)
parser.add_argument(
"--poll-interval",
type=int,
default=2,
help="Polling interval in seconds when using --wait (default: 2)",
)

def handle(self, *args, **options):
image_id = options["image_id"]
pipeline_id = options["pipeline"]
job_name = options["name"]
wait = options["wait"]
poll_interval = options["poll_interval"]

# Validate image exists
try:
image = SourceImage.objects.select_related("deployment__project").get(pk=image_id)
if not image.deployment or not image.deployment.project:
raise CommandError(
f"SourceImage with id {image_id} is not attached to a deployment/project, cannot submit job"
)
self.stdout.write(self.style.SUCCESS(f"✓ Found image: {image.path}"))
self.stdout.write(f" Project: {image.deployment.project.name}")
self.stdout.write(f" Deployment: {image.deployment.name}")
except SourceImage.DoesNotExist:
raise CommandError(f"SourceImage with id {image_id} does not exist")

# Validate pipeline exists
try:
pipeline = Pipeline.objects.get(pk=pipeline_id)
self.stdout.write(self.style.SUCCESS(f"✓ Using pipeline: {pipeline.name} (v{pipeline.version})"))
except Pipeline.DoesNotExist:
raise CommandError(f"Pipeline with id {pipeline_id} does not exist")

# Submit the job
self.stdout.write("")
self.stdout.write(self.style.WARNING("Submitting job..."))

try:
job = submit_single_image_job(
image_id=image_id,
project_id=image.deployment.project_id,
pipeline_id=pipeline_id,
job_name=job_name,
)
except Exception as e:
raise CommandError(f"Failed to submit job: {str(e)}")

self.stdout.write(
self.style.SUCCESS(
f"✓ Job {job.pk} created and enqueued\n"
f" Task ID: {job.task_id}\n"
f" Status: {job.status}\n"
f" Name: {job.name}"
)
)

if not wait:
self.stdout.write("")
self.stdout.write("To check job status, run:")
self.stdout.write(f" Job.objects.get(pk={job.pk}).status")
return

# Wait for job completion
self.stdout.write("")
self.stdout.write(self.style.WARNING("Waiting for job to complete..."))
self.stdout.write("(Press Ctrl+C to stop waiting)\n")

try:
start_time = time.time()
last_status = None
last_progress = None

while True:
job.refresh_from_db()
if job.progress and job.progress.summary and job.progress.summary.progress is not None:
progress = job.progress.summary.progress * 100
else:
progress = 0.0
status = job.status

# Only update display if something changed
if status != last_status or abs(progress - (last_progress or 0)) > 0.1:
elapsed = time.time() - start_time
self.stdout.write(
f" Status: {status:15s} | Progress: {progress:5.1f}% | Elapsed: {elapsed:6.1f}s",
ending="\r",
)
last_status = status
last_progress = progress

# Check if job is done
if job.status in ["SUCCESS", "FAILURE", "REVOKED", "REJECTED"]:
self.stdout.write("") # New line after progress updates
break

time.sleep(poll_interval)

except KeyboardInterrupt:
self.stdout.write("")
self.stdout.write(self.style.WARNING("\n⚠ Stopped waiting (job is still running)"))
self.stdout.write(f" Job ID: {job.pk}")
return

# Show results
self.stdout.write("")
elapsed_total = time.time() - start_time

if job.status == "SUCCESS":
self.stdout.write(self.style.SUCCESS(f"✓ Job completed successfully in {elapsed_total:.1f}s"))

# Show results summary
detection_count = Detection.objects.filter(source_image_id=image_id).count()
self.stdout.write("\nResults:")
self.stdout.write(f" Detections created: {detection_count}")

# Show classifications if any
if detection_count > 0:
from ami.main.models import Classification

classification_count = Classification.objects.filter(detection__source_image_id=image_id).count()
self.stdout.write(f" Classifications created: {classification_count}")

elif job.status == "FAILURE":
self.stdout.write(self.style.ERROR(f"✗ Job failed after {elapsed_total:.1f}s"))
self.stdout.write("\nCheck job logs for details:")
self.stdout.write(f" Job.objects.get(pk={job.pk}).logs")
else:
self.stdout.write(self.style.WARNING(f"⚠ Job ended with status: {job.status}"))

self.stdout.write(f"\nJob ID: {job.pk}")
72 changes: 72 additions & 0 deletions ami/jobs/utils.py
@@ -0,0 +1,72 @@
"""Utility functions for job management and testing."""

import logging

from ami.jobs.models import Job, MLJob
from ami.main.models import Project, SourceImage
from ami.ml.models import Pipeline

logger = logging.getLogger(__name__)


def submit_single_image_job(
image_id: int,
pipeline_id: int,
project_id: int,
job_name: str | None = None,
) -> Job:
"""
Submit a job to process a single image through a pipeline.

This is useful for testing, debugging, or reprocessing individual images.

Args:
image_id: The SourceImage ID to process
pipeline_id: The Pipeline ID to use for processing
project_id: The Project ID to associate the job with
job_name: Optional custom job name (will be auto-generated if not provided)

Returns:
The created Job instance

Raises:
SourceImage.DoesNotExist: If the image doesn't exist
Pipeline.DoesNotExist: If the pipeline doesn't exist
Project.DoesNotExist: If the project doesn't exist
"""
# Fetch the image and validate it exists
try:
image = SourceImage.objects.select_related("deployment__project").get(pk=image_id)
except SourceImage.DoesNotExist:
logger.error(f"SourceImage with id {image_id} does not exist")
raise

# Fetch the pipeline and validate it exists
try:
pipeline = Pipeline.objects.get(pk=pipeline_id)
except Pipeline.DoesNotExist:
logger.error(f"Pipeline with id {pipeline_id} does not exist")
raise

project = Project.objects.get(pk=project_id)

# Generate job name if not provided
if job_name is None:
job_name = f"Single image {image_id} - {pipeline.name}"

# Create the job
job = Job.objects.create(
name=job_name,
project=project,
pipeline=pipeline,
job_type_key=MLJob.key,
source_image_single=image,
)

logger.info(f"Created job {job.pk} for single image {image_id} with pipeline {pipeline.name} (id: {pipeline_id})")

# Enqueue the job (starts the Celery task)
job.enqueue()

logger.info(f"Job {job.pk} enqueued with task_id: {job.task_id}")

return job
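
A minimal usage sketch of `submit_single_image_job` from a Django shell; the IDs below are placeholder values and assume an existing image, pipeline, and project:

```python
# Open a shell inside the web container, e.g.:
#   docker compose run --rm web python manage.py shell
from ami.jobs.utils import submit_single_image_job

# Placeholder primary keys: substitute real SourceImage, Pipeline, and Project IDs.
job = submit_single_image_job(
    image_id=12345,
    pipeline_id=1,
    project_id=1,
)
print(job.pk, job.status, job.task_id)
```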