Skip to content

Commit ae57e94

Browse files
committed
feat: create a /jobs api to return queue and history jobs
1 parent 2640acb commit ae57e94

File tree

4 files changed

+652
-5
lines changed

4 files changed

+652
-5
lines changed

execution.py

Lines changed: 199 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1124,7 +1124,8 @@ class ExecutionStatus(NamedTuple):
11241124
messages: List[str]
11251125

11261126
def task_done(self, item_id, history_result,
1127-
status: Optional['PromptQueue.ExecutionStatus'], process_item=None):
1127+
status: Optional['PromptQueue.ExecutionStatus'], process_item=None,
1128+
execution_time: Optional[float] = None):
11281129
with self.mutex:
11291130
prompt = self.currently_running.pop(item_id)
11301131
if len(self.history) > MAXIMUM_HISTORY_SIZE:
@@ -1141,6 +1142,7 @@ def task_done(self, item_id, history_result,
11411142
"prompt": prompt,
11421143
"outputs": {},
11431144
'status': status_dict,
1145+
'execution_time': execution_time,
11441146
}
11451147
self.history[prompt[1]].update(history_result)
11461148
self.server.queue_updated()
@@ -1217,6 +1219,202 @@ def delete_history_item(self, id_to_delete):
12171219
with self.mutex:
12181220
self.history.pop(id_to_delete, None)
12191221

1222+
def get_job(self, prompt_id):
    """Look up a single job by its prompt_id.

    Searches the finished-job history first (the most common case),
    then the currently-running set, then the pending queue.  Returns a
    normalized job dict, or None when the id is unknown anywhere.
    """
    with self.mutex:
        history_entry = self.history.get(prompt_id)
        if history_entry is not None:
            return self._normalize_history_item(prompt_id, history_entry, include_outputs=True)

        # Queue entries are tuples whose second field is the prompt id.
        for running in self.currently_running.values():
            if running[1] == prompt_id:
                return self._normalize_queue_item(running, 'in_progress')

        for pending in self.queue:
            if pending[1] == prompt_id:
                return self._normalize_queue_item(pending, 'pending')

        return None
1240+
1241+
def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0):
    """Return (jobs, total_count) across running, pending and history jobs.

    Args:
        status_filter: statuses to include, any of 'pending',
            'in_progress', 'completed', 'error'.  None means all four.
        sort_by: field to sort by ('created_at', 'execution_time').
        sort_order: 'asc' or 'desc'.
        limit: maximum number of jobs to return (None = unlimited).
        offset: number of jobs to skip after sorting.

    Returns:
        tuple: (jobs_list, total_count) where total_count is taken
        before limit/offset are applied.
    """
    wanted = set(status_filter) if status_filter is not None else {
        'pending', 'in_progress', 'completed', 'error'
    }

    with self.mutex:
        jobs = []

        if 'in_progress' in wanted:
            jobs.extend(self._normalize_queue_item(entry, 'in_progress')
                        for entry in self.currently_running.values())

        if 'pending' in wanted:
            jobs.extend(self._normalize_queue_item(entry, 'pending')
                        for entry in self.queue)

        want_completed = 'completed' in wanted
        want_error = 'error' in wanted
        if want_completed or want_error:
            for pid, entry in self.history.items():
                failed = entry.get('status', {}).get('status_str') == 'error'
                if want_error if failed else want_completed:
                    jobs.append(self._normalize_history_item(pid, entry))

        jobs = self._apply_sorting(jobs, sort_by, sort_order)
        total = len(jobs)

        # Paging is applied after sorting so total reflects the full set.
        if offset > 0:
            jobs = jobs[offset:]
        if limit is not None:
            jobs = jobs[:limit]

        return (jobs, total)
1287+
1288+
def _normalize_queue_item(self, item, status):
    """Build the unified job dict for a queue (not-yet-finished) entry.

    Queue tuples carry no outputs or timing information yet, so those
    fields are zeroed/None here.
    """
    _number, prompt_id, _prompt, extra_data, _outputs_to_execute = item[:5]
    return {
        'id': prompt_id,
        'status': status,
        'create_time': extra_data.get('create_time'),
        'execution_time': None,
        'error_message': None,
        'outputs_count': 0,
        'preview_output': None,
        'workflow_id': None,
    }
1303+
1304+
def _normalize_history_item(self, prompt_id, history_item, include_outputs=False):
    """Build the unified job dict for a finished (history) entry.

    When include_outputs is True, the raw outputs, prompt, extra_data and
    outputs_to_execute are attached as well (used for single-job lookups).
    """
    prompt_tuple = history_item['prompt']
    _number, _, prompt, extra_data, outputs_to_execute = prompt_tuple[:5]

    # A history entry is an error iff its recorded status says so;
    # a missing/empty status block is treated as a success.
    status_info = history_item.get('status', {})
    if not status_info:
        status = 'completed'
    elif status_info.get('status_str') == 'success':
        status = 'completed'
    else:
        status = 'error'

    outputs = history_item.get('outputs', {})
    outputs_count, preview_output = self._get_outputs_summary(outputs)

    # Surface the first status message as the error text, if any.
    error_message = None
    if status == 'error' and status_info:
        messages = status_info.get('messages', [])
        if messages:
            first = messages[0]
            error_message = first if isinstance(first, str) else str(first)

    job = {
        'id': prompt_id,
        'status': status,
        'create_time': extra_data.get('create_time'),
        'execution_time': history_item.get('execution_time'),
        'error_message': error_message,
        'outputs_count': outputs_count,
        'preview_output': preview_output,
        'workflow_id': None,
    }

    if include_outputs:
        job['outputs'] = outputs
        job['prompt'] = prompt
        job['extra_data'] = extra_data
        job['outputs_to_execute'] = outputs_to_execute

    return job
1348+
1349+
def _get_outputs_summary(self, outputs):
    """Count output items and pick a preview in a single pass.

    Returns (outputs_count, preview_output).

    Preview preference mirrors the frontend:
      1. a previewable item with type == "output"
      2. otherwise, the first previewable item of any type
    """
    # Extensions considered previewable, matching the frontend logic.
    preview_exts = (
        '.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp',   # images
        '.mp4', '.webm', '.mov', '.avi',                    # video
        '.mp3', '.wav', '.ogg', '.flac',                    # audio
        '.obj', '.fbx', '.gltf', '.glb',                    # 3D
    )

    total = 0
    best = None        # previewable item with type == "output"
    first_any = None   # first previewable item of any type

    for node_id, node_outputs in outputs.items():
        for media_type, entries in node_outputs.items():
            # 'animated' entries and non-list values are not counted as
            # output items.
            if media_type == 'animated' or not isinstance(entries, list):
                continue
            for entry in entries:
                total += 1

                # Already holding the top-priority preview: just count.
                if best is not None:
                    continue

                name = entry.get('filename', '').lower()
                fmt = entry.get('format', '')

                previewable = (
                    media_type in ('images', 'video', 'audio')
                    or name.endswith(preview_exts)
                    or bool(fmt and (fmt.startswith('video/') or fmt.startswith('audio/')))
                )
                if not previewable:
                    continue

                enriched = dict(entry, nodeId=node_id, mediaType=media_type)
                if entry.get('type') == 'output':
                    best = enriched
                elif first_any is None:
                    first_any = enriched

    return total, best or first_any
1403+
1404+
def _apply_sorting(self, jobs, sort_by, sort_order):
    """Return `jobs` sorted by the requested field.

    'execution_time' sorts on that key; anything else falls back to
    'create_time'.  Missing/None values sort as 0.  'desc' reverses.
    """
    field = 'execution_time' if sort_by == 'execution_time' else 'create_time'
    return sorted(
        jobs,
        key=lambda job: job.get(field) or 0,
        reverse=(sort_order == 'desc'),
    )
1417+
12201418
def set_flag(self, name, data):
12211419
with self.mutex:
12221420
self.flags[name] = data

main.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,19 +202,21 @@ def prompt_worker(q, server_instance):
202202
e.execute(item[2], prompt_id, extra_data, item[4])
203203
need_gc = True
204204

205+
current_time = time.perf_counter()
206+
execution_time = current_time - execution_start_time
207+
205208
remove_sensitive = lambda prompt: prompt[:5] + prompt[6:]
206209
q.task_done(item_id,
207210
e.history_result,
208211
status=execution.PromptQueue.ExecutionStatus(
209212
status_str='success' if e.success else 'error',
210213
completed=e.success,
211-
messages=e.status_messages), process_item=remove_sensitive)
214+
messages=e.status_messages),
215+
process_item=remove_sensitive,
216+
execution_time=execution_time)
212217
if server_instance.client_id is not None:
213218
server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id}, server_instance.client_id)
214219

215-
current_time = time.perf_counter()
216-
execution_time = current_time - execution_start_time
217-
218220
# Log Time in a more readable way after 10 minutes
219221
if execution_time > 600:
220222
execution_time = time.strftime("%H:%M:%S", time.gmtime(execution_time))

server.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,96 @@ async def get_object_info_node(request):
688688
out[node_class] = node_info(node_class)
689689
return web.json_response(out)
690690

691+
@routes.get("/api/jobs")
async def get_jobs(request):
    """List all jobs with filtering, sorting, and pagination.

    Query params:
        status: comma-separated subset of pending,in_progress,completed,error
        sort:   'created_at' (default) or 'execution_time'
        order:  'asc' or 'desc' (default)
        limit:  integer in 1..500
        offset: integer >= 0 (negative values are clamped to 0)
    """
    query = request.rel_url.query

    # Parse the status filter; unknown values are dropped, and an empty
    # result falls back to "all statuses".
    status_param = query.get("status", None)
    status_filter = None
    if status_param:
        status_filter = [s.strip() for s in status_param.split(',') if s.strip()]
        valid_statuses = {'pending', 'in_progress', 'completed', 'error'}
        status_filter = [s for s in status_filter if s in valid_statuses]
        if not status_filter:
            status_filter = None

    # Accept both sort fields supported by the queue backend.
    # (Previously only 'created_at' was allowed, which made the
    # backend's execution_time sorting unreachable from the API.)
    sort_by = query.get('sort', 'created_at')
    if sort_by not in {'created_at', 'execution_time'}:
        return web.json_response(
            {"error": "sort must be 'created_at' or 'execution_time'"},
            status=400
        )

    sort_order = query.get('order', 'desc')
    if sort_order not in {'asc', 'desc'}:
        return web.json_response(
            {"error": "order must be 'asc' or 'desc'"},
            status=400
        )

    limit = None
    if 'limit' in query:
        try:
            limit = int(query.get('limit'))
        except (ValueError, TypeError):
            return web.json_response(
                {"error": "limit must be an integer"},
                status=400
            )
        if limit <= 0 or limit > 500:
            return web.json_response(
                {"error": "limit must be between 1 and 500"},
                status=400
            )

    offset = 0
    if 'offset' in query:
        try:
            offset = int(query.get('offset'))
        except (ValueError, TypeError):
            return web.json_response(
                {"error": "offset must be an integer"},
                status=400
            )
        # Clamp rather than reject, preserving existing behavior.
        if offset < 0:
            offset = 0

    jobs, total = self.prompt_queue.get_all_jobs(
        status_filter=status_filter,
        sort_by=sort_by,
        sort_order=sort_order,
        limit=limit,
        offset=offset
    )

    has_more = (offset + len(jobs)) < total

    return web.json_response({
        'jobs': jobs,
        'total': total,
        'has_more': has_more
    })
761+
762+
@routes.get("/api/jobs/{job_id}")
async def get_job(request):
    """Fetch one job by id; 404 when it is not queued, running, or in history."""
    job_id = request.match_info.get("job_id", None)
    if not job_id:
        return web.json_response(
            {"error": "job_id is required"},
            status=400
        )

    job = self.prompt_queue.get_job(job_id)
    if job is not None:
        return web.json_response(job)

    return web.json_response(
        {"error": "Job not found"},
        status=404
    )
780+
691781
@routes.get("/history")
692782
async def get_history(request):
693783
max_items = request.rel_url.query.get("max_items", None)

0 commit comments

Comments
 (0)