Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 257 additions & 0 deletions comfy_execution/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
"""
Job utilities for the /api/jobs endpoint.
Provides normalization and helper functions for job status tracking.
"""


class JobStatus:
"""Job status constants."""
PENDING = 'pending'
IN_PROGRESS = 'in_progress'
COMPLETED = 'completed'
FAILED = 'failed'
CANCELLED = 'cancelled'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the cancelled status exist?

The /api/jobs endpoint will always return an empty list for this status, and normalize_history_item will treat this status as JobStatus.COMPLETED.


ALL = [PENDING, IN_PROGRESS, COMPLETED, FAILED, CANCELLED]


# Media types that can be previewed in the frontend
PREVIEWABLE_MEDIA_TYPES = frozenset({'images', 'video', 'audio'})

# 3D file extensions for preview fallback (no dedicated media_type exists)
THREE_D_EXTENSIONS = frozenset({'.obj', '.fbx', '.gltf', '.glb'})


def is_previewable(media_type, item):
"""
Check if an output item is previewable.
Matches frontend logic in ComfyUI_frontend/src/stores/queueStore.ts
Priority:
1. media_type is 'images', 'video', or 'audio'
2. format field starts with 'video/' or 'audio/'
3. filename has a 3D extension (.obj, .fbx, .gltf, .glb)
"""
if media_type in PREVIEWABLE_MEDIA_TYPES:
return True

# Check format field (MIME type)
fmt = item.get('format', '')
if fmt and (fmt.startswith('video/') or fmt.startswith('audio/')):
return True

# Check for 3D files by extension
filename = item.get('filename', '').lower()
if any(filename.endswith(ext) for ext in THREE_D_EXTENSIONS):
return True

return False


def normalize_queue_item(item, status):
"""Convert queue item tuple to unified job dict."""
priority, prompt_id, _, extra_data, _ = item[:5]
create_time = extra_data.get('create_time')
extra_pnginfo = extra_data.get('extra_pnginfo') or {}
workflow_id = extra_pnginfo.get('workflow', {}).get('id')

return {
'id': prompt_id,
'status': status,
'priority': priority,
'create_time': create_time,
'execution_error': None,
'execution_start_time': None,
'execution_end_time': None,
'outputs_count': 0,
'preview_output': None,
'workflow_id': workflow_id,
}


def normalize_history_item(prompt_id, history_item, include_outputs=False):
"""Convert history item dict to unified job dict."""
prompt_tuple = history_item['prompt']
priority, _, prompt, extra_data, _ = prompt_tuple[:5]
create_time = extra_data.get('create_time')
extra_pnginfo = extra_data.get('extra_pnginfo') or {}
workflow_id = extra_pnginfo.get('workflow', {}).get('id')

status_info = history_item.get('status', {})
status_str = status_info.get('status_str') if status_info else None
if status_str == 'success':
status = JobStatus.COMPLETED
elif status_str == 'error':
status = JobStatus.FAILED
else:
status = JobStatus.COMPLETED

outputs = history_item.get('outputs', {})
outputs_count, preview_output = get_outputs_summary(outputs)

execution_error = None
execution_start_time = None
execution_end_time = None
if status_info:
messages = status_info.get('messages', [])
for entry in messages:
if isinstance(entry, (list, tuple)) and len(entry) >= 2:
event_name, event_data = entry[0], entry[1]
if isinstance(event_data, dict):
if event_name == 'execution_start':
execution_start_time = event_data.get('timestamp')
elif event_name in ('execution_success', 'execution_error', 'execution_interrupted'):
execution_end_time = event_data.get('timestamp')
if event_name == 'execution_error':
execution_error = event_data

job = {
'id': prompt_id,
'status': status,
'priority': priority,
'create_time': create_time,
'execution_error': execution_error,
'execution_start_time': execution_start_time,
'execution_end_time': execution_end_time,
'outputs_count': outputs_count,
'preview_output': preview_output,
'workflow_id': workflow_id,
}

if include_outputs:
job['outputs'] = outputs
job['execution_status'] = status_info
job['workflow'] = {
'prompt': prompt,
'extra_data': extra_data,
}

return job


def get_outputs_summary(outputs):
"""
Count outputs and find preview in a single pass.
Returns (outputs_count, preview_output).
Preview priority (matching frontend):
1. type="output" with previewable media
2. Any previewable media
"""
count = 0
preview_output = None
fallback_preview = None

for node_id, node_outputs in outputs.items():
for media_type, items in node_outputs.items():
if media_type == 'animated' or not isinstance(items, list):
continue

for item in items:
count += 1

if preview_output is None and is_previewable(media_type, item):
enriched = {
**item,
'nodeId': node_id,
'mediaType': media_type
}
if item.get('type') == 'output':
preview_output = enriched
elif fallback_preview is None:
fallback_preview = enriched

return count, preview_output or fallback_preview


def apply_sorting(jobs, sort_by, sort_order):
"""Sort jobs list by specified field and order."""
reverse = (sort_order == 'desc')

if sort_by == 'execution_duration':
def get_sort_key(job):
start = job.get('execution_start_time') or 0
end = job.get('execution_end_time') or 0
return end - start if end and start else 0
else:
def get_sort_key(job):
return job.get('create_time') or 0

return sorted(jobs, key=get_sort_key, reverse=reverse)


def get_job(prompt_id, running, queued, history):
"""
Get a single job by prompt_id from history or queue.
Args:
prompt_id: The prompt ID to look up
running: List of currently running queue items
queued: List of pending queue items
history: Dict of history items keyed by prompt_id
Returns:
Job dict with full details, or None if not found
"""
if prompt_id in history:
return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True)

for item in running:
if item[1] == prompt_id:
return normalize_queue_item(item, JobStatus.IN_PROGRESS)

for item in queued:
if item[1] == prompt_id:
return normalize_queue_item(item, JobStatus.PENDING)

return None


def get_all_jobs(running, queued, history, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0):
"""
Get all jobs (running, pending, completed) with filtering and sorting.
Args:
running: List of currently running queue items
queued: List of pending queue items
history: Dict of history items keyed by prompt_id
status_filter: List of statuses to include (from JobStatus.ALL)
sort_by: Field to sort by ('created_at', 'execution_duration')
sort_order: 'asc' or 'desc'
limit: Maximum number of items to return
offset: Number of items to skip
Returns:
tuple: (jobs_list, total_count)
"""
jobs = []

if status_filter is None:
status_filter = JobStatus.ALL

if JobStatus.IN_PROGRESS in status_filter:
for item in running:
jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS))

if JobStatus.PENDING in status_filter:
for item in queued:
jobs.append(normalize_queue_item(item, JobStatus.PENDING))

include_completed = JobStatus.COMPLETED in status_filter
include_failed = JobStatus.FAILED in status_filter
if include_completed or include_failed:
for prompt_id, history_item in history.items():
is_failed = history_item.get('status', {}).get('status_str') == 'error'
if (is_failed and include_failed) or (not is_failed and include_completed):
jobs.append(normalize_history_item(prompt_id, history_item))

jobs = apply_sorting(jobs, sort_by, sort_order)

total_count = len(jobs)

if offset > 0:
jobs = jobs[offset:]
if limit is not None:
jobs = jobs[:limit]

return (jobs, total_count)
102 changes: 102 additions & 0 deletions server.py

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar comment to before -- likely out of scope for this, but can we start introducing some structure to this repo and start breaking up this superclass to separate files for maintainability and extensibility

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import nodes
import folder_paths
import execution
from comfy_execution.jobs import JobStatus, get_job, get_all_jobs
import uuid
import urllib
import json
Expand Down Expand Up @@ -694,6 +695,107 @@ async def get_object_info_node(request):
out[node_class] = node_info(node_class)
return web.json_response(out)

@routes.get("/api/jobs")
async def get_jobs(request):
"""List all jobs with filtering, sorting, and pagination."""
query = request.rel_url.query

status_param = query.get("status", None)
status_filter = None
if status_param:
status_filter = [s.strip() for s in status_param.split(',') if s.strip()]
Copy link
Contributor

@bigcat88 bigcat88 Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the caller passes status=foo, then:

  • status_filter becomes ["foo"].
  • Filtered against valid_statuses -> []
  • status_filter becomes None and call returns full results with 200 OK

That will be pretty surprising for the caller

valid_statuses = set(JobStatus.ALL)
status_filter = [s for s in status_filter if s in valid_statuses]
if not status_filter:
status_filter = None

sort_by = query.get('sort_by', 'created_at')
if sort_by not in {'created_at', 'execution_duration'}:
return web.json_response(
{"error": "sort_by must be 'created_at' or 'execution_duration'"},
status=400
)

sort_order = query.get('sort_order', 'desc')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add lowercase conversion here?

if sort_order not in {'asc', 'desc'}:
return web.json_response(
{"error": "sort_order must be 'asc' or 'desc'"},
status=400
)

limit = None
if 'limit' in query:
try:
limit = int(query.get('limit'))
if limit <= 0:
return web.json_response(
{"error": "limit must be a positive integer"},
status=400
)
except (ValueError, TypeError):
return web.json_response(
{"error": "limit must be an integer"},
status=400
)

offset = 0
if 'offset' in query:
try:
offset = int(query.get('offset'))
if offset < 0:
offset = 0
except (ValueError, TypeError):
return web.json_response(
{"error": "offset must be an integer"},
status=400
)

running, queued = self.prompt_queue.get_current_queue_volatile()
history = self.prompt_queue.get_history()

jobs, total = get_all_jobs(
running, queued, history,
status_filter=status_filter,
sort_by=sort_by,
sort_order=sort_order,
limit=limit,
offset=offset
)

has_more = (offset + len(jobs)) < total

return web.json_response({
'jobs': jobs,
'pagination': {
'offset': offset,
'limit': limit,
'total': total,
'has_more': has_more
}
})

@routes.get("/api/jobs/{job_id}")
async def get_job_by_id(request):
"""Get a single job by ID."""
job_id = request.match_info.get("job_id", None)
if not job_id:
return web.json_response(
{"error": "job_id is required"},
status=400
)

running, queued = self.prompt_queue.get_current_queue_volatile()
history = self.prompt_queue.get_history(prompt_id=job_id)

job = get_job(job_id, running, queued, history)
if job is None:
return web.json_response(
{"error": "Job not found"},
status=404
)

return web.json_response(job)

@routes.get("/history")
async def get_history(request):
max_items = request.rel_url.query.get("max_items", None)
Expand Down
Loading