From 5aa07d777cfd6369ef9689b0068a4dbd8c9a541a Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Tue, 2 Dec 2025 11:17:25 -0800 Subject: [PATCH 01/18] feat: create a /jobs api to return queue and history jobs --- execution.py | 196 +++++++++++++++++++++++++++++- main.py | 10 +- server.py | 90 ++++++++++++++ tests/execution/test_execution.py | 163 +++++++++++++++++++++++++ 4 files changed, 454 insertions(+), 5 deletions(-) diff --git a/execution.py b/execution.py index c2186ac98147..815788ab275c 100644 --- a/execution.py +++ b/execution.py @@ -1130,7 +1130,8 @@ class ExecutionStatus(NamedTuple): messages: List[str] def task_done(self, item_id, history_result, - status: Optional['PromptQueue.ExecutionStatus'], process_item=None): + status: Optional['PromptQueue.ExecutionStatus'], process_item=None, + execution_time: Optional[float] = None): with self.mutex: prompt = self.currently_running.pop(item_id) if len(self.history) > MAXIMUM_HISTORY_SIZE: @@ -1147,6 +1148,7 @@ def task_done(self, item_id, history_result, "prompt": prompt, "outputs": {}, 'status': status_dict, + 'execution_time': execution_time, } self.history[prompt[1]].update(history_result) self.server.queue_updated() @@ -1223,6 +1225,198 @@ def delete_history_item(self, id_to_delete): with self.mutex: self.history.pop(id_to_delete, None) + def get_job(self, prompt_id): + """Get a single job by prompt_id from history or queue.""" + with self.mutex: + if prompt_id in self.history: + return self._normalize_history_item(prompt_id, self.history[prompt_id], include_outputs=True) + + for item in self.currently_running.values(): + if item[1] == prompt_id: + return self._normalize_queue_item(item, 'in_progress') + + for item in self.queue: + if item[1] == prompt_id: + return self._normalize_queue_item(item, 'pending') + + return None + + def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0): + """ + Get all jobs (running, pending, completed) with filtering and sorting. 
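+
+        Example (illustrative; ``queue`` is this PromptQueue instance):
+            jobs, total = queue.get_all_jobs(status_filter=['completed'], limit=20)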
+ + Args: + status_filter: list of statuses to include ['pending', 'in_progress', 'completed', 'error'] + sort_by: field to sort by ('created_at', 'execution_time') + sort_order: 'asc' or 'desc' + limit: maximum number of items to return + offset: number of items to skip + + Returns: + tuple: (jobs_list, total_count) + """ + with self.mutex: + jobs = [] + + if status_filter is None: + status_filter = ['pending', 'in_progress', 'completed', 'error'] + + if 'in_progress' in status_filter: + for item in self.currently_running.values(): + jobs.append(self._normalize_queue_item(item, 'in_progress')) + + if 'pending' in status_filter: + for item in self.queue: + jobs.append(self._normalize_queue_item(item, 'pending')) + + include_completed = 'completed' in status_filter + include_error = 'error' in status_filter + if include_completed or include_error: + for prompt_id, history_item in self.history.items(): + is_error = history_item.get('status', {}).get('status_str') == 'error' + if (is_error and include_error) or (not is_error and include_completed): + jobs.append(self._normalize_history_item(prompt_id, history_item)) + + jobs = self._apply_sorting(jobs, sort_by, sort_order) + + total_count = len(jobs) + + if offset > 0: + jobs = jobs[offset:] + if limit is not None: + jobs = jobs[:limit] + + return (jobs, total_count) + + def _normalize_queue_item(self, item, status): + """Convert queue item tuple to unified job dict.""" + number, prompt_id, prompt, extra_data, outputs_to_execute = item[:5] + create_time = extra_data.get('create_time') + + return { + 'id': prompt_id, + 'status': status, + 'create_time': create_time, + 'execution_time': None, + 'error_message': None, + 'outputs_count': 0, + 'preview_output': None, + 'workflow_id': None, + } + + def _normalize_history_item(self, prompt_id, history_item, include_outputs=False): + """Convert history item dict to unified job dict.""" + prompt_tuple = history_item['prompt'] + number, _, prompt, extra_data, outputs_to_execute = prompt_tuple[:5] + create_time = extra_data.get('create_time') + + # Determine status from history status + status_info = history_item.get('status', {}) + if status_info: + status = 'completed' if status_info.get('status_str') == 'success' else 'error' + else: + status = 'completed' + + outputs = history_item.get('outputs', {}) + + outputs_count, preview_output = self._get_outputs_summary(outputs) + + error_message = None + if status == 'error' and status_info: + messages = status_info.get('messages', []) + if messages: + error_message = messages[0] if isinstance(messages[0], str) else str(messages[0]) + + execution_time = history_item.get('execution_time') + + job = { + 'id': prompt_id, + 'status': status, + 'create_time': create_time, + 'execution_time': execution_time, + 'error_message': error_message, + 'outputs_count': outputs_count, + 'preview_output': preview_output, + 'workflow_id': None, + } + + if include_outputs: + job['outputs'] = outputs + job['prompt'] = prompt + job['extra_data'] = extra_data + job['outputs_to_execute'] = outputs_to_execute + + return job + + def _get_outputs_summary(self, outputs): + """ + Count outputs and find preview in a single pass. + Returns (outputs_count, preview_output). + + Preview priority (matching frontend): + 1. type="output" with previewable media + 2. 
Any previewable media + """ + count = 0 + preview_output = None + fallback_preview = None + + for node_id, node_outputs in outputs.items(): + for media_type, items in node_outputs.items(): + if media_type == 'animated' or not isinstance(items, list): + continue + for item in items: + count += 1 + + # Skip if we already have the best preview (type=output) + if preview_output is not None: + continue + + filename = item.get('filename', '').lower() + fmt = item.get('format', '') + + # Check if previewable (image/video/audio/3D) - matching frontend logic + is_previewable = ( + media_type == 'images' or + media_type == 'video' or + media_type == 'audio' or + filename.endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')) or # images + filename.endswith(('.mp4', '.webm', '.mov', '.avi')) or # video + filename.endswith(('.mp3', '.wav', '.ogg', '.flac')) or # audio + filename.endswith(('.obj', '.fbx', '.gltf', '.glb')) or # 3D + (fmt and (fmt.startswith('video/') or fmt.startswith('audio/'))) + ) + + if not is_previewable: + continue + + enriched = { + **item, + 'nodeId': node_id, + 'mediaType': media_type + } + + if item.get('type') == 'output': + preview_output = enriched + elif fallback_preview is None: + fallback_preview = enriched + + return count, preview_output or fallback_preview + + def _apply_sorting(self, jobs, sort_by, sort_order): + """Sort jobs list by specified field and order.""" + reverse = (sort_order == 'desc') + + if sort_by == 'execution_time': + def get_sort_key(job): + return job.get('execution_time') or 0 + else: + # Default to create_time + def get_sort_key(job): + return job.get('create_time') or 0 + + return sorted(jobs, key=get_sort_key, reverse=reverse) + def set_flag(self, name, data): with self.mutex: self.flags[name] = data diff --git a/main.py b/main.py index 0cd815d9e620..7605f1f021be 100644 --- a/main.py +++ b/main.py @@ -229,19 +229,21 @@ def prompt_worker(q, server_instance): e.execute(item[2], prompt_id, extra_data, item[4]) need_gc = True + current_time = time.perf_counter() + execution_time = current_time - execution_start_time + remove_sensitive = lambda prompt: prompt[:5] + prompt[6:] q.task_done(item_id, e.history_result, status=execution.PromptQueue.ExecutionStatus( status_str='success' if e.success else 'error', completed=e.success, - messages=e.status_messages), process_item=remove_sensitive) + messages=e.status_messages), + process_item=remove_sensitive, + execution_time=execution_time) if server_instance.client_id is not None: server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id}, server_instance.client_id) - current_time = time.perf_counter() - execution_time = current_time - execution_start_time - # Log Time in a more readable way after 10 minutes if execution_time > 600: execution_time = time.strftime("%H:%M:%S", time.gmtime(execution_time)) diff --git a/server.py b/server.py index ac4f42222261..a9466c12d2e7 100644 --- a/server.py +++ b/server.py @@ -694,6 +694,96 @@ async def get_object_info_node(request): out[node_class] = node_info(node_class) return web.json_response(out) + @routes.get("/api/jobs") + async def get_jobs(request): + """List all jobs with filtering, sorting, and pagination.""" + query = request.rel_url.query + + status_param = query.get("status", None) + status_filter = None + if status_param: + status_filter = [s.strip() for s in status_param.split(',') if s.strip()] + valid_statuses = {'pending', 'in_progress', 'completed', 'error'} + status_filter = [s for s in status_filter if s in valid_statuses] 
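+            # Illustrative: "?status=completed,error" becomes ['completed', 'error']; unrecognized values are dropped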
+ if not status_filter: + status_filter = None + + sort_by = query.get('sort', 'created_at') + if sort_by != 'created_at': + return web.json_response( + {"error": "sort must be 'created_at'"}, + status=400 + ) + + sort_order = query.get('order', 'desc') + if sort_order not in {'asc', 'desc'}: + return web.json_response( + {"error": "order must be 'asc' or 'desc'"}, + status=400 + ) + + limit = None + if 'limit' in query: + try: + limit = int(query.get('limit')) + if limit <= 0 or limit > 500: + return web.json_response( + {"error": "limit must be between 1 and 500"}, + status=400 + ) + except (ValueError, TypeError): + return web.json_response( + {"error": "limit must be an integer"}, + status=400 + ) + + offset = 0 + if 'offset' in query: + try: + offset = int(query.get('offset')) + if offset < 0: + offset = 0 + except (ValueError, TypeError): + return web.json_response( + {"error": "offset must be an integer"}, + status=400 + ) + + jobs, total = self.prompt_queue.get_all_jobs( + status_filter=status_filter, + sort_by=sort_by, + sort_order=sort_order, + limit=limit, + offset=offset + ) + + has_more = (offset + len(jobs)) < total + + return web.json_response({ + 'jobs': jobs, + 'total': total, + 'has_more': has_more + }) + + @routes.get("/api/jobs/{job_id}") + async def get_job(request): + """Get a single job by ID.""" + job_id = request.match_info.get("job_id", None) + if not job_id: + return web.json_response( + {"error": "job_id is required"}, + status=400 + ) + + job = self.prompt_queue.get_job(job_id) + if job is None: + return web.json_response( + {"error": "Job not found"}, + status=404 + ) + + return web.json_response(job) + @routes.get("/history") async def get_history(request): max_items = request.rel_url.query.get("max_items", None) diff --git a/tests/execution/test_execution.py b/tests/execution/test_execution.py index ace0d2279093..5316848dfe64 100644 --- a/tests/execution/test_execution.py +++ b/tests/execution/test_execution.py @@ -99,6 +99,37 @@ def get_all_history(self, max_items=None, offset=None): with urllib.request.urlopen(url) as response: return json.loads(response.read()) + def get_jobs(self, status=None, limit=None, offset=None, sort=None, order=None): + url = "http://{}/api/jobs".format(self.server_address) + params = {} + if status is not None: + params["status"] = status + if limit is not None: + params["limit"] = limit + if offset is not None: + params["offset"] = offset + if sort is not None: + params["sort"] = sort + if order is not None: + params["order"] = order + + if params: + url_values = urllib.parse.urlencode(params) + url = "{}?{}".format(url, url_values) + + with urllib.request.urlopen(url) as response: + return json.loads(response.read()) + + def get_job(self, job_id): + url = "http://{}/api/jobs/{}".format(self.server_address, job_id) + try: + with urllib.request.urlopen(url) as response: + return json.loads(response.read()) + except urllib.error.HTTPError as e: + if e.code == 404: + return None + raise + def set_test_name(self, name): self.test_name = name @@ -877,3 +908,135 @@ def test_offset_near_end_returns_remaining_items_only( result = client.get_all_history(max_items=5, offset=len(all_history) - 1) assert len(result) <= 1, "Should return at most 1 item when offset is near end" + + # Jobs API tests + def test_jobs_api_returns_completed_jobs( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test that /api/jobs returns completed jobs""" + result = self._create_history_item(client, builder) + prompt_id = result.get_prompt_id() + + 
jobs_response = client.get_jobs(status="completed") + assert "jobs" in jobs_response, "Response should have jobs array" + assert "total" in jobs_response, "Response should have total count" + assert "has_more" in jobs_response, "Response should have has_more flag" + + job_ids = [j["id"] for j in jobs_response["jobs"]] + assert prompt_id in job_ids, "Completed job should appear in jobs list" + + def test_jobs_api_job_structure( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test that job objects have required fields""" + self._create_history_item(client, builder) + + jobs_response = client.get_jobs(status="completed", limit=1) + assert len(jobs_response["jobs"]) > 0, "Should have at least one job" + + job = jobs_response["jobs"][0] + assert "id" in job, "Job should have id" + assert "status" in job, "Job should have status" + assert "create_time" in job, "Job should have create_time" + assert "outputs_count" in job, "Job should have outputs_count" + assert "preview_output" in job, "Job should have preview_output" + assert "workflow_id" in job, "Job should have workflow_id" + assert "error_message" in job, "Job should have error_message" + + def test_jobs_api_preview_output_structure( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test that preview_output has correct structure""" + self._create_history_item(client, builder) + + jobs_response = client.get_jobs(status="completed", limit=1) + job = jobs_response["jobs"][0] + + if job["preview_output"] is not None: + preview = job["preview_output"] + assert "filename" in preview, "Preview should have filename" + assert "nodeId" in preview, "Preview should have nodeId" + assert "mediaType" in preview, "Preview should have mediaType" + + def test_jobs_api_pagination( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test jobs API pagination""" + for _ in range(5): + self._create_history_item(client, builder) + + first_page = client.get_jobs(limit=2, offset=0) + second_page = client.get_jobs(limit=2, offset=2) + + assert len(first_page["jobs"]) <= 2, "First page should have at most 2 jobs" + assert len(second_page["jobs"]) <= 2, "Second page should have at most 2 jobs" + + first_ids = {j["id"] for j in first_page["jobs"]} + second_ids = {j["id"] for j in second_page["jobs"]} + assert first_ids.isdisjoint(second_ids), "Pages should have different jobs" + + def test_jobs_api_sorting( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test jobs API sorting""" + for _ in range(3): + self._create_history_item(client, builder) + + desc_jobs = client.get_jobs(order="desc") + asc_jobs = client.get_jobs(order="asc") + + if len(desc_jobs["jobs"]) >= 2: + desc_times = [j["create_time"] for j in desc_jobs["jobs"] if j["create_time"]] + asc_times = [j["create_time"] for j in asc_jobs["jobs"] if j["create_time"]] + if len(desc_times) >= 2: + assert desc_times == sorted(desc_times, reverse=True), "Desc should be newest first" + if len(asc_times) >= 2: + assert asc_times == sorted(asc_times), "Asc should be oldest first" + + def test_jobs_api_status_filter( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test jobs API status filtering""" + self._create_history_item(client, builder) + + completed_jobs = client.get_jobs(status="completed") + assert len(completed_jobs["jobs"]) > 0, "Should have completed jobs from history" + + for job in completed_jobs["jobs"]: + assert job["status"] == "completed", "Should only return completed jobs" + + # Pending jobs are transient - just verify filter doesn't error + 
pending_jobs = client.get_jobs(status="pending") + for job in pending_jobs["jobs"]: + assert job["status"] == "pending", "Should only return pending jobs" + + def test_get_job_by_id( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test getting a single job by ID""" + result = self._create_history_item(client, builder) + prompt_id = result.get_prompt_id() + + job = client.get_job(prompt_id) + assert job is not None, "Should find the job" + assert job["id"] == prompt_id, "Job ID should match" + assert "outputs" in job, "Single job should include outputs" + + def test_get_job_not_found( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test getting a non-existent job returns 404""" + job = client.get_job("nonexistent-job-id") + assert job is None, "Non-existent job should return None" + + def test_jobs_list_excludes_outputs( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test that job list doesn't include full outputs""" + self._create_history_item(client, builder) + + jobs_response = client.get_jobs(status="completed", limit=1) + job = jobs_response["jobs"][0] + + assert "outputs" not in job, "List should not include outputs" + assert "prompt" not in job, "List should not include prompt" From aa0064160636088fbdef2ea58d67c12c1f90f538 Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Tue, 2 Dec 2025 12:03:03 -0800 Subject: [PATCH 02/18] update unused vars --- execution.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/execution.py b/execution.py index 815788ab275c..299072ce0985 100644 --- a/execution.py +++ b/execution.py @@ -1290,7 +1290,7 @@ def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="des def _normalize_queue_item(self, item, status): """Convert queue item tuple to unified job dict.""" - number, prompt_id, prompt, extra_data, outputs_to_execute = item[:5] + _, prompt_id, _, extra_data, _ = item[:5] create_time = extra_data.get('create_time') return { @@ -1307,7 +1307,7 @@ def _normalize_queue_item(self, item, status): def _normalize_history_item(self, prompt_id, history_item, include_outputs=False): """Convert history item dict to unified job dict.""" prompt_tuple = history_item['prompt'] - number, _, prompt, extra_data, outputs_to_execute = prompt_tuple[:5] + _, _, prompt, extra_data, outputs_to_execute = prompt_tuple[:5] create_time = extra_data.get('create_time') # Determine status from history status From b2bd48e71158f3bd8ec9fb4a817c1474f10e6e03 Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Tue, 2 Dec 2025 14:13:25 -0800 Subject: [PATCH 03/18] include priority --- execution.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/execution.py b/execution.py index 299072ce0985..42076e2c1848 100644 --- a/execution.py +++ b/execution.py @@ -1290,13 +1290,14 @@ def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="des def _normalize_queue_item(self, item, status): """Convert queue item tuple to unified job dict.""" - _, prompt_id, _, extra_data, _ = item[:5] + priority, prompt_id, _, extra_data, _ = item[:5] create_time = extra_data.get('create_time') return { 'id': prompt_id, 'status': status, 'create_time': create_time, + 'priority': priority, 'execution_time': None, 'error_message': None, 'outputs_count': 0, From b874f469cea373dfe73b118cc8c3e294d287c9cd Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Tue, 2 Dec 2025 18:45:25 -0800 Subject: [PATCH 04/18] create jobs helper file --- comfy_execution/jobs.py | 167 ++++++++++++++++++ execution.py | 162 
++--------------- server.py | 3 +- tests/execution/test_jobs.py | 331 +++++++++++++++++++++++++++++++++++ 4 files changed, 519 insertions(+), 144 deletions(-) create mode 100644 comfy_execution/jobs.py create mode 100644 tests/execution/test_jobs.py diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py new file mode 100644 index 000000000000..4f5758e07607 --- /dev/null +++ b/comfy_execution/jobs.py @@ -0,0 +1,167 @@ +""" +Job utilities for the /api/jobs endpoint. +Provides normalization and helper functions for job status tracking. +""" + + +class JobStatus: + """Job status constants.""" + PENDING = 'pending' + IN_PROGRESS = 'in_progress' + COMPLETED = 'completed' + ERROR = 'error' + + ALL = [PENDING, IN_PROGRESS, COMPLETED, ERROR] + + +# Media types that can be previewed in the frontend +PREVIEWABLE_MEDIA_TYPES = frozenset({'images', 'video', 'audio'}) + +# 3D file extensions for preview fallback (no dedicated media_type exists) +THREE_D_EXTENSIONS = frozenset({'.obj', '.fbx', '.gltf', '.glb'}) + + +def is_previewable(media_type, item): + """ + Check if an output item is previewable. + Matches frontend logic in ComfyUI_frontend/src/stores/queueStore.ts + + Priority: + 1. media_type is 'images', 'video', or 'audio' + 2. format field starts with 'video/' or 'audio/' + 3. filename has a 3D extension (.obj, .fbx, .gltf, .glb) + """ + if media_type in PREVIEWABLE_MEDIA_TYPES: + return True + + # Check format field (MIME type) + fmt = item.get('format', '') + if fmt and (fmt.startswith('video/') or fmt.startswith('audio/')): + return True + + # Check for 3D files by extension + filename = item.get('filename', '').lower() + if any(filename.endswith(ext) for ext in THREE_D_EXTENSIONS): + return True + + return False + + +def normalize_queue_item(item, status): + """Convert queue item tuple to unified job dict.""" + priority, prompt_id, _, extra_data, _ = item[:5] + create_time = extra_data.get('create_time') + + return { + 'id': prompt_id, + 'status': status, + 'priority': priority, + 'create_time': create_time, + 'execution_time': None, + 'error_message': None, + 'outputs_count': 0, + 'preview_output': None, + 'workflow_id': None, + } + + +def normalize_history_item(prompt_id, history_item, include_outputs=False): + """Convert history item dict to unified job dict.""" + prompt_tuple = history_item['prompt'] + priority, _, prompt, extra_data, outputs_to_execute = prompt_tuple[:5] + create_time = extra_data.get('create_time') + + status_info = history_item.get('status', {}) + status_str = status_info.get('status_str') if status_info else None + if status_str == 'success': + status = JobStatus.COMPLETED + elif status_str == 'error': + status = JobStatus.ERROR + else: + status = JobStatus.COMPLETED + + outputs = history_item.get('outputs', {}) + outputs_count, preview_output = get_outputs_summary(outputs) + + error_message = None + if status == JobStatus.ERROR and status_info: + messages = status_info.get('messages', []) + if messages: + error_message = messages[0] if isinstance(messages[0], str) else str(messages[0]) + + execution_time = history_item.get('execution_time') + + job = { + 'id': prompt_id, + 'status': status, + 'priority': priority, + 'create_time': create_time, + 'execution_time': execution_time, + 'error_message': error_message, + 'outputs_count': outputs_count, + 'preview_output': preview_output, + 'workflow_id': None, + } + + if include_outputs: + job['outputs'] = outputs + job['prompt'] = prompt + job['extra_data'] = extra_data + job['outputs_to_execute'] = 
outputs_to_execute + + return job + + +def get_outputs_summary(outputs): + """ + Count outputs and find preview in a single pass. + Returns (outputs_count, preview_output). + + Preview priority (matching frontend): + 1. type="output" with previewable media + 2. Any previewable media + """ + count = 0 + preview_output = None + fallback_preview = None + + for node_id, node_outputs in outputs.items(): + for media_type, items in node_outputs.items(): + if media_type == 'animated' or not isinstance(items, list): + continue + + for item in items: + count += 1 + + if preview_output is not None: + continue + + if not is_previewable(media_type, item): + continue + + enriched = { + **item, + 'nodeId': node_id, + 'mediaType': media_type + } + + if item.get('type') == 'output': + preview_output = enriched + elif fallback_preview is None: + fallback_preview = enriched + + return count, preview_output or fallback_preview + + +def apply_sorting(jobs, sort_by, sort_order): + """Sort jobs list by specified field and order.""" + reverse = (sort_order == 'desc') + + if sort_by == 'execution_time': + def get_sort_key(job): + return job.get('execution_time') or 0 + else: + def get_sort_key(job): + return job.get('create_time') or 0 + + return sorted(jobs, key=get_sort_key, reverse=reverse) diff --git a/execution.py b/execution.py index 42076e2c1848..5fc616dfd6de 100644 --- a/execution.py +++ b/execution.py @@ -33,6 +33,12 @@ from comfy_execution.validation import validate_node_input from comfy_execution.progress import get_progress_state, reset_progress_state, add_progress_handler, WebUIProgressHandler from comfy_execution.utils import CurrentNodeContext +from comfy_execution.jobs import ( + JobStatus, + normalize_queue_item, + normalize_history_item, + apply_sorting, +) from comfy_api.internal import _ComfyNodeInternal, _NodeOutputInternal, first_real_override, is_class, make_locked_method_func from comfy_api.latest import io, _io @@ -1229,15 +1235,15 @@ def get_job(self, prompt_id): """Get a single job by prompt_id from history or queue.""" with self.mutex: if prompt_id in self.history: - return self._normalize_history_item(prompt_id, self.history[prompt_id], include_outputs=True) + return normalize_history_item(prompt_id, self.history[prompt_id], include_outputs=True) for item in self.currently_running.values(): if item[1] == prompt_id: - return self._normalize_queue_item(item, 'in_progress') + return normalize_queue_item(item, JobStatus.IN_PROGRESS) for item in self.queue: if item[1] == prompt_id: - return self._normalize_queue_item(item, 'pending') + return normalize_queue_item(item, JobStatus.PENDING) return None @@ -1246,7 +1252,7 @@ def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="des Get all jobs (running, pending, completed) with filtering and sorting. 
Args: - status_filter: list of statuses to include ['pending', 'in_progress', 'completed', 'error'] + status_filter: list of statuses to include (from JobStatus.ALL) sort_by: field to sort by ('created_at', 'execution_time') sort_order: 'asc' or 'desc' limit: maximum number of items to return @@ -1259,25 +1265,25 @@ def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="des jobs = [] if status_filter is None: - status_filter = ['pending', 'in_progress', 'completed', 'error'] + status_filter = JobStatus.ALL - if 'in_progress' in status_filter: + if JobStatus.IN_PROGRESS in status_filter: for item in self.currently_running.values(): - jobs.append(self._normalize_queue_item(item, 'in_progress')) + jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) - if 'pending' in status_filter: + if JobStatus.PENDING in status_filter: for item in self.queue: - jobs.append(self._normalize_queue_item(item, 'pending')) + jobs.append(normalize_queue_item(item, JobStatus.PENDING)) - include_completed = 'completed' in status_filter - include_error = 'error' in status_filter + include_completed = JobStatus.COMPLETED in status_filter + include_error = JobStatus.ERROR in status_filter if include_completed or include_error: for prompt_id, history_item in self.history.items(): is_error = history_item.get('status', {}).get('status_str') == 'error' if (is_error and include_error) or (not is_error and include_completed): - jobs.append(self._normalize_history_item(prompt_id, history_item)) + jobs.append(normalize_history_item(prompt_id, history_item)) - jobs = self._apply_sorting(jobs, sort_by, sort_order) + jobs = apply_sorting(jobs, sort_by, sort_order) total_count = len(jobs) @@ -1288,136 +1294,6 @@ def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="des return (jobs, total_count) - def _normalize_queue_item(self, item, status): - """Convert queue item tuple to unified job dict.""" - priority, prompt_id, _, extra_data, _ = item[:5] - create_time = extra_data.get('create_time') - - return { - 'id': prompt_id, - 'status': status, - 'create_time': create_time, - 'priority': priority, - 'execution_time': None, - 'error_message': None, - 'outputs_count': 0, - 'preview_output': None, - 'workflow_id': None, - } - - def _normalize_history_item(self, prompt_id, history_item, include_outputs=False): - """Convert history item dict to unified job dict.""" - prompt_tuple = history_item['prompt'] - _, _, prompt, extra_data, outputs_to_execute = prompt_tuple[:5] - create_time = extra_data.get('create_time') - - # Determine status from history status - status_info = history_item.get('status', {}) - if status_info: - status = 'completed' if status_info.get('status_str') == 'success' else 'error' - else: - status = 'completed' - - outputs = history_item.get('outputs', {}) - - outputs_count, preview_output = self._get_outputs_summary(outputs) - - error_message = None - if status == 'error' and status_info: - messages = status_info.get('messages', []) - if messages: - error_message = messages[0] if isinstance(messages[0], str) else str(messages[0]) - - execution_time = history_item.get('execution_time') - - job = { - 'id': prompt_id, - 'status': status, - 'create_time': create_time, - 'execution_time': execution_time, - 'error_message': error_message, - 'outputs_count': outputs_count, - 'preview_output': preview_output, - 'workflow_id': None, - } - - if include_outputs: - job['outputs'] = outputs - job['prompt'] = prompt - job['extra_data'] = extra_data - job['outputs_to_execute'] 
= outputs_to_execute - - return job - - def _get_outputs_summary(self, outputs): - """ - Count outputs and find preview in a single pass. - Returns (outputs_count, preview_output). - - Preview priority (matching frontend): - 1. type="output" with previewable media - 2. Any previewable media - """ - count = 0 - preview_output = None - fallback_preview = None - - for node_id, node_outputs in outputs.items(): - for media_type, items in node_outputs.items(): - if media_type == 'animated' or not isinstance(items, list): - continue - for item in items: - count += 1 - - # Skip if we already have the best preview (type=output) - if preview_output is not None: - continue - - filename = item.get('filename', '').lower() - fmt = item.get('format', '') - - # Check if previewable (image/video/audio/3D) - matching frontend logic - is_previewable = ( - media_type == 'images' or - media_type == 'video' or - media_type == 'audio' or - filename.endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp')) or # images - filename.endswith(('.mp4', '.webm', '.mov', '.avi')) or # video - filename.endswith(('.mp3', '.wav', '.ogg', '.flac')) or # audio - filename.endswith(('.obj', '.fbx', '.gltf', '.glb')) or # 3D - (fmt and (fmt.startswith('video/') or fmt.startswith('audio/'))) - ) - - if not is_previewable: - continue - - enriched = { - **item, - 'nodeId': node_id, - 'mediaType': media_type - } - - if item.get('type') == 'output': - preview_output = enriched - elif fallback_preview is None: - fallback_preview = enriched - - return count, preview_output or fallback_preview - - def _apply_sorting(self, jobs, sort_by, sort_order): - """Sort jobs list by specified field and order.""" - reverse = (sort_order == 'desc') - - if sort_by == 'execution_time': - def get_sort_key(job): - return job.get('execution_time') or 0 - else: - # Default to create_time - def get_sort_key(job): - return job.get('create_time') or 0 - - return sorted(jobs, key=get_sort_key, reverse=reverse) - def set_flag(self, name, data): with self.mutex: self.flags[name] = data diff --git a/server.py b/server.py index a9466c12d2e7..0e0cc0daa814 100644 --- a/server.py +++ b/server.py @@ -7,6 +7,7 @@ import nodes import folder_paths import execution +from comfy_execution.jobs import JobStatus import uuid import urllib import json @@ -703,7 +704,7 @@ async def get_jobs(request): status_filter = None if status_param: status_filter = [s.strip() for s in status_param.split(',') if s.strip()] - valid_statuses = {'pending', 'in_progress', 'completed', 'error'} + valid_statuses = set(JobStatus.ALL) status_filter = [s for s in status_filter if s in valid_statuses] if not status_filter: status_filter = None diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py new file mode 100644 index 000000000000..07f882cac0ea --- /dev/null +++ b/tests/execution/test_jobs.py @@ -0,0 +1,331 @@ +"""Unit tests for comfy_execution/jobs.py""" +import pytest +from pytest import fixture +from unittest.mock import MagicMock + +from comfy_execution.jobs import ( + JobStatus, + PREVIEWABLE_MEDIA_TYPES, + THREE_D_EXTENSIONS, + is_previewable, + normalize_queue_item, + normalize_history_item, + get_outputs_summary, + apply_sorting, +) + + +class TestJobStatus: + """Test JobStatus constants.""" + + def test_status_values(self): + """Status constants should have expected string values.""" + assert JobStatus.PENDING == 'pending' + assert JobStatus.IN_PROGRESS == 'in_progress' + assert JobStatus.COMPLETED == 'completed' + assert JobStatus.ERROR == 'error' + + def 
test_all_contains_all_statuses(self): + """ALL should contain all status values.""" + assert JobStatus.PENDING in JobStatus.ALL + assert JobStatus.IN_PROGRESS in JobStatus.ALL + assert JobStatus.COMPLETED in JobStatus.ALL + assert JobStatus.ERROR in JobStatus.ALL + assert len(JobStatus.ALL) == 4 + + +class TestIsPreviewable: + """Unit tests for is_previewable()""" + + def test_previewable_media_types(self): + """Images, video, audio media types should be previewable.""" + for media_type in ['images', 'video', 'audio']: + assert is_previewable(media_type, {}) is True + + def test_non_previewable_media_types(self): + """Other media types should not be previewable.""" + for media_type in ['latents', 'text', 'metadata', 'files']: + assert is_previewable(media_type, {}) is False + + def test_3d_extensions_previewable(self): + """3D file extensions should be previewable regardless of media_type.""" + for ext in ['.obj', '.fbx', '.gltf', '.glb']: + item = {'filename': f'model{ext}'} + assert is_previewable('files', item) is True + + def test_3d_extensions_case_insensitive(self): + """3D extension check should be case insensitive.""" + item = {'filename': 'MODEL.GLB'} + assert is_previewable('files', item) is True + + def test_video_format_previewable(self): + """Items with video/ format should be previewable.""" + item = {'format': 'video/mp4'} + assert is_previewable('files', item) is True + + def test_audio_format_previewable(self): + """Items with audio/ format should be previewable.""" + item = {'format': 'audio/wav'} + assert is_previewable('files', item) is True + + def test_other_format_not_previewable(self): + """Items with other format should not be previewable.""" + item = {'format': 'application/json'} + assert is_previewable('files', item) is False + + +class TestGetOutputsSummary: + """Unit tests for get_outputs_summary()""" + + def test_empty_outputs(self): + """Empty outputs should return 0 count and None preview.""" + count, preview = get_outputs_summary({}) + assert count == 0 + assert preview is None + + def test_counts_across_multiple_nodes(self): + """Outputs from multiple nodes should all be counted.""" + outputs = { + 'node1': {'images': [{'filename': 'a.png', 'type': 'output'}]}, + 'node2': {'images': [{'filename': 'b.png', 'type': 'output'}]}, + 'node3': {'images': [ + {'filename': 'c.png', 'type': 'output'}, + {'filename': 'd.png', 'type': 'output'} + ]} + } + count, preview = get_outputs_summary(outputs) + assert count == 4 + + def test_skips_animated_key_and_non_list_values(self): + """The 'animated' key and non-list values should be skipped.""" + outputs = { + 'node1': { + 'images': [{'filename': 'test.png', 'type': 'output'}], + 'animated': [True], # Should skip due to key name + 'metadata': 'string', # Should skip due to non-list + 'count': 42 # Should skip due to non-list + } + } + count, preview = get_outputs_summary(outputs) + assert count == 1 + + def test_preview_prefers_type_output(self): + """Items with type='output' should be preferred for preview.""" + outputs = { + 'node1': { + 'images': [ + {'filename': 'temp.png', 'type': 'temp'}, + {'filename': 'output.png', 'type': 'output'} + ] + } + } + count, preview = get_outputs_summary(outputs) + assert count == 2 + assert preview['filename'] == 'output.png' + + def test_preview_fallback_when_no_output_type(self): + """If no type='output', should use first previewable.""" + outputs = { + 'node1': { + 'images': [ + {'filename': 'temp1.png', 'type': 'temp'}, + {'filename': 'temp2.png', 'type': 'temp'} + ] + } + } + 
count, preview = get_outputs_summary(outputs) + assert preview['filename'] == 'temp1.png' + + def test_non_previewable_media_types_counted_but_no_preview(self): + """Non-previewable media types should be counted but not used as preview.""" + outputs = { + 'node1': { + 'latents': [ + {'filename': 'latent1.safetensors'}, + {'filename': 'latent2.safetensors'} + ] + } + } + count, preview = get_outputs_summary(outputs) + assert count == 2 + assert preview is None + + def test_previewable_media_types(self): + """Images, video, and audio media types should be previewable.""" + for media_type in ['images', 'video', 'audio']: + outputs = { + 'node1': { + media_type: [{'filename': 'test.file', 'type': 'output'}] + } + } + count, preview = get_outputs_summary(outputs) + assert preview is not None, f"{media_type} should be previewable" + + def test_3d_files_previewable(self): + """3D file extensions should be previewable.""" + for ext in ['.obj', '.fbx', '.gltf', '.glb']: + outputs = { + 'node1': { + 'files': [{'filename': f'model{ext}', 'type': 'output'}] + } + } + count, preview = get_outputs_summary(outputs) + assert preview is not None, f"3D file {ext} should be previewable" + + def test_format_mime_type_previewable(self): + """Files with video/ or audio/ format should be previewable.""" + for fmt in ['video/x-custom', 'audio/x-custom']: + outputs = { + 'node1': { + 'files': [{'filename': 'file.custom', 'format': fmt, 'type': 'output'}] + } + } + count, preview = get_outputs_summary(outputs) + assert preview is not None, f"Format {fmt} should be previewable" + + def test_preview_enriched_with_node_metadata(self): + """Preview should include nodeId, mediaType, and original fields.""" + outputs = { + 'node123': { + 'images': [{'filename': 'test.png', 'type': 'output', 'subfolder': 'outputs'}] + } + } + count, preview = get_outputs_summary(outputs) + assert preview['nodeId'] == 'node123' + assert preview['mediaType'] == 'images' + assert preview['subfolder'] == 'outputs' + + +class TestApplySorting: + """Unit tests for apply_sorting()""" + + def test_sort_by_create_time_desc(self): + """Default sort by create_time descending.""" + jobs = [ + {'id': 'a', 'create_time': 100}, + {'id': 'b', 'create_time': 300}, + {'id': 'c', 'create_time': 200}, + ] + result = apply_sorting(jobs, 'created_at', 'desc') + assert [j['id'] for j in result] == ['b', 'c', 'a'] + + def test_sort_by_create_time_asc(self): + """Sort by create_time ascending.""" + jobs = [ + {'id': 'a', 'create_time': 100}, + {'id': 'b', 'create_time': 300}, + {'id': 'c', 'create_time': 200}, + ] + result = apply_sorting(jobs, 'created_at', 'asc') + assert [j['id'] for j in result] == ['a', 'c', 'b'] + + def test_sort_by_execution_time(self): + """Sort by execution_time should order by duration.""" + jobs = [ + {'id': 'a', 'create_time': 100, 'execution_time': 5.0}, + {'id': 'b', 'create_time': 300, 'execution_time': 1.0}, + {'id': 'c', 'create_time': 200, 'execution_time': 3.0}, + ] + result = apply_sorting(jobs, 'execution_time', 'desc') + assert [j['id'] for j in result] == ['a', 'c', 'b'] + + def test_sort_with_none_values(self): + """Jobs with None values should sort as 0.""" + jobs = [ + {'id': 'a', 'create_time': 100, 'execution_time': 5.0}, + {'id': 'b', 'create_time': 300, 'execution_time': None}, + {'id': 'c', 'create_time': 200, 'execution_time': 3.0}, + ] + result = apply_sorting(jobs, 'execution_time', 'asc') + assert result[0]['id'] == 'b' # None treated as 0, comes first + + +class TestNormalizeQueueItem: + """Unit tests for 
normalize_queue_item()""" + + def test_basic_normalization(self): + """Queue item should be normalized to job dict.""" + item = ( + 10, # priority/number + 'prompt-123', # prompt_id + {'nodes': {}}, # prompt + {'create_time': 1234567890}, # extra_data + ['node1'], # outputs_to_execute + ) + job = normalize_queue_item(item, JobStatus.PENDING) + + assert job['id'] == 'prompt-123' + assert job['status'] == 'pending' + assert job['priority'] == 10 + assert job['create_time'] == 1234567890 + assert job['execution_time'] is None + assert job['error_message'] is None + assert job['outputs_count'] == 0 + + +class TestNormalizeHistoryItem: + """Unit tests for normalize_history_item()""" + + def test_completed_job(self): + """Completed history item should have correct status.""" + history_item = { + 'prompt': ( + 5, # priority + 'prompt-456', + {'nodes': {}}, + {'create_time': 1234567890}, + ['node1'], + ), + 'status': {'status_str': 'success', 'completed': True, 'messages': []}, + 'outputs': {}, + 'execution_time': 2.5, + } + job = normalize_history_item('prompt-456', history_item) + + assert job['id'] == 'prompt-456' + assert job['status'] == 'completed' + assert job['execution_time'] == 2.5 + + def test_error_job(self): + """Error history item should have error status and message.""" + history_item = { + 'prompt': ( + 5, + 'prompt-789', + {'nodes': {}}, + {'create_time': 1234567890}, + ['node1'], + ), + 'status': { + 'status_str': 'error', + 'completed': False, + 'messages': ['Node failed: OutOfMemory'] + }, + 'outputs': {}, + 'execution_time': 1.0, + } + job = normalize_history_item('prompt-789', history_item) + + assert job['status'] == 'error' + assert job['error_message'] == 'Node failed: OutOfMemory' + + def test_include_outputs(self): + """When include_outputs=True, should include full output data.""" + history_item = { + 'prompt': ( + 5, + 'prompt-123', + {'nodes': {'1': {}}}, + {'create_time': 1234567890, 'client_id': 'abc'}, + ['node1'], + ), + 'status': {'status_str': 'success', 'completed': True, 'messages': []}, + 'outputs': {'node1': {'images': [{'filename': 'test.png'}]}}, + 'execution_time': 2.5, + } + job = normalize_history_item('prompt-123', history_item, include_outputs=True) + + assert 'outputs' in job + assert 'prompt' in job + assert 'extra_data' in job + assert job['outputs'] == {'node1': {'images': [{'filename': 'test.png'}]}} From 380b6aec8e05feaf75994345fdf1ba55dbe4dbed Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Tue, 2 Dec 2025 19:33:22 -0800 Subject: [PATCH 05/18] fix ruff --- comfy_execution/jobs.py | 26 ++++++++++---------------- tests/execution/test_jobs.py | 5 ----- 2 files changed, 10 insertions(+), 21 deletions(-) diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index 4f5758e07607..a5a6b53d437a 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -133,22 +133,16 @@ def get_outputs_summary(outputs): for item in items: count += 1 - if preview_output is not None: - continue - - if not is_previewable(media_type, item): - continue - - enriched = { - **item, - 'nodeId': node_id, - 'mediaType': media_type - } - - if item.get('type') == 'output': - preview_output = enriched - elif fallback_preview is None: - fallback_preview = enriched + if preview_output is None and is_previewable(media_type, item): + enriched = { + **item, + 'nodeId': node_id, + 'mediaType': media_type + } + if item.get('type') == 'output': + preview_output = enriched + elif fallback_preview is None: + fallback_preview = enriched return count, preview_output or 
fallback_preview diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py index 07f882cac0ea..8c0a6ce067a0 100644 --- a/tests/execution/test_jobs.py +++ b/tests/execution/test_jobs.py @@ -1,12 +1,7 @@ """Unit tests for comfy_execution/jobs.py""" -import pytest -from pytest import fixture -from unittest.mock import MagicMock from comfy_execution.jobs import ( JobStatus, - PREVIEWABLE_MEDIA_TYPES, - THREE_D_EXTENSIONS, is_previewable, normalize_queue_item, normalize_history_item, From 048c413adc0a4f07b070288cc8d4f3dba50b43c9 Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Wed, 3 Dec 2025 15:02:11 -0800 Subject: [PATCH 06/18] update how we set error message --- comfy_execution/jobs.py | 8 ++++++-- tests/execution/test_jobs.py | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index a5a6b53d437a..d65936dbd755 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -86,8 +86,12 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): error_message = None if status == JobStatus.ERROR and status_info: messages = status_info.get('messages', []) - if messages: - error_message = messages[0] if isinstance(messages[0], str) else str(messages[0]) + for entry in messages: + if isinstance(entry, (list, tuple)) and len(entry) >= 2 and entry[0] == 'execution_error': + detail = entry[1] + if isinstance(detail, dict): + error_message = str(detail.get('exception_message', '')) + break execution_time = history_item.get('execution_time') diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py index 8c0a6ce067a0..9ffdbb3fbb95 100644 --- a/tests/execution/test_jobs.py +++ b/tests/execution/test_jobs.py @@ -294,7 +294,9 @@ def test_error_job(self): 'status': { 'status_str': 'error', 'completed': False, - 'messages': ['Node failed: OutOfMemory'] + 'messages': [ + ('execution_error', {'exception_message': 'Node failed: OutOfMemory', 'node_id': '5'}) + ] }, 'outputs': {}, 'execution_time': 1.0, From 2e0b26bdf3f3dd4e89445d9b1b699e40f771666f Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Wed, 3 Dec 2025 16:16:48 -0800 Subject: [PATCH 07/18] include execution error in both responses --- comfy_execution/jobs.py | 4 ++++ tests/execution/test_jobs.py | 21 ++++++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index d65936dbd755..a8c0eec79d68 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -59,6 +59,7 @@ def normalize_queue_item(item, status): 'create_time': create_time, 'execution_time': None, 'error_message': None, + 'execution_error': None, 'outputs_count': 0, 'preview_output': None, 'workflow_id': None, @@ -84,6 +85,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): outputs_count, preview_output = get_outputs_summary(outputs) error_message = None + execution_error = None if status == JobStatus.ERROR and status_info: messages = status_info.get('messages', []) for entry in messages: @@ -91,6 +93,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): detail = entry[1] if isinstance(detail, dict): error_message = str(detail.get('exception_message', '')) + execution_error = detail break execution_time = history_item.get('execution_time') @@ -112,6 +115,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): job['prompt'] = prompt job['extra_data'] = extra_data job['outputs_to_execute'] = outputs_to_execute + 
job['execution_error'] = execution_error return job diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py index 9ffdbb3fbb95..a4a0dc51087a 100644 --- a/tests/execution/test_jobs.py +++ b/tests/execution/test_jobs.py @@ -283,6 +283,13 @@ def test_completed_job(self): def test_error_job(self): """Error history item should have error status and message.""" + error_detail = { + 'node_id': '5', + 'node_type': 'KSampler', + 'exception_message': 'CUDA out of memory', + 'exception_type': 'RuntimeError', + 'traceback': ['Traceback...', 'RuntimeError: CUDA out of memory'], + } history_item = { 'prompt': ( 5, @@ -295,16 +302,24 @@ def test_error_job(self): 'status_str': 'error', 'completed': False, 'messages': [ - ('execution_error', {'exception_message': 'Node failed: OutOfMemory', 'node_id': '5'}) + ('execution_error', error_detail) ] }, 'outputs': {}, 'execution_time': 1.0, } - job = normalize_history_item('prompt-789', history_item) + # List view - no execution_error + job = normalize_history_item('prompt-789', history_item) assert job['status'] == 'error' - assert job['error_message'] == 'Node failed: OutOfMemory' + assert job['error_message'] == 'CUDA out of memory' + assert 'execution_error' not in job + + # Detail view - includes execution_error + job_detail = normalize_history_item('prompt-789', history_item, include_outputs=True) + assert job_detail['execution_error'] == error_detail + assert job_detail['execution_error']['node_id'] == '5' + assert job_detail['execution_error']['node_type'] == 'KSampler' def test_include_outputs(self): """When include_outputs=True, should include full output data.""" From e4c713633add46d679c109c89d587f2171e73d4a Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Wed, 3 Dec 2025 17:27:44 -0800 Subject: [PATCH 08/18] rename error -> failed, fix output shape --- comfy_execution/jobs.py | 42 ++++++++++++++++----------- execution.py | 8 +++--- server.py | 8 ++++-- tests/execution/test_jobs.py | 55 +++++++++++++++++++----------------- 4 files changed, 65 insertions(+), 48 deletions(-) diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index a8c0eec79d68..95bfb1469a36 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -9,9 +9,10 @@ class JobStatus: PENDING = 'pending' IN_PROGRESS = 'in_progress' COMPLETED = 'completed' - ERROR = 'error' + FAILED = 'failed' + CANCELLED = 'cancelled' - ALL = [PENDING, IN_PROGRESS, COMPLETED, ERROR] + ALL = [PENDING, IN_PROGRESS, COMPLETED, FAILED, CANCELLED] # Media types that can be previewed in the frontend @@ -49,17 +50,17 @@ def is_previewable(media_type, item): def normalize_queue_item(item, status): """Convert queue item tuple to unified job dict.""" - priority, prompt_id, _, extra_data, _ = item[:5] + _, prompt_id, _, extra_data, _ = item[:5] create_time = extra_data.get('create_time') return { 'id': prompt_id, 'status': status, - 'priority': priority, 'create_time': create_time, - 'execution_time': None, 'error_message': None, 'execution_error': None, + 'execution_start_time': None, + 'execution_end_time': None, 'outputs_count': 0, 'preview_output': None, 'workflow_id': None, @@ -69,7 +70,7 @@ def normalize_queue_item(item, status): def normalize_history_item(prompt_id, history_item, include_outputs=False): """Convert history item dict to unified job dict.""" prompt_tuple = history_item['prompt'] - priority, _, prompt, extra_data, outputs_to_execute = prompt_tuple[:5] + _, _, prompt, extra_data, _ = prompt_tuple[:5] create_time = extra_data.get('create_time') status_info = 
history_item.get('status', {}) @@ -77,7 +78,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): if status_str == 'success': status = JobStatus.COMPLETED elif status_str == 'error': - status = JobStatus.ERROR + status = JobStatus.FAILED else: status = JobStatus.COMPLETED @@ -86,7 +87,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): error_message = None execution_error = None - if status == JobStatus.ERROR and status_info: + if status == JobStatus.FAILED and status_info: messages = status_info.get('messages', []) for entry in messages: if isinstance(entry, (list, tuple)) and len(entry) >= 2 and entry[0] == 'execution_error': @@ -96,15 +97,21 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): execution_error = detail break - execution_time = history_item.get('execution_time') + execution_time_duration = history_item.get('execution_time') + execution_start_time = None + execution_end_time = None + if execution_time_duration is not None and create_time is not None: + execution_end_time = create_time + int(execution_time_duration * 1000) + execution_start_time = create_time job = { 'id': prompt_id, 'status': status, - 'priority': priority, 'create_time': create_time, - 'execution_time': execution_time, 'error_message': error_message, + 'execution_error': execution_error, + 'execution_start_time': execution_start_time, + 'execution_end_time': execution_end_time, 'outputs_count': outputs_count, 'preview_output': preview_output, 'workflow_id': None, @@ -112,10 +119,11 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): if include_outputs: job['outputs'] = outputs - job['prompt'] = prompt - job['extra_data'] = extra_data - job['outputs_to_execute'] = outputs_to_execute - job['execution_error'] = execution_error + job['execution_status'] = status_info + job['workflow'] = { + 'prompt': prompt, + 'extra_data': extra_data, + } return job @@ -161,7 +169,9 @@ def apply_sorting(jobs, sort_by, sort_order): if sort_by == 'execution_time': def get_sort_key(job): - return job.get('execution_time') or 0 + start = job.get('execution_start_time') or 0 + end = job.get('execution_end_time') or 0 + return end - start if end and start else 0 else: def get_sort_key(job): return job.get('create_time') or 0 diff --git a/execution.py b/execution.py index 5fc616dfd6de..558fb8b5eadf 100644 --- a/execution.py +++ b/execution.py @@ -1276,11 +1276,11 @@ def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="des jobs.append(normalize_queue_item(item, JobStatus.PENDING)) include_completed = JobStatus.COMPLETED in status_filter - include_error = JobStatus.ERROR in status_filter - if include_completed or include_error: + include_failed = JobStatus.FAILED in status_filter + if include_completed or include_failed: for prompt_id, history_item in self.history.items(): - is_error = history_item.get('status', {}).get('status_str') == 'error' - if (is_error and include_error) or (not is_error and include_completed): + is_failed = history_item.get('status', {}).get('status_str') == 'error' + if (is_failed and include_failed) or (not is_failed and include_completed): jobs.append(normalize_history_item(prompt_id, history_item)) jobs = apply_sorting(jobs, sort_by, sort_order) diff --git a/server.py b/server.py index 0e0cc0daa814..6e0b7f31a6b4 100644 --- a/server.py +++ b/server.py @@ -762,8 +762,12 @@ async def get_jobs(request): return web.json_response({ 'jobs': jobs, - 'total': total, - 'has_more': 
has_more + 'pagination': { + 'offset': offset, + 'limit': limit, + 'total': total, + 'has_more': has_more + } }) @routes.get("/api/jobs/{job_id}") diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py index a4a0dc51087a..4791696435f9 100644 --- a/tests/execution/test_jobs.py +++ b/tests/execution/test_jobs.py @@ -18,15 +18,17 @@ def test_status_values(self): assert JobStatus.PENDING == 'pending' assert JobStatus.IN_PROGRESS == 'in_progress' assert JobStatus.COMPLETED == 'completed' - assert JobStatus.ERROR == 'error' + assert JobStatus.FAILED == 'failed' + assert JobStatus.CANCELLED == 'cancelled' def test_all_contains_all_statuses(self): """ALL should contain all status values.""" assert JobStatus.PENDING in JobStatus.ALL assert JobStatus.IN_PROGRESS in JobStatus.ALL assert JobStatus.COMPLETED in JobStatus.ALL - assert JobStatus.ERROR in JobStatus.ALL - assert len(JobStatus.ALL) == 4 + assert JobStatus.FAILED in JobStatus.ALL + assert JobStatus.CANCELLED in JobStatus.ALL + assert len(JobStatus.ALL) == 5 class TestIsPreviewable: @@ -217,9 +219,9 @@ def test_sort_by_create_time_asc(self): def test_sort_by_execution_time(self): """Sort by execution_time should order by duration.""" jobs = [ - {'id': 'a', 'create_time': 100, 'execution_time': 5.0}, - {'id': 'b', 'create_time': 300, 'execution_time': 1.0}, - {'id': 'c', 'create_time': 200, 'execution_time': 3.0}, + {'id': 'a', 'create_time': 100, 'execution_start_time': 100, 'execution_end_time': 5100}, # 5s + {'id': 'b', 'create_time': 300, 'execution_start_time': 300, 'execution_end_time': 1300}, # 1s + {'id': 'c', 'create_time': 200, 'execution_start_time': 200, 'execution_end_time': 3200}, # 3s ] result = apply_sorting(jobs, 'execution_time', 'desc') assert [j['id'] for j in result] == ['a', 'c', 'b'] @@ -227,9 +229,9 @@ def test_sort_by_execution_time(self): def test_sort_with_none_values(self): """Jobs with None values should sort as 0.""" jobs = [ - {'id': 'a', 'create_time': 100, 'execution_time': 5.0}, - {'id': 'b', 'create_time': 300, 'execution_time': None}, - {'id': 'c', 'create_time': 200, 'execution_time': 3.0}, + {'id': 'a', 'create_time': 100, 'execution_start_time': 100, 'execution_end_time': 5100}, + {'id': 'b', 'create_time': 300, 'execution_start_time': None, 'execution_end_time': None}, + {'id': 'c', 'create_time': 200, 'execution_start_time': 200, 'execution_end_time': 3200}, ] result = apply_sorting(jobs, 'execution_time', 'asc') assert result[0]['id'] == 'b' # None treated as 0, comes first @@ -251,9 +253,9 @@ def test_basic_normalization(self): assert job['id'] == 'prompt-123' assert job['status'] == 'pending' - assert job['priority'] == 10 assert job['create_time'] == 1234567890 - assert job['execution_time'] is None + assert job['execution_start_time'] is None + assert job['execution_end_time'] is None assert job['error_message'] is None assert job['outputs_count'] == 0 @@ -268,7 +270,7 @@ def test_completed_job(self): 5, # priority 'prompt-456', {'nodes': {}}, - {'create_time': 1234567890}, + {'create_time': 1234567890000}, # milliseconds ['node1'], ), 'status': {'status_str': 'success', 'completed': True, 'messages': []}, @@ -279,10 +281,11 @@ def test_completed_job(self): assert job['id'] == 'prompt-456' assert job['status'] == 'completed' - assert job['execution_time'] == 2.5 + assert job['execution_start_time'] == 1234567890000 + assert job['execution_end_time'] == 1234567890000 + 2500 # +2.5 seconds in ms - def test_error_job(self): - """Error history item should have error status and 
message.""" + def test_failed_job(self): + """Failed history item should have failed status and message.""" error_detail = { 'node_id': '5', 'node_type': 'KSampler', @@ -309,17 +312,13 @@ def test_error_job(self): 'execution_time': 1.0, } - # List view - no execution_error + # List view - includes execution_error job = normalize_history_item('prompt-789', history_item) - assert job['status'] == 'error' + assert job['status'] == 'failed' assert job['error_message'] == 'CUDA out of memory' - assert 'execution_error' not in job - - # Detail view - includes execution_error - job_detail = normalize_history_item('prompt-789', history_item, include_outputs=True) - assert job_detail['execution_error'] == error_detail - assert job_detail['execution_error']['node_id'] == '5' - assert job_detail['execution_error']['node_type'] == 'KSampler' + assert job['execution_error'] == error_detail + assert job['execution_error']['node_id'] == '5' + assert job['execution_error']['node_type'] == 'KSampler' def test_include_outputs(self): """When include_outputs=True, should include full output data.""" @@ -338,6 +337,10 @@ def test_include_outputs(self): job = normalize_history_item('prompt-123', history_item, include_outputs=True) assert 'outputs' in job - assert 'prompt' in job - assert 'extra_data' in job + assert 'workflow' in job + assert 'execution_status' in job assert job['outputs'] == {'node1': {'images': [{'filename': 'test.png'}]}} + assert job['workflow'] == { + 'prompt': {'nodes': {'1': {}}}, + 'extra_data': {'create_time': 1234567890, 'client_id': 'abc'}, + } From 90fb5cc1e86eadc6cbb2f0295d351acb628902a5 Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Wed, 3 Dec 2025 20:03:38 -0800 Subject: [PATCH 09/18] re-use queue and history functions --- comfy_execution/jobs.py | 8 ++-- execution.py | 79 +++++++++++++++++++----------------- main.py | 12 +++--- server.py | 4 +- tests/execution/test_jobs.py | 14 +++---- 5 files changed, 61 insertions(+), 56 deletions(-) diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index 95bfb1469a36..01da70517ff8 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -97,11 +97,11 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): execution_error = detail break - execution_time_duration = history_item.get('execution_time') + execution_duration = history_item.get('execution_duration') execution_start_time = None execution_end_time = None - if execution_time_duration is not None and create_time is not None: - execution_end_time = create_time + int(execution_time_duration * 1000) + if execution_duration is not None and create_time is not None: + execution_end_time = create_time + int(execution_duration * 1000) execution_start_time = create_time job = { @@ -167,7 +167,7 @@ def apply_sorting(jobs, sort_by, sort_order): """Sort jobs list by specified field and order.""" reverse = (sort_order == 'desc') - if sort_by == 'execution_time': + if sort_by == 'execution_duration': def get_sort_key(job): start = job.get('execution_start_time') or 0 end = job.get('execution_end_time') or 0 diff --git a/execution.py b/execution.py index 558fb8b5eadf..99c07bfec467 100644 --- a/execution.py +++ b/execution.py @@ -1137,7 +1137,7 @@ class ExecutionStatus(NamedTuple): def task_done(self, item_id, history_result, status: Optional['PromptQueue.ExecutionStatus'], process_item=None, - execution_time: Optional[float] = None): + execution_duration: Optional[float] = None): with self.mutex: prompt = self.currently_running.pop(item_id) if 
len(self.history) > MAXIMUM_HISTORY_SIZE: @@ -1154,7 +1154,7 @@ def task_done(self, item_id, history_result, "prompt": prompt, "outputs": {}, 'status': status_dict, - 'execution_time': execution_time, + 'execution_duration': execution_duration, } self.history[prompt[1]].update(history_result) self.server.queue_updated() @@ -1233,19 +1233,22 @@ def delete_history_item(self, id_to_delete): def get_job(self, prompt_id): """Get a single job by prompt_id from history or queue.""" - with self.mutex: - if prompt_id in self.history: - return normalize_history_item(prompt_id, self.history[prompt_id], include_outputs=True) + history = self.get_history(prompt_id=prompt_id) + + if prompt_id in history: + return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True) + + running, queued = self.get_current_queue_volatile() - for item in self.currently_running.values(): - if item[1] == prompt_id: - return normalize_queue_item(item, JobStatus.IN_PROGRESS) + for item in running: + if item[1] == prompt_id: + return normalize_queue_item(item, JobStatus.IN_PROGRESS) - for item in self.queue: - if item[1] == prompt_id: - return normalize_queue_item(item, JobStatus.PENDING) + for item in queued: + if item[1] == prompt_id: + return normalize_queue_item(item, JobStatus.PENDING) - return None + return None def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0): """ @@ -1253,7 +1256,7 @@ def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="des Args: status_filter: list of statuses to include (from JobStatus.ALL) - sort_by: field to sort by ('created_at', 'execution_time') + sort_by: field to sort by ('created_at', 'execution_duration') sort_order: 'asc' or 'desc' limit: maximum number of items to return offset: number of items to skip @@ -1261,38 +1264,40 @@ def get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="des Returns: tuple: (jobs_list, total_count) """ - with self.mutex: - jobs = [] + running, queued = self.get_current_queue_volatile() + history = self.get_history() + + jobs = [] - if status_filter is None: - status_filter = JobStatus.ALL + if status_filter is None: + status_filter = JobStatus.ALL - if JobStatus.IN_PROGRESS in status_filter: - for item in self.currently_running.values(): - jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) + if JobStatus.IN_PROGRESS in status_filter: + for item in running: + jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) - if JobStatus.PENDING in status_filter: - for item in self.queue: - jobs.append(normalize_queue_item(item, JobStatus.PENDING)) + if JobStatus.PENDING in status_filter: + for item in queued: + jobs.append(normalize_queue_item(item, JobStatus.PENDING)) - include_completed = JobStatus.COMPLETED in status_filter - include_failed = JobStatus.FAILED in status_filter - if include_completed or include_failed: - for prompt_id, history_item in self.history.items(): - is_failed = history_item.get('status', {}).get('status_str') == 'error' - if (is_failed and include_failed) or (not is_failed and include_completed): - jobs.append(normalize_history_item(prompt_id, history_item)) + include_completed = JobStatus.COMPLETED in status_filter + include_failed = JobStatus.FAILED in status_filter + if include_completed or include_failed: + for prompt_id, history_item in history.items(): + is_failed = history_item.get('status', {}).get('status_str') == 'error' + if (is_failed and include_failed) or (not is_failed and 
include_completed): + jobs.append(normalize_history_item(prompt_id, history_item)) - jobs = apply_sorting(jobs, sort_by, sort_order) + jobs = apply_sorting(jobs, sort_by, sort_order) - total_count = len(jobs) + total_count = len(jobs) - if offset > 0: - jobs = jobs[offset:] - if limit is not None: - jobs = jobs[:limit] + if offset > 0: + jobs = jobs[offset:] + if limit is not None: + jobs = jobs[:limit] - return (jobs, total_count) + return (jobs, total_count) def set_flag(self, name, data): with self.mutex: diff --git a/main.py b/main.py index 7605f1f021be..d2e3682fa2e8 100644 --- a/main.py +++ b/main.py @@ -230,7 +230,7 @@ def prompt_worker(q, server_instance): need_gc = True current_time = time.perf_counter() - execution_time = current_time - execution_start_time + execution_duration = current_time - execution_start_time remove_sensitive = lambda prompt: prompt[:5] + prompt[6:] q.task_done(item_id, @@ -240,16 +240,16 @@ def prompt_worker(q, server_instance): completed=e.success, messages=e.status_messages), process_item=remove_sensitive, - execution_time=execution_time) + execution_duration=execution_duration) if server_instance.client_id is not None: server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id}, server_instance.client_id) # Log Time in a more readable way after 10 minutes - if execution_time > 600: - execution_time = time.strftime("%H:%M:%S", time.gmtime(execution_time)) - logging.info(f"Prompt executed in {execution_time}") + if execution_duration > 600: + execution_duration_str = time.strftime("%H:%M:%S", time.gmtime(execution_duration)) + logging.info(f"Prompt executed in {execution_duration_str}") else: - logging.info("Prompt executed in {:.2f} seconds".format(execution_time)) + logging.info("Prompt executed in {:.2f} seconds".format(execution_duration)) flags = q.get_flags() free_memory = flags.get("free_memory", False) diff --git a/server.py b/server.py index 6e0b7f31a6b4..f72142a806e7 100644 --- a/server.py +++ b/server.py @@ -727,9 +727,9 @@ async def get_jobs(request): if 'limit' in query: try: limit = int(query.get('limit')) - if limit <= 0 or limit > 500: + if limit <= 0: return web.json_response( - {"error": "limit must be between 1 and 500"}, + {"error": "limit must be a positive integer"}, status=400 ) except (ValueError, TypeError): diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py index 4791696435f9..c08e07b2b24a 100644 --- a/tests/execution/test_jobs.py +++ b/tests/execution/test_jobs.py @@ -216,14 +216,14 @@ def test_sort_by_create_time_asc(self): result = apply_sorting(jobs, 'created_at', 'asc') assert [j['id'] for j in result] == ['a', 'c', 'b'] - def test_sort_by_execution_time(self): - """Sort by execution_time should order by duration.""" + def test_sort_by_execution_duration(self): + """Sort by execution_duration should order by duration.""" jobs = [ {'id': 'a', 'create_time': 100, 'execution_start_time': 100, 'execution_end_time': 5100}, # 5s {'id': 'b', 'create_time': 300, 'execution_start_time': 300, 'execution_end_time': 1300}, # 1s {'id': 'c', 'create_time': 200, 'execution_start_time': 200, 'execution_end_time': 3200}, # 3s ] - result = apply_sorting(jobs, 'execution_time', 'desc') + result = apply_sorting(jobs, 'execution_duration', 'desc') assert [j['id'] for j in result] == ['a', 'c', 'b'] def test_sort_with_none_values(self): @@ -233,7 +233,7 @@ def test_sort_with_none_values(self): {'id': 'b', 'create_time': 300, 'execution_start_time': None, 'execution_end_time': None}, {'id': 'c', 
'create_time': 200, 'execution_start_time': 200, 'execution_end_time': 3200}, ] - result = apply_sorting(jobs, 'execution_time', 'asc') + result = apply_sorting(jobs, 'execution_duration', 'asc') assert result[0]['id'] == 'b' # None treated as 0, comes first @@ -275,7 +275,7 @@ def test_completed_job(self): ), 'status': {'status_str': 'success', 'completed': True, 'messages': []}, 'outputs': {}, - 'execution_time': 2.5, + 'execution_duration': 2.5, } job = normalize_history_item('prompt-456', history_item) @@ -309,7 +309,7 @@ def test_failed_job(self): ] }, 'outputs': {}, - 'execution_time': 1.0, + 'execution_duration': 1.0, } # List view - includes execution_error @@ -332,7 +332,7 @@ def test_include_outputs(self): ), 'status': {'status_str': 'success', 'completed': True, 'messages': []}, 'outputs': {'node1': {'images': [{'filename': 'test.png'}]}}, - 'execution_time': 2.5, + 'execution_duration': 2.5, } job = normalize_history_item('prompt-123', history_item, include_outputs=True) From 8a2bb7c20aecd47683e1c5ca1463a95840dc15ab Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Wed, 3 Dec 2025 20:14:55 -0800 Subject: [PATCH 10/18] set workflow id --- comfy_execution/jobs.py | 8 ++++++-- tests/execution/test_jobs.py | 12 ++++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index 01da70517ff8..ac70fb1e4056 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -52,6 +52,8 @@ def normalize_queue_item(item, status): """Convert queue item tuple to unified job dict.""" _, prompt_id, _, extra_data, _ = item[:5] create_time = extra_data.get('create_time') + extra_pnginfo = extra_data.get('extra_pnginfo', {}) or {} + workflow_id = extra_pnginfo.get('workflow', {}).get('id') return { 'id': prompt_id, @@ -63,7 +65,7 @@ def normalize_queue_item(item, status): 'execution_end_time': None, 'outputs_count': 0, 'preview_output': None, - 'workflow_id': None, + 'workflow_id': workflow_id, } @@ -72,6 +74,8 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): prompt_tuple = history_item['prompt'] _, _, prompt, extra_data, _ = prompt_tuple[:5] create_time = extra_data.get('create_time') + extra_pnginfo = extra_data.get('extra_pnginfo', {}) or {} + workflow_id = extra_pnginfo.get('workflow', {}).get('id') status_info = history_item.get('status', {}) status_str = status_info.get('status_str') if status_info else None @@ -114,7 +118,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): 'execution_end_time': execution_end_time, 'outputs_count': outputs_count, 'preview_output': preview_output, - 'workflow_id': None, + 'workflow_id': workflow_id, } if include_outputs: diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py index c08e07b2b24a..30e64b1e71af 100644 --- a/tests/execution/test_jobs.py +++ b/tests/execution/test_jobs.py @@ -246,7 +246,10 @@ def test_basic_normalization(self): 10, # priority/number 'prompt-123', # prompt_id {'nodes': {}}, # prompt - {'create_time': 1234567890}, # extra_data + { + 'create_time': 1234567890, + 'extra_pnginfo': {'workflow': {'id': 'workflow-abc'}} + }, # extra_data ['node1'], # outputs_to_execute ) job = normalize_queue_item(item, JobStatus.PENDING) @@ -258,6 +261,7 @@ def test_basic_normalization(self): assert job['execution_end_time'] is None assert job['error_message'] is None assert job['outputs_count'] == 0 + assert job['workflow_id'] == 'workflow-abc' class TestNormalizeHistoryItem: @@ -270,7 +274,10 @@ def 
test_completed_job(self):
             5, # priority
             'prompt-456',
             {'nodes': {}},
-            {'create_time': 1234567890000}, # milliseconds
+            {
+                'create_time': 1234567890000,
+                'extra_pnginfo': {'workflow': {'id': 'workflow-xyz'}}
+            }, # milliseconds
             ['node1'],
         ),
         'status': {'status_str': 'success', 'completed': True, 'messages': []},
@@ -283,6 +290,7 @@ def test_completed_job(self):
         assert job['id'] == 'prompt-456'
         assert job['status'] == 'completed'
         assert job['execution_start_time'] == 1234567890000
         assert job['execution_end_time'] == 1234567890000 + 2500 # +2.5 seconds in ms
+        assert job['workflow_id'] == 'workflow-xyz'
 
     def test_failed_job(self):
         """Failed history item should have failed status and message."""

From b7c7712e313e0d6e8298844f6a23a08be5d7bc3d Mon Sep 17 00:00:00 2001
From: Richard Yu
Date: Wed, 3 Dec 2025 21:21:51 -0800
Subject: [PATCH 11/18] allow sort by exec duration

---
 server.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server.py b/server.py
index f72142a806e7..6b34d74c3d07 100644
--- a/server.py
+++ b/server.py
@@ -710,9 +710,9 @@ async def get_jobs(request):
             status_filter = None
 
         sort_by = query.get('sort', 'created_at')
-        if sort_by != 'created_at':
+        if sort_by not in {'created_at', 'execution_duration'}:
             return web.json_response(
-                {"error": "sort must be 'created_at'"},
+                {"error": "sort must be 'created_at' or 'execution_duration'"},
                 status=400
             )
 

From a38aacf6e9bdaa8391afb1b3380de59bc0190a9f Mon Sep 17 00:00:00 2001
From: Richard Yu
Date: Wed, 3 Dec 2025 21:45:29 -0800
Subject: [PATCH 12/18] fix tests

---
 tests/execution/test_execution.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/tests/execution/test_execution.py b/tests/execution/test_execution.py
index 5316848dfe64..eedc9e27a39a 100644
--- a/tests/execution/test_execution.py
+++ b/tests/execution/test_execution.py
@@ -919,8 +919,9 @@ def test_jobs_api_returns_completed_jobs(
 
         jobs_response = client.get_jobs(status="completed")
         assert "jobs" in jobs_response, "Response should have jobs array"
-        assert "total" in jobs_response, "Response should have total count"
-        assert "has_more" in jobs_response, "Response should have has_more flag"
+        assert "pagination" in jobs_response, "Response should have pagination object"
+        assert "total" in jobs_response["pagination"], "Pagination should have total count"
+        assert "has_more" in jobs_response["pagination"], "Pagination should have has_more flag"
 
         job_ids = [j["id"] for j in jobs_response["jobs"]]
         assert prompt_id in job_ids, "Completed job should appear in jobs list"

From 1b673069fe33eb6de6f1d3aac302ef8d9f184d60 Mon Sep 17 00:00:00 2001
From: Richard Yu
Date: Thu, 4 Dec 2025 12:31:43 -0800
Subject: [PATCH 13/18] send priority and remove error msg

---
 comfy_execution/jobs.py           | 10 ++++------
 tests/execution/test_execution.py |  2 +-
 tests/execution/test_jobs.py      |  6 ++++--
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py
index ac70fb1e4056..d9e23ceed2e8 100644
--- a/comfy_execution/jobs.py
+++ b/comfy_execution/jobs.py
@@ -50,7 +50,7 @@ def is_previewable(media_type, item):
 
 def normalize_queue_item(item, status):
     """Convert queue item tuple to unified job dict."""
-    _, prompt_id, _, extra_data, _ = item[:5]
+    priority, prompt_id, _, extra_data, _ = item[:5]
     create_time = extra_data.get('create_time')
     extra_pnginfo = extra_data.get('extra_pnginfo', {}) or {}
     workflow_id = extra_pnginfo.get('workflow', {}).get('id')
@@ -58,8 +58,8 @@ def normalize_queue_item(item, status):
 
     return {
         'id': prompt_id,
         'status': 
status, + 'priority': priority, 'create_time': create_time, - 'error_message': None, 'execution_error': None, 'execution_start_time': None, 'execution_end_time': None, @@ -72,7 +72,7 @@ def normalize_queue_item(item, status): def normalize_history_item(prompt_id, history_item, include_outputs=False): """Convert history item dict to unified job dict.""" prompt_tuple = history_item['prompt'] - _, _, prompt, extra_data, _ = prompt_tuple[:5] + priority, _, prompt, extra_data, _ = prompt_tuple[:5] create_time = extra_data.get('create_time') extra_pnginfo = extra_data.get('extra_pnginfo', {}) or {} workflow_id = extra_pnginfo.get('workflow', {}).get('id') @@ -89,7 +89,6 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): outputs = history_item.get('outputs', {}) outputs_count, preview_output = get_outputs_summary(outputs) - error_message = None execution_error = None if status == JobStatus.FAILED and status_info: messages = status_info.get('messages', []) @@ -97,7 +96,6 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): if isinstance(entry, (list, tuple)) and len(entry) >= 2 and entry[0] == 'execution_error': detail = entry[1] if isinstance(detail, dict): - error_message = str(detail.get('exception_message', '')) execution_error = detail break @@ -111,8 +109,8 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): job = { 'id': prompt_id, 'status': status, + 'priority': priority, 'create_time': create_time, - 'error_message': error_message, 'execution_error': execution_error, 'execution_start_time': execution_start_time, 'execution_end_time': execution_end_time, diff --git a/tests/execution/test_execution.py b/tests/execution/test_execution.py index eedc9e27a39a..d86e192d68ef 100644 --- a/tests/execution/test_execution.py +++ b/tests/execution/test_execution.py @@ -942,7 +942,7 @@ def test_jobs_api_job_structure( assert "outputs_count" in job, "Job should have outputs_count" assert "preview_output" in job, "Job should have preview_output" assert "workflow_id" in job, "Job should have workflow_id" - assert "error_message" in job, "Job should have error_message" + assert "execution_error" in job, "Job should have execution_error" def test_jobs_api_preview_output_structure( self, client: ComfyClient, builder: GraphBuilder diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py index 30e64b1e71af..2b63e1e4e912 100644 --- a/tests/execution/test_jobs.py +++ b/tests/execution/test_jobs.py @@ -256,10 +256,11 @@ def test_basic_normalization(self): assert job['id'] == 'prompt-123' assert job['status'] == 'pending' + assert job['priority'] == 10 assert job['create_time'] == 1234567890 assert job['execution_start_time'] is None assert job['execution_end_time'] is None - assert job['error_message'] is None + assert job['execution_error'] is None assert job['outputs_count'] == 0 assert job['workflow_id'] == 'workflow-abc' @@ -288,6 +289,7 @@ def test_completed_job(self): assert job['id'] == 'prompt-456' assert job['status'] == 'completed' + assert job['priority'] == 5 assert job['execution_start_time'] == 1234567890000 assert job['execution_end_time'] == 1234567890000 + 2500 # +2.5 seconds in ms assert job['workflow_id'] == 'workflow-xyz' @@ -323,10 +325,10 @@ def test_failed_job(self): # List view - includes execution_error job = normalize_history_item('prompt-789', history_item) assert job['status'] == 'failed' - assert job['error_message'] == 'CUDA out of memory' assert job['execution_error'] == error_detail 
assert job['execution_error']['node_id'] == '5' assert job['execution_error']['node_type'] == 'KSampler' + assert job['execution_error']['exception_message'] == 'CUDA out of memory' def test_include_outputs(self): """When include_outputs=True, should include full output data.""" From 5274b91f257b82cd465bd33ea8a4070a44db6504 Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Thu, 4 Dec 2025 12:51:24 -0800 Subject: [PATCH 14/18] use ws messages to get start and end times --- comfy_execution/jobs.py | 25 ++++++++++---------- execution.py | 4 +--- main.py | 13 +++++----- tests/execution/test_jobs.py | 46 ++++++++++++++++++++---------------- 4 files changed, 45 insertions(+), 43 deletions(-) diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index d9e23ceed2e8..ca7401b1c0d8 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -90,21 +90,20 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): outputs_count, preview_output = get_outputs_summary(outputs) execution_error = None - if status == JobStatus.FAILED and status_info: - messages = status_info.get('messages', []) - for entry in messages: - if isinstance(entry, (list, tuple)) and len(entry) >= 2 and entry[0] == 'execution_error': - detail = entry[1] - if isinstance(detail, dict): - execution_error = detail - break - - execution_duration = history_item.get('execution_duration') execution_start_time = None execution_end_time = None - if execution_duration is not None and create_time is not None: - execution_end_time = create_time + int(execution_duration * 1000) - execution_start_time = create_time + if status_info: + messages = status_info.get('messages', []) + for entry in messages: + if isinstance(entry, (list, tuple)) and len(entry) >= 2: + event_name, event_data = entry[0], entry[1] + if isinstance(event_data, dict): + if event_name == 'execution_start': + execution_start_time = event_data.get('timestamp') + elif event_name in ('execution_success', 'execution_error', 'execution_interrupted'): + execution_end_time = event_data.get('timestamp') + if event_name == 'execution_error': + execution_error = event_data job = { 'id': prompt_id, diff --git a/execution.py b/execution.py index 99c07bfec467..c552214d4984 100644 --- a/execution.py +++ b/execution.py @@ -1136,8 +1136,7 @@ class ExecutionStatus(NamedTuple): messages: List[str] def task_done(self, item_id, history_result, - status: Optional['PromptQueue.ExecutionStatus'], process_item=None, - execution_duration: Optional[float] = None): + status: Optional['PromptQueue.ExecutionStatus'], process_item=None): with self.mutex: prompt = self.currently_running.pop(item_id) if len(self.history) > MAXIMUM_HISTORY_SIZE: @@ -1154,7 +1153,6 @@ def task_done(self, item_id, history_result, "prompt": prompt, "outputs": {}, 'status': status_dict, - 'execution_duration': execution_duration, } self.history[prompt[1]].update(history_result) self.server.queue_updated() diff --git a/main.py b/main.py index d2e3682fa2e8..1eb4423b8cd4 100644 --- a/main.py +++ b/main.py @@ -230,7 +230,7 @@ def prompt_worker(q, server_instance): need_gc = True current_time = time.perf_counter() - execution_duration = current_time - execution_start_time + execution_time = current_time - execution_start_time remove_sensitive = lambda prompt: prompt[:5] + prompt[6:] q.task_done(item_id, @@ -239,17 +239,16 @@ def prompt_worker(q, server_instance): status_str='success' if e.success else 'error', completed=e.success, messages=e.status_messages), - process_item=remove_sensitive, - 
execution_duration=execution_duration) + process_item=remove_sensitive) if server_instance.client_id is not None: server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id}, server_instance.client_id) # Log Time in a more readable way after 10 minutes - if execution_duration > 600: - execution_duration_str = time.strftime("%H:%M:%S", time.gmtime(execution_duration)) - logging.info(f"Prompt executed in {execution_duration_str}") + if execution_time > 600: + execution_time = time.strftime("%H:%M:%S", time.gmtime(execution_time)) + logging.info(f"Prompt executed in {execution_time}") else: - logging.info("Prompt executed in {:.2f} seconds".format(execution_duration)) + logging.info("Prompt executed in {:.2f} seconds".format(execution_time)) flags = q.get_flags() free_memory = flags.get("free_memory", False) diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py index 2b63e1e4e912..b01d26ecc3c9 100644 --- a/tests/execution/test_jobs.py +++ b/tests/execution/test_jobs.py @@ -269,7 +269,7 @@ class TestNormalizeHistoryItem: """Unit tests for normalize_history_item()""" def test_completed_job(self): - """Completed history item should have correct status.""" + """Completed history item should have correct status and times from messages.""" history_item = { 'prompt': ( 5, # priority @@ -278,54 +278,61 @@ def test_completed_job(self): { 'create_time': 1234567890000, 'extra_pnginfo': {'workflow': {'id': 'workflow-xyz'}} - }, # milliseconds + }, ['node1'], ), - 'status': {'status_str': 'success', 'completed': True, 'messages': []}, + 'status': { + 'status_str': 'success', + 'completed': True, + 'messages': [ + ('execution_start', {'prompt_id': 'prompt-456', 'timestamp': 1234567890500}), + ('execution_success', {'prompt_id': 'prompt-456', 'timestamp': 1234567893000}), + ] + }, 'outputs': {}, - 'execution_duration': 2.5, } job = normalize_history_item('prompt-456', history_item) assert job['id'] == 'prompt-456' assert job['status'] == 'completed' assert job['priority'] == 5 - assert job['execution_start_time'] == 1234567890000 - assert job['execution_end_time'] == 1234567890000 + 2500 # +2.5 seconds in ms + assert job['execution_start_time'] == 1234567890500 + assert job['execution_end_time'] == 1234567893000 assert job['workflow_id'] == 'workflow-xyz' def test_failed_job(self): - """Failed history item should have failed status and message.""" - error_detail = { - 'node_id': '5', - 'node_type': 'KSampler', - 'exception_message': 'CUDA out of memory', - 'exception_type': 'RuntimeError', - 'traceback': ['Traceback...', 'RuntimeError: CUDA out of memory'], - } + """Failed history item should have failed status and error from messages.""" history_item = { 'prompt': ( 5, 'prompt-789', {'nodes': {}}, - {'create_time': 1234567890}, + {'create_time': 1234567890000}, ['node1'], ), 'status': { 'status_str': 'error', 'completed': False, 'messages': [ - ('execution_error', error_detail) + ('execution_start', {'prompt_id': 'prompt-789', 'timestamp': 1234567890500}), + ('execution_error', { + 'prompt_id': 'prompt-789', + 'node_id': '5', + 'node_type': 'KSampler', + 'exception_message': 'CUDA out of memory', + 'exception_type': 'RuntimeError', + 'traceback': ['Traceback...', 'RuntimeError: CUDA out of memory'], + 'timestamp': 1234567891000, + }) ] }, 'outputs': {}, - 'execution_duration': 1.0, } - # List view - includes execution_error job = normalize_history_item('prompt-789', history_item) assert job['status'] == 'failed' - assert job['execution_error'] == error_detail + assert 
job['execution_start_time'] == 1234567890500 + assert job['execution_end_time'] == 1234567891000 assert job['execution_error']['node_id'] == '5' assert job['execution_error']['node_type'] == 'KSampler' assert job['execution_error']['exception_message'] == 'CUDA out of memory' @@ -342,7 +349,6 @@ def test_include_outputs(self): ), 'status': {'status_str': 'success', 'completed': True, 'messages': []}, 'outputs': {'node1': {'images': [{'filename': 'test.png'}]}}, - 'execution_duration': 2.5, } job = normalize_history_item('prompt-123', history_item, include_outputs=True) From c860cc612bc9c9a7377dc28b140bb41219e20d6f Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Thu, 4 Dec 2025 12:52:36 -0800 Subject: [PATCH 15/18] revert main.py fully --- main.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/main.py b/main.py index 1eb4423b8cd4..0cd815d9e620 100644 --- a/main.py +++ b/main.py @@ -229,20 +229,19 @@ def prompt_worker(q, server_instance): e.execute(item[2], prompt_id, extra_data, item[4]) need_gc = True - current_time = time.perf_counter() - execution_time = current_time - execution_start_time - remove_sensitive = lambda prompt: prompt[:5] + prompt[6:] q.task_done(item_id, e.history_result, status=execution.PromptQueue.ExecutionStatus( status_str='success' if e.success else 'error', completed=e.success, - messages=e.status_messages), - process_item=remove_sensitive) + messages=e.status_messages), process_item=remove_sensitive) if server_instance.client_id is not None: server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id}, server_instance.client_id) + current_time = time.perf_counter() + execution_time = current_time - execution_start_time + # Log Time in a more readable way after 10 minutes if execution_time > 600: execution_time = time.strftime("%H:%M:%S", time.gmtime(execution_time)) From 5ed90a1583cc95d612ed261c9ed85d396061400d Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Thu, 4 Dec 2025 13:36:18 -0800 Subject: [PATCH 16/18] refactor: move all /jobs business logic to jobs.py --- comfy_execution/jobs.py | 81 ++++++++++++++++++++++++++++++++++++++++- execution.py | 74 ------------------------------------- server.py | 23 ++++++++---- 3 files changed, 94 insertions(+), 84 deletions(-) diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py index ca7401b1c0d8..9362c7e99d0d 100644 --- a/comfy_execution/jobs.py +++ b/comfy_execution/jobs.py @@ -52,7 +52,7 @@ def normalize_queue_item(item, status): """Convert queue item tuple to unified job dict.""" priority, prompt_id, _, extra_data, _ = item[:5] create_time = extra_data.get('create_time') - extra_pnginfo = extra_data.get('extra_pnginfo', {}) or {} + extra_pnginfo = extra_data.get('extra_pnginfo') or {} workflow_id = extra_pnginfo.get('workflow', {}).get('id') return { @@ -74,7 +74,7 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False): prompt_tuple = history_item['prompt'] priority, _, prompt, extra_data, _ = prompt_tuple[:5] create_time = extra_data.get('create_time') - extra_pnginfo = extra_data.get('extra_pnginfo', {}) or {} + extra_pnginfo = extra_data.get('extra_pnginfo') or {} workflow_id = extra_pnginfo.get('workflow', {}).get('id') status_info = history_item.get('status', {}) @@ -178,3 +178,80 @@ def get_sort_key(job): return job.get('create_time') or 0 return sorted(jobs, key=get_sort_key, reverse=reverse) + + +def get_job(prompt_id, running, queued, history): + """ + Get a single job by prompt_id from history or queue. 
+ + Args: + prompt_id: The prompt ID to look up + running: List of currently running queue items + queued: List of pending queue items + history: Dict of history items keyed by prompt_id + + Returns: + Job dict with full details, or None if not found + """ + if prompt_id in history: + return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True) + + for item in running: + if item[1] == prompt_id: + return normalize_queue_item(item, JobStatus.IN_PROGRESS) + + for item in queued: + if item[1] == prompt_id: + return normalize_queue_item(item, JobStatus.PENDING) + + return None + + +def get_all_jobs(running, queued, history, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0): + """ + Get all jobs (running, pending, completed) with filtering and sorting. + + Args: + running: List of currently running queue items + queued: List of pending queue items + history: Dict of history items keyed by prompt_id + status_filter: List of statuses to include (from JobStatus.ALL) + sort_by: Field to sort by ('created_at', 'execution_duration') + sort_order: 'asc' or 'desc' + limit: Maximum number of items to return + offset: Number of items to skip + + Returns: + tuple: (jobs_list, total_count) + """ + jobs = [] + + if status_filter is None: + status_filter = JobStatus.ALL + + if JobStatus.IN_PROGRESS in status_filter: + for item in running: + jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) + + if JobStatus.PENDING in status_filter: + for item in queued: + jobs.append(normalize_queue_item(item, JobStatus.PENDING)) + + include_completed = JobStatus.COMPLETED in status_filter + include_failed = JobStatus.FAILED in status_filter + if include_completed or include_failed: + for prompt_id, history_item in history.items(): + is_failed = history_item.get('status', {}).get('status_str') == 'error' + if (is_failed and include_failed) or (not is_failed and include_completed): + jobs.append(normalize_history_item(prompt_id, history_item)) + + jobs = apply_sorting(jobs, sort_by, sort_order) + + total_count = len(jobs) + + if offset > 0: + jobs = jobs[offset:] + if limit is not None: + jobs = jobs[:limit] + + return (jobs, total_count) diff --git a/execution.py b/execution.py index c552214d4984..c2186ac98147 100644 --- a/execution.py +++ b/execution.py @@ -33,12 +33,6 @@ from comfy_execution.validation import validate_node_input from comfy_execution.progress import get_progress_state, reset_progress_state, add_progress_handler, WebUIProgressHandler from comfy_execution.utils import CurrentNodeContext -from comfy_execution.jobs import ( - JobStatus, - normalize_queue_item, - normalize_history_item, - apply_sorting, -) from comfy_api.internal import _ComfyNodeInternal, _NodeOutputInternal, first_real_override, is_class, make_locked_method_func from comfy_api.latest import io, _io @@ -1229,74 +1223,6 @@ def delete_history_item(self, id_to_delete): with self.mutex: self.history.pop(id_to_delete, None) - def get_job(self, prompt_id): - """Get a single job by prompt_id from history or queue.""" - history = self.get_history(prompt_id=prompt_id) - - if prompt_id in history: - return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True) - - running, queued = self.get_current_queue_volatile() - - for item in running: - if item[1] == prompt_id: - return normalize_queue_item(item, JobStatus.IN_PROGRESS) - - for item in queued: - if item[1] == prompt_id: - return normalize_queue_item(item, JobStatus.PENDING) - - return None - - def 
get_all_jobs(self, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0): - """ - Get all jobs (running, pending, completed) with filtering and sorting. - - Args: - status_filter: list of statuses to include (from JobStatus.ALL) - sort_by: field to sort by ('created_at', 'execution_duration') - sort_order: 'asc' or 'desc' - limit: maximum number of items to return - offset: number of items to skip - - Returns: - tuple: (jobs_list, total_count) - """ - running, queued = self.get_current_queue_volatile() - history = self.get_history() - - jobs = [] - - if status_filter is None: - status_filter = JobStatus.ALL - - if JobStatus.IN_PROGRESS in status_filter: - for item in running: - jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) - - if JobStatus.PENDING in status_filter: - for item in queued: - jobs.append(normalize_queue_item(item, JobStatus.PENDING)) - - include_completed = JobStatus.COMPLETED in status_filter - include_failed = JobStatus.FAILED in status_filter - if include_completed or include_failed: - for prompt_id, history_item in history.items(): - is_failed = history_item.get('status', {}).get('status_str') == 'error' - if (is_failed and include_failed) or (not is_failed and include_completed): - jobs.append(normalize_history_item(prompt_id, history_item)) - - jobs = apply_sorting(jobs, sort_by, sort_order) - - total_count = len(jobs) - - if offset > 0: - jobs = jobs[offset:] - if limit is not None: - jobs = jobs[:limit] - - return (jobs, total_count) - def set_flag(self, name, data): with self.mutex: self.flags[name] = data diff --git a/server.py b/server.py index 6b34d74c3d07..65a25fd16d00 100644 --- a/server.py +++ b/server.py @@ -7,7 +7,7 @@ import nodes import folder_paths import execution -from comfy_execution.jobs import JobStatus +from comfy_execution.jobs import JobStatus, get_job, get_all_jobs import uuid import urllib import json @@ -709,17 +709,17 @@ async def get_jobs(request): if not status_filter: status_filter = None - sort_by = query.get('sort', 'created_at') + sort_by = query.get('sort_by', 'created_at') if sort_by not in {'created_at', 'execution_duration'}: return web.json_response( - {"error": "sort must be 'created_at' or 'execution_duration'"}, + {"error": "sort_by must be 'created_at' or 'execution_duration'"}, status=400 ) - sort_order = query.get('order', 'desc') + sort_order = query.get('sort_order', 'desc') if sort_order not in {'asc', 'desc'}: return web.json_response( - {"error": "order must be 'asc' or 'desc'"}, + {"error": "sort_order must be 'asc' or 'desc'"}, status=400 ) @@ -750,7 +750,11 @@ async def get_jobs(request): status=400 ) - jobs, total = self.prompt_queue.get_all_jobs( + running, queued = self.prompt_queue.get_current_queue_volatile() + history = self.prompt_queue.get_history() + + jobs, total = get_all_jobs( + running, queued, history, status_filter=status_filter, sort_by=sort_by, sort_order=sort_order, @@ -771,7 +775,7 @@ async def get_jobs(request): }) @routes.get("/api/jobs/{job_id}") - async def get_job(request): + async def get_job_by_id(request): """Get a single job by ID.""" job_id = request.match_info.get("job_id", None) if not job_id: @@ -780,7 +784,10 @@ async def get_job(request): status=400 ) - job = self.prompt_queue.get_job(job_id) + running, queued = self.prompt_queue.get_current_queue_volatile() + history = self.prompt_queue.get_history(prompt_id=job_id) + + job = get_job(job_id, running, queued, history) if job is None: return web.json_response( {"error": "Job not found"}, From 
b14ae80c758ac9989e318417278c239450a4dd3e Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Thu, 4 Dec 2025 14:07:49 -0800 Subject: [PATCH 17/18] fix failing test --- tests/execution/test_execution.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/execution/test_execution.py b/tests/execution/test_execution.py index d86e192d68ef..0821e4ca113b 100644 --- a/tests/execution/test_execution.py +++ b/tests/execution/test_execution.py @@ -99,7 +99,7 @@ def get_all_history(self, max_items=None, offset=None): with urllib.request.urlopen(url) as response: return json.loads(response.read()) - def get_jobs(self, status=None, limit=None, offset=None, sort=None, order=None): + def get_jobs(self, status=None, limit=None, offset=None, sort_by=None, sort_order=None): url = "http://{}/api/jobs".format(self.server_address) params = {} if status is not None: @@ -108,10 +108,10 @@ def get_jobs(self, status=None, limit=None, offset=None, sort=None, order=None): params["limit"] = limit if offset is not None: params["offset"] = offset - if sort is not None: - params["sort"] = sort - if order is not None: - params["order"] = order + if sort_by is not None: + params["sort_by"] = sort_by + if sort_order is not None: + params["sort_order"] = sort_order if params: url_values = urllib.parse.urlencode(params) @@ -983,8 +983,8 @@ def test_jobs_api_sorting( for _ in range(3): self._create_history_item(client, builder) - desc_jobs = client.get_jobs(order="desc") - asc_jobs = client.get_jobs(order="asc") + desc_jobs = client.get_jobs(sort_order="desc") + asc_jobs = client.get_jobs(sort_order="asc") if len(desc_jobs["jobs"]) >= 2: desc_times = [j["create_time"] for j in desc_jobs["jobs"] if j["create_time"]] From a10cb30afa5a634b39fd23563e74e49345b92c1a Mon Sep 17 00:00:00 2001 From: Richard Yu Date: Thu, 4 Dec 2025 14:20:35 -0800 Subject: [PATCH 18/18] remove some tests --- tests/execution/test_execution.py | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/tests/execution/test_execution.py b/tests/execution/test_execution.py index 0821e4ca113b..cbf94db529cb 100644 --- a/tests/execution/test_execution.py +++ b/tests/execution/test_execution.py @@ -910,22 +910,6 @@ def test_offset_near_end_returns_remaining_items_only( assert len(result) <= 1, "Should return at most 1 item when offset is near end" # Jobs API tests - def test_jobs_api_returns_completed_jobs( - self, client: ComfyClient, builder: GraphBuilder - ): - """Test that /api/jobs returns completed jobs""" - result = self._create_history_item(client, builder) - prompt_id = result.get_prompt_id() - - jobs_response = client.get_jobs(status="completed") - assert "jobs" in jobs_response, "Response should have jobs array" - assert "pagination" in jobs_response, "Response should have pagination object" - assert "total" in jobs_response["pagination"], "Pagination should have total count" - assert "has_more" in jobs_response["pagination"], "Pagination should have has_more flag" - - job_ids = [j["id"] for j in jobs_response["jobs"]] - assert prompt_id in job_ids, "Completed job should appear in jobs list" - def test_jobs_api_job_structure( self, client: ComfyClient, builder: GraphBuilder ): @@ -1029,15 +1013,3 @@ def test_get_job_not_found( """Test getting a non-existent job returns 404""" job = client.get_job("nonexistent-job-id") assert job is None, "Non-existent job should return None" - - def test_jobs_list_excludes_outputs( - self, client: ComfyClient, builder: GraphBuilder - ): - """Test that job list 
doesn't include full outputs""" - self._create_history_item(client, builder) - - jobs_response = client.get_jobs(status="completed", limit=1) - job = jobs_response["jobs"][0] - - assert "outputs" not in job, "List should not include outputs" - assert "prompt" not in job, "List should not include prompt"
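
For reference, the API surface left in place by the end of the series can be exercised with a short client sketch. The example below is illustrative only and not part of the patches: the server address assumes ComfyUI's default local bind, and the printed fields are a subset of the job dict produced by normalize_queue_item()/normalize_history_item(). The query parameters (status, sort_by, sort_order, limit, offset) and the jobs/pagination response shape follow the final server.py and test changes above.

    import json
    import urllib.parse
    import urllib.request

    SERVER_ADDRESS = "127.0.0.1:8188"  # assumption: default local ComfyUI server

    def fetch_jobs(**params):
        # GET /api/jobs with optional status/sort_by/sort_order/limit/offset.
        query = {k: v for k, v in params.items() if v is not None}
        url = "http://{}/api/jobs".format(SERVER_ADDRESS)
        if query:
            url += "?" + urllib.parse.urlencode(query)
        with urllib.request.urlopen(url) as response:
            return json.loads(response.read())

    # Newest completed jobs first; 'pending', 'in_progress', 'failed', and
    # 'cancelled' are the other JobStatus filter values.
    page = fetch_jobs(status="completed", sort_by="created_at",
                      sort_order="desc", limit=10)
    for job in page["jobs"]:
        print(job["id"], job["status"], job["create_time"], job["workflow_id"])

    # Pagination metadata is nested under its own key (PATCH 08 onward).
    pagination = page["pagination"]
    if pagination["has_more"]:
        fetch_jobs(status="completed", limit=10,
                   offset=pagination["offset"] + pagination["limit"])

    # The detail endpoint additionally returns 'outputs', 'workflow', and
    # 'execution_status' (the include_outputs=True path).
    if page["jobs"]:
        job_url = "http://{}/api/jobs/{}".format(SERVER_ADDRESS, page["jobs"][0]["id"])
        with urllib.request.urlopen(job_url) as response:
            detail = json.loads(response.read())
            print(sorted(detail["workflow"].keys()))  # ['extra_data', 'prompt']

Note that after PATCH 14 a job's timing is exposed as execution_start_time/execution_end_time, recovered from the execution_start, execution_success, execution_error, and execution_interrupted status messages rather than stored as a single duration field, and sort_by="execution_duration" is computed from that pair.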