Merged

Commits (28)
5aa07d7
feat: create a /jobs api to return queue and history jobs
ric-yu Dec 2, 2025
aa00641
update unused vars
ric-yu Dec 2, 2025
b2bd48e
include priority
ric-yu Dec 2, 2025
b874f46
create jobs helper file
ric-yu Dec 3, 2025
380b6ae
fix ruff
ric-yu Dec 3, 2025
048c413
update how we set error message
ric-yu Dec 3, 2025
2e0b26b
include execution error in both responses
ric-yu Dec 4, 2025
e4c7136
rename error -> failed, fix output shape
ric-yu Dec 4, 2025
90fb5cc
re-use queue and history functions
ric-yu Dec 4, 2025
8a2bb7c
set workflow id
ric-yu Dec 4, 2025
b7c7712
allow sort by exec duration
ric-yu Dec 4, 2025
a38aacf
fix tests
ric-yu Dec 4, 2025
1b67306
send priority and remove error msg
ric-yu Dec 4, 2025
5274b91
use ws messages to get start and end times
ric-yu Dec 4, 2025
c860cc6
revert main.py fully
ric-yu Dec 4, 2025
5ed90a1
refactor: move all /jobs business logic to jobs.py
ric-yu Dec 4, 2025
b14ae80
fix failing test
ric-yu Dec 4, 2025
a10cb30
remove some tests
ric-yu Dec 4, 2025
86590ca
fix non dict nodes
ric-yu Dec 8, 2025
460b848
address comments
ric-yu Dec 8, 2025
1f7c1a9
filter by workflow id and remove null fields
ric-yu Dec 9, 2025
ed61899
add clearer typing - remove get("..") or ..
ric-yu Dec 12, 2025
c8a1d2e
refactor query params to top get_job(s) doc, add remove_sensitive_fro…
ric-yu Dec 12, 2025
7f4fb73
Merge branch 'master' into feature/unified-jobs-api
Kosinkadink Dec 16, 2025
03a7f1c
add brief comment explaining why we skip animated
ric-yu Dec 17, 2025
724fbf8
comment that format field is for frontend backward compatibility
ric-yu Dec 17, 2025
786703a
fix whitespace
ric-yu Dec 18, 2025
be341c9
Merge branch 'master' into feature/unified-jobs-api
guill Dec 18, 2025
257 changes: 257 additions & 0 deletions comfy_execution/jobs.py
@@ -0,0 +1,257 @@
"""
Job utilities for the /api/jobs endpoint.
Provides normalization and helper functions for job status tracking.
"""


class JobStatus:
"""Job status constants."""
PENDING = 'pending'
IN_PROGRESS = 'in_progress'
COMPLETED = 'completed'
FAILED = 'failed'
CANCELLED = 'cancelled'

ALL = [PENDING, IN_PROGRESS, COMPLETED, FAILED, CANCELLED]


# Media types that can be previewed in the frontend
PREVIEWABLE_MEDIA_TYPES = frozenset({'images', 'video', 'audio'})

# 3D file extensions for preview fallback (no dedicated media_type exists)
THREE_D_EXTENSIONS = frozenset({'.obj', '.fbx', '.gltf', '.glb'})


def is_previewable(media_type, item):
"""
Check if an output item is previewable.
Matches frontend logic in ComfyUI_frontend/src/stores/queueStore.ts

Priority:
1. media_type is 'images', 'video', or 'audio'
2. format field starts with 'video/' or 'audio/'
3. filename has a 3D extension (.obj, .fbx, .gltf, .glb)
"""
if media_type in PREVIEWABLE_MEDIA_TYPES:
return True

    # Check format field (MIME type); kept for frontend backward compatibility
fmt = item.get('format', '')
if fmt and (fmt.startswith('video/') or fmt.startswith('audio/')):
return True

# Check for 3D files by extension
filename = item.get('filename', '').lower()
if any(filename.endswith(ext) for ext in THREE_D_EXTENSIONS):
return True

return False
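
# Illustrative checks (hypothetical items, not from the codebase):
#   is_previewable('images', {'filename': 'out.png'})  -> True  (media_type)
#   is_previewable('text', {'format': 'video/mp4'})    -> True  (MIME prefix)
#   is_previewable('text', {'filename': 'mesh.glb'})   -> True  (3D extension)
#   is_previewable('text', {'filename': 'log.txt'})    -> False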


def normalize_queue_item(item, status):
"""Convert queue item tuple to unified job dict."""
priority, prompt_id, _, extra_data, _ = item[:5]
create_time = extra_data.get('create_time')
extra_pnginfo = extra_data.get('extra_pnginfo') or {}
    workflow_id = (extra_pnginfo.get('workflow') or {}).get('id')

return {
'id': prompt_id,
'status': status,
'priority': priority,
'create_time': create_time,
'execution_error': None,
'execution_start_time': None,
'execution_end_time': None,
'outputs_count': 0,
'preview_output': None,
'workflow_id': workflow_id,
}


def normalize_history_item(prompt_id, history_item, include_outputs=False):
"""Convert history item dict to unified job dict."""
prompt_tuple = history_item['prompt']
priority, _, prompt, extra_data, _ = prompt_tuple[:5]
create_time = extra_data.get('create_time')
extra_pnginfo = extra_data.get('extra_pnginfo') or {}
    workflow_id = (extra_pnginfo.get('workflow') or {}).get('id')

status_info = history_item.get('status', {})
status_str = status_info.get('status_str') if status_info else None
    if status_str == 'error':
        status = JobStatus.FAILED
    else:
        # 'success' or any unrecognized status maps to completed
        status = JobStatus.COMPLETED

outputs = history_item.get('outputs', {})
outputs_count, preview_output = get_outputs_summary(outputs)

execution_error = None
execution_start_time = None
execution_end_time = None
if status_info:
messages = status_info.get('messages', [])
for entry in messages:
if isinstance(entry, (list, tuple)) and len(entry) >= 2:
event_name, event_data = entry[0], entry[1]
if isinstance(event_data, dict):
if event_name == 'execution_start':
execution_start_time = event_data.get('timestamp')
elif event_name in ('execution_success', 'execution_error', 'execution_interrupted'):
execution_end_time = event_data.get('timestamp')
if event_name == 'execution_error':
execution_error = event_data

job = {
'id': prompt_id,
'status': status,
'priority': priority,
'create_time': create_time,
'execution_error': execution_error,
'execution_start_time': execution_start_time,
'execution_end_time': execution_end_time,
'outputs_count': outputs_count,
'preview_output': preview_output,
'workflow_id': workflow_id,
}

if include_outputs:
job['outputs'] = outputs
job['execution_status'] = status_info
job['workflow'] = {
'prompt': prompt,
'extra_data': extra_data,
}

return job
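
# Example of the status messages consumed above (hypothetical timestamps; shape
# follows the (event_name, event_data) entries ComfyUI stores in history):
#   messages = [('execution_start', {'timestamp': 1000}),
#               ('execution_success', {'timestamp': 1500})]
# yields execution_start_time=1000, execution_end_time=1500, execution_error=None.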


def get_outputs_summary(outputs):
"""
Count outputs and find preview in a single pass.
Returns (outputs_count, preview_output).

Preview priority (matching frontend):
1. type="output" with previewable media
2. Any previewable media
"""
count = 0
preview_output = None
fallback_preview = None

for node_id, node_outputs in outputs.items():
for media_type, items in node_outputs.items():
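            # 'animated' is a parallel list of booleans flagging animated images,
            # not a list of output files, so it is skipped below.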
if media_type == 'animated' or not isinstance(items, list):
continue

for item in items:
count += 1

if preview_output is None and is_previewable(media_type, item):
enriched = {
**item,
'nodeId': node_id,
'mediaType': media_type
}
if item.get('type') == 'output':
preview_output = enriched
elif fallback_preview is None:
fallback_preview = enriched

return count, preview_output or fallback_preview
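
# Example (hypothetical outputs dict):
#   outputs = {'9': {'images': [{'filename': 'a.png', 'type': 'output'}]}}
#   get_outputs_summary(outputs)
#   -> (1, {'filename': 'a.png', 'type': 'output', 'nodeId': '9', 'mediaType': 'images'})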


def apply_sorting(jobs, sort_by, sort_order):
"""Sort jobs list by specified field and order."""
reverse = (sort_order == 'desc')

if sort_by == 'execution_duration':
def get_sort_key(job):
start = job.get('execution_start_time') or 0
end = job.get('execution_end_time') or 0
return end - start if end and start else 0
else:
def get_sort_key(job):
return job.get('create_time') or 0

return sorted(jobs, key=get_sort_key, reverse=reverse)
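
# Example: with sort_by='execution_duration' and sort_order='desc', a job with
# start=10/end=25 (duration 15) sorts before one with start=10/end=20 (duration 10);
# jobs missing either timestamp sort with duration 0.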


def get_job(prompt_id, running, queued, history):
"""
Get a single job by prompt_id from history or queue.

Args:
prompt_id: The prompt ID to look up
running: List of currently running queue items
queued: List of pending queue items
history: Dict of history items keyed by prompt_id

Returns:
Job dict with full details, or None if not found
"""
if prompt_id in history:
return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True)

for item in running:
if item[1] == prompt_id:
return normalize_queue_item(item, JobStatus.IN_PROGRESS)

for item in queued:
if item[1] == prompt_id:
return normalize_queue_item(item, JobStatus.PENDING)

return None


def get_all_jobs(running, queued, history, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0):
"""
Get all jobs (running, pending, completed) with filtering and sorting.

Args:
running: List of currently running queue items
queued: List of pending queue items
history: Dict of history items keyed by prompt_id
status_filter: List of statuses to include (from JobStatus.ALL)
sort_by: Field to sort by ('created_at', 'execution_duration')
sort_order: 'asc' or 'desc'
limit: Maximum number of items to return
offset: Number of items to skip

Returns:
tuple: (jobs_list, total_count)
"""
jobs = []

if status_filter is None:
status_filter = JobStatus.ALL

if JobStatus.IN_PROGRESS in status_filter:
for item in running:
jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS))

if JobStatus.PENDING in status_filter:
for item in queued:
jobs.append(normalize_queue_item(item, JobStatus.PENDING))

include_completed = JobStatus.COMPLETED in status_filter
include_failed = JobStatus.FAILED in status_filter
if include_completed or include_failed:
for prompt_id, history_item in history.items():
is_failed = history_item.get('status', {}).get('status_str') == 'error'
if (is_failed and include_failed) or (not is_failed and include_completed):
jobs.append(normalize_history_item(prompt_id, history_item))

jobs = apply_sorting(jobs, sort_by, sort_order)

total_count = len(jobs)

if offset > 0:
jobs = jobs[offset:]
if limit is not None:
jobs = jobs[:limit]

return (jobs, total_count)
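
A minimal sketch of how these helpers compose. The queue tuples and history dict below are hypothetical stand-ins for what PromptQueue.get_current_queue_volatile() and get_history() return:

from comfy_execution.jobs import JobStatus, get_all_jobs

running = [(0, 'p-run', {}, {'create_time': 200}, [])]
queued = [(1, 'p-wait', {}, {'create_time': 300}, [])]
history = {
    'p-done': {
        'prompt': (2, 'p-done', {}, {'create_time': 100}, []),
        'status': {'status_str': 'success', 'messages': []},
        'outputs': {},
    },
}

jobs, total = get_all_jobs(
    running, queued, history,
    status_filter=[JobStatus.PENDING, JobStatus.COMPLETED],
    sort_by='created_at', sort_order='asc',
)
assert total == 2 and [j['id'] for j in jobs] == ['p-done', 'p-wait']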
102 changes: 102 additions & 0 deletions server.py
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@
import nodes
import folder_paths
import execution
from comfy_execution.jobs import JobStatus, get_job, get_all_jobs
import uuid
import urllib
import json
@@ -694,6 +695,107 @@ async def get_object_info_node(request):
out[node_class] = node_info(node_class)
return web.json_response(out)

@routes.get("/api/jobs")
async def get_jobs(request):
"""List all jobs with filtering, sorting, and pagination."""
query = request.rel_url.query

status_param = query.get("status", None)
status_filter = None
if status_param:
status_filter = [s.strip() for s in status_param.split(',') if s.strip()]
valid_statuses = set(JobStatus.ALL)
status_filter = [s for s in status_filter if s in valid_statuses]
if not status_filter:
status_filter = None

sort_by = query.get('sort_by', 'created_at')
if sort_by not in {'created_at', 'execution_duration'}:
return web.json_response(
{"error": "sort_by must be 'created_at' or 'execution_duration'"},
status=400
)

sort_order = query.get('sort_order', 'desc')
if sort_order not in {'asc', 'desc'}:
return web.json_response(
{"error": "sort_order must be 'asc' or 'desc'"},
status=400
)

limit = None
if 'limit' in query:
try:
limit = int(query.get('limit'))
if limit <= 0:
return web.json_response(
{"error": "limit must be a positive integer"},
status=400
)
except (ValueError, TypeError):
return web.json_response(
{"error": "limit must be an integer"},
status=400
)

offset = 0
if 'offset' in query:
try:
offset = int(query.get('offset'))
if offset < 0:
offset = 0
except (ValueError, TypeError):
return web.json_response(
{"error": "offset must be an integer"},
status=400
)

running, queued = self.prompt_queue.get_current_queue_volatile()
history = self.prompt_queue.get_history()

jobs, total = get_all_jobs(
running, queued, history,
status_filter=status_filter,
sort_by=sort_by,
sort_order=sort_order,
limit=limit,
offset=offset
)

has_more = (offset + len(jobs)) < total

return web.json_response({
'jobs': jobs,
'pagination': {
'offset': offset,
'limit': limit,
'total': total,
'has_more': has_more
}
})

@routes.get("/api/jobs/{job_id}")
async def get_job_by_id(request):
"""Get a single job by ID."""
job_id = request.match_info.get("job_id", None)
if not job_id:
return web.json_response(
{"error": "job_id is required"},
status=400
)

running, queued = self.prompt_queue.get_current_queue_volatile()
history = self.prompt_queue.get_history(prompt_id=job_id)

job = get_job(job_id, running, queued, history)
if job is None:
return web.json_response(
{"error": "Job not found"},
status=404
)

return web.json_response(job)

@routes.get("/history")
async def get_history(request):
max_items = request.rel_url.query.get("max_items", None)
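
For reference, a sketch of exercising the new endpoint against a running server. Host and port are the ComfyUI defaults; the response fields follow the job dicts and pagination object built above:

import json
import urllib.request

# Completed and failed jobs, newest first, two per page.
url = ('http://127.0.0.1:8188/api/jobs'
       '?status=completed,failed&sort_by=created_at&sort_order=desc&limit=2')
with urllib.request.urlopen(url) as resp:
    payload = json.load(resp)

for job in payload['jobs']:
    print(job['id'], job['status'], job['create_time'])
print(payload['pagination'])  # {'offset': 0, 'limit': 2, 'total': <int>, 'has_more': <bool>}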