Skip to content

Commit c4b7a84

Browse files
committed
feat: create a /jobs api to return queue and history jobs
1 parent 2640acb commit c4b7a84

File tree

4 files changed

+454
-5
lines changed

4 files changed

+454
-5
lines changed

execution.py

Lines changed: 195 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1124,7 +1124,8 @@ class ExecutionStatus(NamedTuple):
11241124
messages: List[str]
11251125

11261126
def task_done(self, item_id, history_result,
1127-
status: Optional['PromptQueue.ExecutionStatus'], process_item=None):
1127+
status: Optional['PromptQueue.ExecutionStatus'], process_item=None,
1128+
execution_time: Optional[float] = None):
11281129
with self.mutex:
11291130
prompt = self.currently_running.pop(item_id)
11301131
if len(self.history) > MAXIMUM_HISTORY_SIZE:
@@ -1141,6 +1142,7 @@ def task_done(self, item_id, history_result,
11411142
"prompt": prompt,
11421143
"outputs": {},
11431144
'status': status_dict,
1145+
'execution_time': execution_time,
11441146
}
11451147
self.history[prompt[1]].update(history_result)
11461148
self.server.queue_updated()
@@ -1217,6 +1219,198 @@ def delete_history_item(self, id_to_delete):
12171219
with self.mutex:
12181220
self.history.pop(id_to_delete, None)
12191221

1222+
def get_job(self, prompt_id):
    """Look up one job by prompt_id, checking history first, then the
    currently-running set, then the pending queue. Returns a unified
    job dict, or None when the id is unknown."""
    with self.mutex:
        history_item = self.history.get(prompt_id)
        if history_item is not None:
            # Finished jobs carry their full outputs in the single-job view.
            return self._normalize_history_item(prompt_id, history_item, include_outputs=True)

        running = next((it for it in self.currently_running.values() if it[1] == prompt_id), None)
        if running is not None:
            return self._normalize_queue_item(running, 'in_progress')

        pending = next((it for it in self.queue if it[1] == prompt_id), None)
        if pending is not None:
            return self._normalize_queue_item(pending, 'pending')

        return None
1237+
1238+
def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0):
    """
    Collect running, pending, and history jobs as unified job dicts.

    Args:
        status_filter: statuses to include; None means all of
            ['pending', 'in_progress', 'completed', 'error']
        sort_by: field to sort by ('created_at', 'execution_time')
        sort_order: 'asc' or 'desc'
        limit: maximum number of items to return (None = unbounded)
        offset: number of items to skip from the sorted list

    Returns:
        tuple: (jobs_list, total_count) — total_count is measured before
        limit/offset are applied, so callers can paginate.
    """
    with self.mutex:
        if status_filter is None:
            wanted = {'pending', 'in_progress', 'completed', 'error'}
        else:
            wanted = set(status_filter)

        jobs = []

        if 'in_progress' in wanted:
            jobs.extend(self._normalize_queue_item(it, 'in_progress')
                        for it in self.currently_running.values())

        if 'pending' in wanted:
            jobs.extend(self._normalize_queue_item(it, 'pending')
                        for it in self.queue)

        want_completed = 'completed' in wanted
        want_error = 'error' in wanted
        if want_completed or want_error:
            for prompt_id, history_item in self.history.items():
                failed = history_item.get('status', {}).get('status_str') == 'error'
                if (failed and want_error) or (not failed and want_completed):
                    jobs.append(self._normalize_history_item(prompt_id, history_item))

        jobs = self._apply_sorting(jobs, sort_by, sort_order)
        total_count = len(jobs)

        # Paginate after sorting; total_count reflects the unpaginated size.
        page = jobs[offset:] if offset > 0 else jobs
        if limit is not None:
            page = page[:limit]

        return (page, total_count)
1284+
1285+
def _normalize_queue_item(self, item, status):
    """Map a raw queue tuple (number, prompt_id, prompt, extra_data,
    outputs_to_execute, ...) onto the unified job dict shape.

    Queued/running jobs have no results yet, so the execution/output
    fields are all placeholders."""
    _number, prompt_id, _prompt, extra_data, _outputs_to_execute = item[:5]

    return {
        'id': prompt_id,
        'status': status,
        'create_time': extra_data.get('create_time'),
        'execution_time': None,
        'error_message': None,
        'outputs_count': 0,
        'preview_output': None,
        'workflow_id': None,
    }
1300+
1301+
def _normalize_history_item(self, prompt_id, history_item, include_outputs=False):
    """Map a history entry onto the unified job dict shape.

    When include_outputs is True, the raw outputs, prompt, extra_data and
    outputs_to_execute are attached as well (used by the single-job view).
    """
    _number, _, prompt, extra_data, outputs_to_execute = history_item['prompt'][:5]

    # Derive job status from the recorded execution status; entries
    # without one are treated as completed.
    status_info = history_item.get('status', {})
    if not status_info:
        status = 'completed'
    elif status_info.get('status_str') == 'success':
        status = 'completed'
    else:
        status = 'error'

    outputs = history_item.get('outputs', {})
    outputs_count, preview_output = self._get_outputs_summary(outputs)

    error_message = None
    if status == 'error' and status_info:
        messages = status_info.get('messages', [])
        if messages:
            first = messages[0]
            error_message = first if isinstance(first, str) else str(first)

    job = {
        'id': prompt_id,
        'status': status,
        'create_time': extra_data.get('create_time'),
        'execution_time': history_item.get('execution_time'),
        'error_message': error_message,
        'outputs_count': outputs_count,
        'preview_output': preview_output,
        'workflow_id': None,
    }

    if include_outputs:
        job['outputs'] = outputs
        job['prompt'] = prompt
        job['extra_data'] = extra_data
        job['outputs_to_execute'] = outputs_to_execute

    return job
1344+
1345+
def _get_outputs_summary(self, outputs):
    """
    Single pass over a history entry's outputs: count every media item
    and pick a preview. Returns (outputs_count, preview_output).

    Preview priority (matching frontend):
    1. type="output" with previewable media
    2. Any previewable media
    """
    image_ext = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')
    video_ext = ('.mp4', '.webm', '.mov', '.avi')
    audio_ext = ('.mp3', '.wav', '.ogg', '.flac')
    model_ext = ('.obj', '.fbx', '.gltf', '.glb')  # 3D formats

    count = 0
    best = None       # previewable item with type == 'output' (top priority)
    fallback = None   # first previewable item of any other type

    for node_id, node_outputs in outputs.items():
        for media_type, entries in node_outputs.items():
            # Skip the 'animated' entry and any non-list values.
            if media_type == 'animated' or not isinstance(entries, list):
                continue
            for entry in entries:
                count += 1

                # Once the top-priority preview is found, keep counting only.
                if best is not None:
                    continue

                name = entry.get('filename', '').lower()
                fmt = entry.get('format', '')

                # Previewable = image/video/audio/3D, matching frontend logic.
                previewable = (
                    media_type in ('images', 'video', 'audio')
                    or name.endswith(image_ext + video_ext + audio_ext + model_ext)
                    or bool(fmt and fmt.startswith(('video/', 'audio/')))
                )
                if not previewable:
                    continue

                enriched = dict(entry, nodeId=node_id, mediaType=media_type)
                if entry.get('type') == 'output':
                    best = enriched
                elif fallback is None:
                    fallback = enriched

    return count, best or fallback
1399+
1400+
def _apply_sorting(self, jobs, sort_by, sort_order):
    """Return jobs sorted by the given field ('execution_time', otherwise
    'create_time'); 'desc' order reverses. Missing/None values sort as 0."""
    field = 'execution_time' if sort_by == 'execution_time' else 'create_time'
    return sorted(
        jobs,
        key=lambda job: job.get(field) or 0,
        reverse=(sort_order == 'desc'),
    )
1413+
12201414
def set_flag(self, name, data):
12211415
with self.mutex:
12221416
self.flags[name] = data

main.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,19 +202,21 @@ def prompt_worker(q, server_instance):
202202
e.execute(item[2], prompt_id, extra_data, item[4])
203203
need_gc = True
204204

205+
current_time = time.perf_counter()
206+
execution_time = current_time - execution_start_time
207+
205208
remove_sensitive = lambda prompt: prompt[:5] + prompt[6:]
206209
q.task_done(item_id,
207210
e.history_result,
208211
status=execution.PromptQueue.ExecutionStatus(
209212
status_str='success' if e.success else 'error',
210213
completed=e.success,
211-
messages=e.status_messages), process_item=remove_sensitive)
214+
messages=e.status_messages),
215+
process_item=remove_sensitive,
216+
execution_time=execution_time)
212217
if server_instance.client_id is not None:
213218
server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id}, server_instance.client_id)
214219

215-
current_time = time.perf_counter()
216-
execution_time = current_time - execution_start_time
217-
218220
# Log Time in a more readable way after 10 minutes
219221
if execution_time > 600:
220222
execution_time = time.strftime("%H:%M:%S", time.gmtime(execution_time))

server.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,96 @@ async def get_object_info_node(request):
688688
out[node_class] = node_info(node_class)
689689
return web.json_response(out)
690690

691+
@routes.get("/api/jobs")
async def get_jobs(request):
    """List all jobs with filtering, sorting, and pagination.

    Query parameters:
        status: comma-separated subset of
            pending,in_progress,completed,error (default: all)
        sort:   'created_at' (default) or 'execution_time'
        order:  'asc' or 'desc' (default 'desc')
        limit:  max items to return, 1..500 (default: unbounded)
        offset: items to skip (negative values clamp to 0)

    Returns JSON: {"jobs": [...], "total": int, "has_more": bool}
    """
    query = request.rel_url.query

    status_param = query.get("status", None)
    status_filter = None
    if status_param:
        # Unknown statuses are silently dropped; an empty result after
        # filtering means "no filter" rather than "match nothing".
        status_filter = [s.strip() for s in status_param.split(',') if s.strip()]
        valid_statuses = {'pending', 'in_progress', 'completed', 'error'}
        status_filter = [s for s in status_filter if s in valid_statuses]
        if not status_filter:
            status_filter = None

    # Fix: PromptQueue.get_all_jobs supports sorting by 'execution_time'
    # as well, but this endpoint previously rejected everything except
    # 'created_at'.
    sort_by = query.get('sort', 'created_at')
    if sort_by not in {'created_at', 'execution_time'}:
        return web.json_response(
            {"error": "sort must be 'created_at' or 'execution_time'"},
            status=400
        )

    sort_order = query.get('order', 'desc')
    if sort_order not in {'asc', 'desc'}:
        return web.json_response(
            {"error": "order must be 'asc' or 'desc'"},
            status=400
        )

    limit = None
    if 'limit' in query:
        try:
            limit = int(query.get('limit'))
        except (ValueError, TypeError):
            return web.json_response(
                {"error": "limit must be an integer"},
                status=400
            )
        if limit <= 0 or limit > 500:
            return web.json_response(
                {"error": "limit must be between 1 and 500"},
                status=400
            )

    offset = 0
    if 'offset' in query:
        try:
            offset = int(query.get('offset'))
        except (ValueError, TypeError):
            return web.json_response(
                {"error": "offset must be an integer"},
                status=400
            )
        # Negative offsets are clamped rather than rejected.
        if offset < 0:
            offset = 0

    jobs, total = self.prompt_queue.get_all_jobs(
        status_filter=status_filter,
        sort_by=sort_by,
        sort_order=sort_order,
        limit=limit,
        offset=offset
    )

    return web.json_response({
        'jobs': jobs,
        'total': total,
        # True when another page exists past this one.
        'has_more': (offset + len(jobs)) < total
    })
761+
762+
@routes.get("/api/jobs/{job_id}")
async def get_job(request):
    """Fetch a single job (pending, running, or from history) by its id."""
    job_id = request.match_info.get("job_id", None)
    if not job_id:
        return web.json_response({"error": "job_id is required"}, status=400)

    job = self.prompt_queue.get_job(job_id)
    if job is None:
        return web.json_response({"error": "Job not found"}, status=404)

    return web.json_response(job)
780+
691781
@routes.get("/history")
692782
async def get_history(request):
693783
max_items = request.rel_url.query.get("max_items", None)

0 commit comments

Comments
 (0)