diff --git a/comfy_execution/jobs.py b/comfy_execution/jobs.py new file mode 100644 index 000000000000..9362c7e99d0d --- /dev/null +++ b/comfy_execution/jobs.py @@ -0,0 +1,257 @@ +""" +Job utilities for the /api/jobs endpoint. +Provides normalization and helper functions for job status tracking. +""" + + +class JobStatus: + """Job status constants.""" + PENDING = 'pending' + IN_PROGRESS = 'in_progress' + COMPLETED = 'completed' + FAILED = 'failed' + CANCELLED = 'cancelled' + + ALL = [PENDING, IN_PROGRESS, COMPLETED, FAILED, CANCELLED] + + +# Media types that can be previewed in the frontend +PREVIEWABLE_MEDIA_TYPES = frozenset({'images', 'video', 'audio'}) + +# 3D file extensions for preview fallback (no dedicated media_type exists) +THREE_D_EXTENSIONS = frozenset({'.obj', '.fbx', '.gltf', '.glb'}) + + +def is_previewable(media_type, item): + """ + Check if an output item is previewable. + Matches frontend logic in ComfyUI_frontend/src/stores/queueStore.ts + + Priority: + 1. media_type is 'images', 'video', or 'audio' + 2. format field starts with 'video/' or 'audio/' + 3. filename has a 3D extension (.obj, .fbx, .gltf, .glb) + """ + if media_type in PREVIEWABLE_MEDIA_TYPES: + return True + + # Check format field (MIME type) + fmt = item.get('format', '') + if fmt and (fmt.startswith('video/') or fmt.startswith('audio/')): + return True + + # Check for 3D files by extension + filename = item.get('filename', '').lower() + if any(filename.endswith(ext) for ext in THREE_D_EXTENSIONS): + return True + + return False + + +def normalize_queue_item(item, status): + """Convert queue item tuple to unified job dict.""" + priority, prompt_id, _, extra_data, _ = item[:5] + create_time = extra_data.get('create_time') + extra_pnginfo = extra_data.get('extra_pnginfo') or {} + workflow_id = extra_pnginfo.get('workflow', {}).get('id') + + return { + 'id': prompt_id, + 'status': status, + 'priority': priority, + 'create_time': create_time, + 'execution_error': None, + 'execution_start_time': None, + 'execution_end_time': None, + 'outputs_count': 0, + 'preview_output': None, + 'workflow_id': workflow_id, + } + + +def normalize_history_item(prompt_id, history_item, include_outputs=False): + """Convert history item dict to unified job dict.""" + prompt_tuple = history_item['prompt'] + priority, _, prompt, extra_data, _ = prompt_tuple[:5] + create_time = extra_data.get('create_time') + extra_pnginfo = extra_data.get('extra_pnginfo') or {} + workflow_id = extra_pnginfo.get('workflow', {}).get('id') + + status_info = history_item.get('status', {}) + status_str = status_info.get('status_str') if status_info else None + if status_str == 'success': + status = JobStatus.COMPLETED + elif status_str == 'error': + status = JobStatus.FAILED + else: + status = JobStatus.COMPLETED + + outputs = history_item.get('outputs', {}) + outputs_count, preview_output = get_outputs_summary(outputs) + + execution_error = None + execution_start_time = None + execution_end_time = None + if status_info: + messages = status_info.get('messages', []) + for entry in messages: + if isinstance(entry, (list, tuple)) and len(entry) >= 2: + event_name, event_data = entry[0], entry[1] + if isinstance(event_data, dict): + if event_name == 'execution_start': + execution_start_time = event_data.get('timestamp') + elif event_name in ('execution_success', 'execution_error', 'execution_interrupted'): + execution_end_time = event_data.get('timestamp') + if event_name == 'execution_error': + execution_error = event_data + + job = { + 'id': prompt_id, + 'status': status, + 'priority': priority, + 'create_time': create_time, + 'execution_error': execution_error, + 'execution_start_time': execution_start_time, + 'execution_end_time': execution_end_time, + 'outputs_count': outputs_count, + 'preview_output': preview_output, + 'workflow_id': workflow_id, + } + + if include_outputs: + job['outputs'] = outputs + job['execution_status'] = status_info + job['workflow'] = { + 'prompt': prompt, + 'extra_data': extra_data, + } + + return job + + +def get_outputs_summary(outputs): + """ + Count outputs and find preview in a single pass. + Returns (outputs_count, preview_output). + + Preview priority (matching frontend): + 1. type="output" with previewable media + 2. Any previewable media + """ + count = 0 + preview_output = None + fallback_preview = None + + for node_id, node_outputs in outputs.items(): + for media_type, items in node_outputs.items(): + if media_type == 'animated' or not isinstance(items, list): + continue + + for item in items: + count += 1 + + if preview_output is None and is_previewable(media_type, item): + enriched = { + **item, + 'nodeId': node_id, + 'mediaType': media_type + } + if item.get('type') == 'output': + preview_output = enriched + elif fallback_preview is None: + fallback_preview = enriched + + return count, preview_output or fallback_preview + + +def apply_sorting(jobs, sort_by, sort_order): + """Sort jobs list by specified field and order.""" + reverse = (sort_order == 'desc') + + if sort_by == 'execution_duration': + def get_sort_key(job): + start = job.get('execution_start_time') or 0 + end = job.get('execution_end_time') or 0 + return end - start if end and start else 0 + else: + def get_sort_key(job): + return job.get('create_time') or 0 + + return sorted(jobs, key=get_sort_key, reverse=reverse) + + +def get_job(prompt_id, running, queued, history): + """ + Get a single job by prompt_id from history or queue. + + Args: + prompt_id: The prompt ID to look up + running: List of currently running queue items + queued: List of pending queue items + history: Dict of history items keyed by prompt_id + + Returns: + Job dict with full details, or None if not found + """ + if prompt_id in history: + return normalize_history_item(prompt_id, history[prompt_id], include_outputs=True) + + for item in running: + if item[1] == prompt_id: + return normalize_queue_item(item, JobStatus.IN_PROGRESS) + + for item in queued: + if item[1] == prompt_id: + return normalize_queue_item(item, JobStatus.PENDING) + + return None + + +def get_all_jobs(running, queued, history, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0): + """ + Get all jobs (running, pending, completed) with filtering and sorting. + + Args: + running: List of currently running queue items + queued: List of pending queue items + history: Dict of history items keyed by prompt_id + status_filter: List of statuses to include (from JobStatus.ALL) + sort_by: Field to sort by ('created_at', 'execution_duration') + sort_order: 'asc' or 'desc' + limit: Maximum number of items to return + offset: Number of items to skip + + Returns: + tuple: (jobs_list, total_count) + """ + jobs = [] + + if status_filter is None: + status_filter = JobStatus.ALL + + if JobStatus.IN_PROGRESS in status_filter: + for item in running: + jobs.append(normalize_queue_item(item, JobStatus.IN_PROGRESS)) + + if JobStatus.PENDING in status_filter: + for item in queued: + jobs.append(normalize_queue_item(item, JobStatus.PENDING)) + + include_completed = JobStatus.COMPLETED in status_filter + include_failed = JobStatus.FAILED in status_filter + if include_completed or include_failed: + for prompt_id, history_item in history.items(): + is_failed = history_item.get('status', {}).get('status_str') == 'error' + if (is_failed and include_failed) or (not is_failed and include_completed): + jobs.append(normalize_history_item(prompt_id, history_item)) + + jobs = apply_sorting(jobs, sort_by, sort_order) + + total_count = len(jobs) + + if offset > 0: + jobs = jobs[offset:] + if limit is not None: + jobs = jobs[:limit] + + return (jobs, total_count) diff --git a/server.py b/server.py index ac4f42222261..65a25fd16d00 100644 --- a/server.py +++ b/server.py @@ -7,6 +7,7 @@ import nodes import folder_paths import execution +from comfy_execution.jobs import JobStatus, get_job, get_all_jobs import uuid import urllib import json @@ -694,6 +695,107 @@ async def get_object_info_node(request): out[node_class] = node_info(node_class) return web.json_response(out) + @routes.get("/api/jobs") + async def get_jobs(request): + """List all jobs with filtering, sorting, and pagination.""" + query = request.rel_url.query + + status_param = query.get("status", None) + status_filter = None + if status_param: + status_filter = [s.strip() for s in status_param.split(',') if s.strip()] + valid_statuses = set(JobStatus.ALL) + status_filter = [s for s in status_filter if s in valid_statuses] + if not status_filter: + status_filter = None + + sort_by = query.get('sort_by', 'created_at') + if sort_by not in {'created_at', 'execution_duration'}: + return web.json_response( + {"error": "sort_by must be 'created_at' or 'execution_duration'"}, + status=400 + ) + + sort_order = query.get('sort_order', 'desc') + if sort_order not in {'asc', 'desc'}: + return web.json_response( + {"error": "sort_order must be 'asc' or 'desc'"}, + status=400 + ) + + limit = None + if 'limit' in query: + try: + limit = int(query.get('limit')) + if limit <= 0: + return web.json_response( + {"error": "limit must be a positive integer"}, + status=400 + ) + except (ValueError, TypeError): + return web.json_response( + {"error": "limit must be an integer"}, + status=400 + ) + + offset = 0 + if 'offset' in query: + try: + offset = int(query.get('offset')) + if offset < 0: + offset = 0 + except (ValueError, TypeError): + return web.json_response( + {"error": "offset must be an integer"}, + status=400 + ) + + running, queued = self.prompt_queue.get_current_queue_volatile() + history = self.prompt_queue.get_history() + + jobs, total = get_all_jobs( + running, queued, history, + status_filter=status_filter, + sort_by=sort_by, + sort_order=sort_order, + limit=limit, + offset=offset + ) + + has_more = (offset + len(jobs)) < total + + return web.json_response({ + 'jobs': jobs, + 'pagination': { + 'offset': offset, + 'limit': limit, + 'total': total, + 'has_more': has_more + } + }) + + @routes.get("/api/jobs/{job_id}") + async def get_job_by_id(request): + """Get a single job by ID.""" + job_id = request.match_info.get("job_id", None) + if not job_id: + return web.json_response( + {"error": "job_id is required"}, + status=400 + ) + + running, queued = self.prompt_queue.get_current_queue_volatile() + history = self.prompt_queue.get_history(prompt_id=job_id) + + job = get_job(job_id, running, queued, history) + if job is None: + return web.json_response( + {"error": "Job not found"}, + status=404 + ) + + return web.json_response(job) + @routes.get("/history") async def get_history(request): max_items = request.rel_url.query.get("max_items", None) diff --git a/tests/execution/test_execution.py b/tests/execution/test_execution.py index ace0d2279093..cbf94db529cb 100644 --- a/tests/execution/test_execution.py +++ b/tests/execution/test_execution.py @@ -99,6 +99,37 @@ def get_all_history(self, max_items=None, offset=None): with urllib.request.urlopen(url) as response: return json.loads(response.read()) + def get_jobs(self, status=None, limit=None, offset=None, sort_by=None, sort_order=None): + url = "http://{}/api/jobs".format(self.server_address) + params = {} + if status is not None: + params["status"] = status + if limit is not None: + params["limit"] = limit + if offset is not None: + params["offset"] = offset + if sort_by is not None: + params["sort_by"] = sort_by + if sort_order is not None: + params["sort_order"] = sort_order + + if params: + url_values = urllib.parse.urlencode(params) + url = "{}?{}".format(url, url_values) + + with urllib.request.urlopen(url) as response: + return json.loads(response.read()) + + def get_job(self, job_id): + url = "http://{}/api/jobs/{}".format(self.server_address, job_id) + try: + with urllib.request.urlopen(url) as response: + return json.loads(response.read()) + except urllib.error.HTTPError as e: + if e.code == 404: + return None + raise + def set_test_name(self, name): self.test_name = name @@ -877,3 +908,108 @@ def test_offset_near_end_returns_remaining_items_only( result = client.get_all_history(max_items=5, offset=len(all_history) - 1) assert len(result) <= 1, "Should return at most 1 item when offset is near end" + + # Jobs API tests + def test_jobs_api_job_structure( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test that job objects have required fields""" + self._create_history_item(client, builder) + + jobs_response = client.get_jobs(status="completed", limit=1) + assert len(jobs_response["jobs"]) > 0, "Should have at least one job" + + job = jobs_response["jobs"][0] + assert "id" in job, "Job should have id" + assert "status" in job, "Job should have status" + assert "create_time" in job, "Job should have create_time" + assert "outputs_count" in job, "Job should have outputs_count" + assert "preview_output" in job, "Job should have preview_output" + assert "workflow_id" in job, "Job should have workflow_id" + assert "execution_error" in job, "Job should have execution_error" + + def test_jobs_api_preview_output_structure( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test that preview_output has correct structure""" + self._create_history_item(client, builder) + + jobs_response = client.get_jobs(status="completed", limit=1) + job = jobs_response["jobs"][0] + + if job["preview_output"] is not None: + preview = job["preview_output"] + assert "filename" in preview, "Preview should have filename" + assert "nodeId" in preview, "Preview should have nodeId" + assert "mediaType" in preview, "Preview should have mediaType" + + def test_jobs_api_pagination( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test jobs API pagination""" + for _ in range(5): + self._create_history_item(client, builder) + + first_page = client.get_jobs(limit=2, offset=0) + second_page = client.get_jobs(limit=2, offset=2) + + assert len(first_page["jobs"]) <= 2, "First page should have at most 2 jobs" + assert len(second_page["jobs"]) <= 2, "Second page should have at most 2 jobs" + + first_ids = {j["id"] for j in first_page["jobs"]} + second_ids = {j["id"] for j in second_page["jobs"]} + assert first_ids.isdisjoint(second_ids), "Pages should have different jobs" + + def test_jobs_api_sorting( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test jobs API sorting""" + for _ in range(3): + self._create_history_item(client, builder) + + desc_jobs = client.get_jobs(sort_order="desc") + asc_jobs = client.get_jobs(sort_order="asc") + + if len(desc_jobs["jobs"]) >= 2: + desc_times = [j["create_time"] for j in desc_jobs["jobs"] if j["create_time"]] + asc_times = [j["create_time"] for j in asc_jobs["jobs"] if j["create_time"]] + if len(desc_times) >= 2: + assert desc_times == sorted(desc_times, reverse=True), "Desc should be newest first" + if len(asc_times) >= 2: + assert asc_times == sorted(asc_times), "Asc should be oldest first" + + def test_jobs_api_status_filter( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test jobs API status filtering""" + self._create_history_item(client, builder) + + completed_jobs = client.get_jobs(status="completed") + assert len(completed_jobs["jobs"]) > 0, "Should have completed jobs from history" + + for job in completed_jobs["jobs"]: + assert job["status"] == "completed", "Should only return completed jobs" + + # Pending jobs are transient - just verify filter doesn't error + pending_jobs = client.get_jobs(status="pending") + for job in pending_jobs["jobs"]: + assert job["status"] == "pending", "Should only return pending jobs" + + def test_get_job_by_id( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test getting a single job by ID""" + result = self._create_history_item(client, builder) + prompt_id = result.get_prompt_id() + + job = client.get_job(prompt_id) + assert job is not None, "Should find the job" + assert job["id"] == prompt_id, "Job ID should match" + assert "outputs" in job, "Single job should include outputs" + + def test_get_job_not_found( + self, client: ComfyClient, builder: GraphBuilder + ): + """Test getting a non-existent job returns 404""" + job = client.get_job("nonexistent-job-id") + assert job is None, "Non-existent job should return None" diff --git a/tests/execution/test_jobs.py b/tests/execution/test_jobs.py new file mode 100644 index 000000000000..b01d26ecc3c9 --- /dev/null +++ b/tests/execution/test_jobs.py @@ -0,0 +1,362 @@ +"""Unit tests for comfy_execution/jobs.py""" + +from comfy_execution.jobs import ( + JobStatus, + is_previewable, + normalize_queue_item, + normalize_history_item, + get_outputs_summary, + apply_sorting, +) + + +class TestJobStatus: + """Test JobStatus constants.""" + + def test_status_values(self): + """Status constants should have expected string values.""" + assert JobStatus.PENDING == 'pending' + assert JobStatus.IN_PROGRESS == 'in_progress' + assert JobStatus.COMPLETED == 'completed' + assert JobStatus.FAILED == 'failed' + assert JobStatus.CANCELLED == 'cancelled' + + def test_all_contains_all_statuses(self): + """ALL should contain all status values.""" + assert JobStatus.PENDING in JobStatus.ALL + assert JobStatus.IN_PROGRESS in JobStatus.ALL + assert JobStatus.COMPLETED in JobStatus.ALL + assert JobStatus.FAILED in JobStatus.ALL + assert JobStatus.CANCELLED in JobStatus.ALL + assert len(JobStatus.ALL) == 5 + + +class TestIsPreviewable: + """Unit tests for is_previewable()""" + + def test_previewable_media_types(self): + """Images, video, audio media types should be previewable.""" + for media_type in ['images', 'video', 'audio']: + assert is_previewable(media_type, {}) is True + + def test_non_previewable_media_types(self): + """Other media types should not be previewable.""" + for media_type in ['latents', 'text', 'metadata', 'files']: + assert is_previewable(media_type, {}) is False + + def test_3d_extensions_previewable(self): + """3D file extensions should be previewable regardless of media_type.""" + for ext in ['.obj', '.fbx', '.gltf', '.glb']: + item = {'filename': f'model{ext}'} + assert is_previewable('files', item) is True + + def test_3d_extensions_case_insensitive(self): + """3D extension check should be case insensitive.""" + item = {'filename': 'MODEL.GLB'} + assert is_previewable('files', item) is True + + def test_video_format_previewable(self): + """Items with video/ format should be previewable.""" + item = {'format': 'video/mp4'} + assert is_previewable('files', item) is True + + def test_audio_format_previewable(self): + """Items with audio/ format should be previewable.""" + item = {'format': 'audio/wav'} + assert is_previewable('files', item) is True + + def test_other_format_not_previewable(self): + """Items with other format should not be previewable.""" + item = {'format': 'application/json'} + assert is_previewable('files', item) is False + + +class TestGetOutputsSummary: + """Unit tests for get_outputs_summary()""" + + def test_empty_outputs(self): + """Empty outputs should return 0 count and None preview.""" + count, preview = get_outputs_summary({}) + assert count == 0 + assert preview is None + + def test_counts_across_multiple_nodes(self): + """Outputs from multiple nodes should all be counted.""" + outputs = { + 'node1': {'images': [{'filename': 'a.png', 'type': 'output'}]}, + 'node2': {'images': [{'filename': 'b.png', 'type': 'output'}]}, + 'node3': {'images': [ + {'filename': 'c.png', 'type': 'output'}, + {'filename': 'd.png', 'type': 'output'} + ]} + } + count, preview = get_outputs_summary(outputs) + assert count == 4 + + def test_skips_animated_key_and_non_list_values(self): + """The 'animated' key and non-list values should be skipped.""" + outputs = { + 'node1': { + 'images': [{'filename': 'test.png', 'type': 'output'}], + 'animated': [True], # Should skip due to key name + 'metadata': 'string', # Should skip due to non-list + 'count': 42 # Should skip due to non-list + } + } + count, preview = get_outputs_summary(outputs) + assert count == 1 + + def test_preview_prefers_type_output(self): + """Items with type='output' should be preferred for preview.""" + outputs = { + 'node1': { + 'images': [ + {'filename': 'temp.png', 'type': 'temp'}, + {'filename': 'output.png', 'type': 'output'} + ] + } + } + count, preview = get_outputs_summary(outputs) + assert count == 2 + assert preview['filename'] == 'output.png' + + def test_preview_fallback_when_no_output_type(self): + """If no type='output', should use first previewable.""" + outputs = { + 'node1': { + 'images': [ + {'filename': 'temp1.png', 'type': 'temp'}, + {'filename': 'temp2.png', 'type': 'temp'} + ] + } + } + count, preview = get_outputs_summary(outputs) + assert preview['filename'] == 'temp1.png' + + def test_non_previewable_media_types_counted_but_no_preview(self): + """Non-previewable media types should be counted but not used as preview.""" + outputs = { + 'node1': { + 'latents': [ + {'filename': 'latent1.safetensors'}, + {'filename': 'latent2.safetensors'} + ] + } + } + count, preview = get_outputs_summary(outputs) + assert count == 2 + assert preview is None + + def test_previewable_media_types(self): + """Images, video, and audio media types should be previewable.""" + for media_type in ['images', 'video', 'audio']: + outputs = { + 'node1': { + media_type: [{'filename': 'test.file', 'type': 'output'}] + } + } + count, preview = get_outputs_summary(outputs) + assert preview is not None, f"{media_type} should be previewable" + + def test_3d_files_previewable(self): + """3D file extensions should be previewable.""" + for ext in ['.obj', '.fbx', '.gltf', '.glb']: + outputs = { + 'node1': { + 'files': [{'filename': f'model{ext}', 'type': 'output'}] + } + } + count, preview = get_outputs_summary(outputs) + assert preview is not None, f"3D file {ext} should be previewable" + + def test_format_mime_type_previewable(self): + """Files with video/ or audio/ format should be previewable.""" + for fmt in ['video/x-custom', 'audio/x-custom']: + outputs = { + 'node1': { + 'files': [{'filename': 'file.custom', 'format': fmt, 'type': 'output'}] + } + } + count, preview = get_outputs_summary(outputs) + assert preview is not None, f"Format {fmt} should be previewable" + + def test_preview_enriched_with_node_metadata(self): + """Preview should include nodeId, mediaType, and original fields.""" + outputs = { + 'node123': { + 'images': [{'filename': 'test.png', 'type': 'output', 'subfolder': 'outputs'}] + } + } + count, preview = get_outputs_summary(outputs) + assert preview['nodeId'] == 'node123' + assert preview['mediaType'] == 'images' + assert preview['subfolder'] == 'outputs' + + +class TestApplySorting: + """Unit tests for apply_sorting()""" + + def test_sort_by_create_time_desc(self): + """Default sort by create_time descending.""" + jobs = [ + {'id': 'a', 'create_time': 100}, + {'id': 'b', 'create_time': 300}, + {'id': 'c', 'create_time': 200}, + ] + result = apply_sorting(jobs, 'created_at', 'desc') + assert [j['id'] for j in result] == ['b', 'c', 'a'] + + def test_sort_by_create_time_asc(self): + """Sort by create_time ascending.""" + jobs = [ + {'id': 'a', 'create_time': 100}, + {'id': 'b', 'create_time': 300}, + {'id': 'c', 'create_time': 200}, + ] + result = apply_sorting(jobs, 'created_at', 'asc') + assert [j['id'] for j in result] == ['a', 'c', 'b'] + + def test_sort_by_execution_duration(self): + """Sort by execution_duration should order by duration.""" + jobs = [ + {'id': 'a', 'create_time': 100, 'execution_start_time': 100, 'execution_end_time': 5100}, # 5s + {'id': 'b', 'create_time': 300, 'execution_start_time': 300, 'execution_end_time': 1300}, # 1s + {'id': 'c', 'create_time': 200, 'execution_start_time': 200, 'execution_end_time': 3200}, # 3s + ] + result = apply_sorting(jobs, 'execution_duration', 'desc') + assert [j['id'] for j in result] == ['a', 'c', 'b'] + + def test_sort_with_none_values(self): + """Jobs with None values should sort as 0.""" + jobs = [ + {'id': 'a', 'create_time': 100, 'execution_start_time': 100, 'execution_end_time': 5100}, + {'id': 'b', 'create_time': 300, 'execution_start_time': None, 'execution_end_time': None}, + {'id': 'c', 'create_time': 200, 'execution_start_time': 200, 'execution_end_time': 3200}, + ] + result = apply_sorting(jobs, 'execution_duration', 'asc') + assert result[0]['id'] == 'b' # None treated as 0, comes first + + +class TestNormalizeQueueItem: + """Unit tests for normalize_queue_item()""" + + def test_basic_normalization(self): + """Queue item should be normalized to job dict.""" + item = ( + 10, # priority/number + 'prompt-123', # prompt_id + {'nodes': {}}, # prompt + { + 'create_time': 1234567890, + 'extra_pnginfo': {'workflow': {'id': 'workflow-abc'}} + }, # extra_data + ['node1'], # outputs_to_execute + ) + job = normalize_queue_item(item, JobStatus.PENDING) + + assert job['id'] == 'prompt-123' + assert job['status'] == 'pending' + assert job['priority'] == 10 + assert job['create_time'] == 1234567890 + assert job['execution_start_time'] is None + assert job['execution_end_time'] is None + assert job['execution_error'] is None + assert job['outputs_count'] == 0 + assert job['workflow_id'] == 'workflow-abc' + + +class TestNormalizeHistoryItem: + """Unit tests for normalize_history_item()""" + + def test_completed_job(self): + """Completed history item should have correct status and times from messages.""" + history_item = { + 'prompt': ( + 5, # priority + 'prompt-456', + {'nodes': {}}, + { + 'create_time': 1234567890000, + 'extra_pnginfo': {'workflow': {'id': 'workflow-xyz'}} + }, + ['node1'], + ), + 'status': { + 'status_str': 'success', + 'completed': True, + 'messages': [ + ('execution_start', {'prompt_id': 'prompt-456', 'timestamp': 1234567890500}), + ('execution_success', {'prompt_id': 'prompt-456', 'timestamp': 1234567893000}), + ] + }, + 'outputs': {}, + } + job = normalize_history_item('prompt-456', history_item) + + assert job['id'] == 'prompt-456' + assert job['status'] == 'completed' + assert job['priority'] == 5 + assert job['execution_start_time'] == 1234567890500 + assert job['execution_end_time'] == 1234567893000 + assert job['workflow_id'] == 'workflow-xyz' + + def test_failed_job(self): + """Failed history item should have failed status and error from messages.""" + history_item = { + 'prompt': ( + 5, + 'prompt-789', + {'nodes': {}}, + {'create_time': 1234567890000}, + ['node1'], + ), + 'status': { + 'status_str': 'error', + 'completed': False, + 'messages': [ + ('execution_start', {'prompt_id': 'prompt-789', 'timestamp': 1234567890500}), + ('execution_error', { + 'prompt_id': 'prompt-789', + 'node_id': '5', + 'node_type': 'KSampler', + 'exception_message': 'CUDA out of memory', + 'exception_type': 'RuntimeError', + 'traceback': ['Traceback...', 'RuntimeError: CUDA out of memory'], + 'timestamp': 1234567891000, + }) + ] + }, + 'outputs': {}, + } + + job = normalize_history_item('prompt-789', history_item) + assert job['status'] == 'failed' + assert job['execution_start_time'] == 1234567890500 + assert job['execution_end_time'] == 1234567891000 + assert job['execution_error']['node_id'] == '5' + assert job['execution_error']['node_type'] == 'KSampler' + assert job['execution_error']['exception_message'] == 'CUDA out of memory' + + def test_include_outputs(self): + """When include_outputs=True, should include full output data.""" + history_item = { + 'prompt': ( + 5, + 'prompt-123', + {'nodes': {'1': {}}}, + {'create_time': 1234567890, 'client_id': 'abc'}, + ['node1'], + ), + 'status': {'status_str': 'success', 'completed': True, 'messages': []}, + 'outputs': {'node1': {'images': [{'filename': 'test.png'}]}}, + } + job = normalize_history_item('prompt-123', history_item, include_outputs=True) + + assert 'outputs' in job + assert 'workflow' in job + assert 'execution_status' in job + assert job['outputs'] == {'node1': {'images': [{'filename': 'test.png'}]}} + assert job['workflow'] == { + 'prompt': {'nodes': {'1': {}}}, + 'extra_data': {'create_time': 1234567890, 'client_id': 'abc'}, + }