diff --git a/src/platform/remote/comfyui/jobs/fetchers/fetchJobs.ts b/src/platform/remote/comfyui/jobs/fetchers/fetchJobs.ts new file mode 100644 index 0000000000..2b6cde8c9d --- /dev/null +++ b/src/platform/remote/comfyui/jobs/fetchers/fetchJobs.ts @@ -0,0 +1,160 @@ +/** + * @fileoverview Jobs API Fetchers + * @module platform/remote/comfyui/jobs/fetchers/fetchJobs + * + * Unified jobs API fetcher for history, queue, and job details. + * All distributions use the /jobs endpoint. + */ + +import type { ComfyWorkflowJSON } from '@/platform/workflow/validation/schemas/workflowSchema' +import type { PromptId } from '@/schemas/apiSchema' + +import type { + JobDetail, + JobListItem, + JobStatus, + RawJobListItem +} from '../types/jobTypes' +import { zJobDetail, zJobsListResponse } from '../types/jobTypes' + +// ============================================================================ +// Job List Fetchers +// ============================================================================ + +interface FetchJobsRawResult { + jobs: RawJobListItem[] + total: number + offset: number +} + +/** + * Fetches raw jobs from /jobs endpoint + * @internal + */ +async function fetchJobsRaw( + fetchApi: (url: string) => Promise, + statuses: JobStatus[], + maxItems: number = 200, + offset: number = 0 +): Promise { + const statusParam = statuses.join(',') + const url = `/jobs?status=${statusParam}&limit=${maxItems}&offset=${offset}` + try { + const res = await fetchApi(url) + if (!res.ok) { + console.error(`[Jobs API] Failed to fetch jobs: ${res.status}`) + return { jobs: [], total: 0, offset: 0 } + } + const data = zJobsListResponse.parse(await res.json()) + return { jobs: data.jobs, total: data.pagination.total, offset } + } catch (error) { + console.error('[Jobs API] Error fetching jobs:', error) + return { jobs: [], total: 0, offset: 0 } + } +} + +// Large offset to ensure running/pending jobs sort above history +const QUEUE_PRIORITY_BASE = 1_000_000 + +/** + * Assigns synthetic priority to jobs. + * Only assigns if job doesn't already have a server-provided priority. + */ +function assignPriority( + jobs: RawJobListItem[], + basePriority: number +): JobListItem[] { + return jobs.map((job, index) => ({ + ...job, + priority: job.priority ?? basePriority - index + })) +} + +/** + * Fetches history (completed jobs) + * Assigns synthetic priority starting from total (lower than queue jobs). + */ +export async function fetchHistory( + fetchApi: (url: string) => Promise, + maxItems: number = 200, + offset: number = 0 +): Promise { + const { jobs, total } = await fetchJobsRaw( + fetchApi, + ['completed'], + maxItems, + offset + ) + // History gets priority based on total count (lower than queue) + return assignPriority(jobs, total - offset) +} + +/** + * Fetches queue (in_progress + pending jobs) + * Pending jobs get highest priority, then running jobs. + */ +export async function fetchQueue( + fetchApi: (url: string) => Promise +): Promise<{ Running: JobListItem[]; Pending: JobListItem[] }> { + const { jobs } = await fetchJobsRaw( + fetchApi, + ['in_progress', 'pending'], + 200, + 0 + ) + + const running = jobs.filter((j) => j.status === 'in_progress') + const pending = jobs.filter((j) => j.status === 'pending') + + // Pending gets highest priority, then running + // Both are above any history job due to QUEUE_PRIORITY_BASE + return { + Running: assignPriority(running, QUEUE_PRIORITY_BASE + running.length), + Pending: assignPriority( + pending, + QUEUE_PRIORITY_BASE + running.length + pending.length + ) + } +} + +// ============================================================================ +// Job Detail Fetcher +// ============================================================================ + +/** + * Fetches full job details from /jobs/{job_id} + */ +export async function fetchJobDetail( + fetchApi: (url: string) => Promise, + promptId: PromptId +): Promise { + try { + const res = await fetchApi(`/jobs/${promptId}`) + + if (!res.ok) { + console.warn(`Job not found for prompt ${promptId}`) + return undefined + } + + return zJobDetail.parse(await res.json()) + } catch (error) { + console.error(`Failed to fetch job detail for prompt ${promptId}:`, error) + return undefined + } +} + +/** + * Extracts workflow from job detail response. + * The workflow is nested at: workflow.extra_data.extra_pnginfo.workflow + */ +export function extractWorkflow( + job: JobDetail | undefined +): ComfyWorkflowJSON | undefined { + // Cast is safe - workflow will be validated by loadGraphData -> validateComfyWorkflow + const workflowData = job?.workflow as + | { extra_data?: { extra_pnginfo?: { workflow?: unknown } } } + | undefined + return workflowData?.extra_data?.extra_pnginfo?.workflow as + | ComfyWorkflowJSON + | undefined +} diff --git a/src/platform/remote/comfyui/jobs/index.ts b/src/platform/remote/comfyui/jobs/index.ts new file mode 100644 index 0000000000..b401c3457a --- /dev/null +++ b/src/platform/remote/comfyui/jobs/index.ts @@ -0,0 +1,13 @@ +/** + * @fileoverview Jobs API module + * @module platform/remote/comfyui/jobs + * + * Unified jobs API for history, queue, and job details. + */ + +export { + extractWorkflow, + fetchHistory, + fetchJobDetail, + fetchQueue +} from './fetchers/fetchJobs' diff --git a/src/platform/remote/comfyui/jobs/types/jobTypes.ts b/src/platform/remote/comfyui/jobs/types/jobTypes.ts new file mode 100644 index 0000000000..106963cffc --- /dev/null +++ b/src/platform/remote/comfyui/jobs/types/jobTypes.ts @@ -0,0 +1,114 @@ +/** + * @fileoverview Jobs API types - Backend job API format + * @module platform/remote/comfyui/jobs/types/jobTypes + * + * These types represent the jobs API format returned by the backend. + * Jobs API provides a memory-optimized alternative to history API. + */ + +import { z } from 'zod' + +import { resultItemType, zTaskOutput } from '@/schemas/apiSchema' + +// ============================================================================ +// Zod Schemas +// ============================================================================ + +const zJobStatus = z.enum([ + 'pending', + 'in_progress', + 'completed', + 'failed', + 'cancelled' +]) + +const zPreviewOutput = z + .object({ + filename: z.string(), + subfolder: z.string(), + type: resultItemType + }) + .passthrough() // Allow extra fields like nodeId, mediaType + +/** + * Execution error details for error jobs. + * Contains the same structure as ExecutionErrorWsMessage from WebSocket. + */ +const zExecutionError = z + .object({ + prompt_id: z.string().optional(), + timestamp: z.number().optional(), + node_id: z.string(), + node_type: z.string(), + executed: z.array(z.string()).optional(), + exception_message: z.string(), + exception_type: z.string(), + traceback: z.array(z.string()), + current_inputs: z.unknown(), + current_outputs: z.unknown() + }) + .passthrough() + +/** + * Raw job from API - uses passthrough to allow extra fields + */ +const zRawJobListItem = z + .object({ + id: z.string(), + status: zJobStatus, + create_time: z.number(), + execution_start_time: z.number().nullable().optional(), + execution_end_time: z.number().nullable().optional(), + preview_output: zPreviewOutput.nullable().optional(), + outputs_count: z.number().optional(), + execution_error: zExecutionError.nullable().optional(), + workflow_id: z.string().nullable().optional(), + priority: z.number().optional() + }) + .passthrough() + +/** + * Job detail - returned by GET /api/jobs/{job_id} (detail endpoint) + * Includes full workflow and outputs for re-execution and downloads + */ +export const zJobDetail = zRawJobListItem + .extend({ + workflow: z.unknown().optional(), + outputs: zTaskOutput.optional(), + update_time: z.number().optional(), + execution_status: z.unknown().optional(), + execution_meta: z.unknown().optional() + }) + .passthrough() + +/** + * Pagination info from API + */ +const zPaginationInfo = z + .object({ + offset: z.number(), + limit: z.number(), + total: z.number(), + has_more: z.boolean() + }) + .passthrough() + +/** + * Jobs list response structure + */ +export const zJobsListResponse = z + .object({ + jobs: z.array(zRawJobListItem), + pagination: zPaginationInfo + }) + .passthrough() + +// ============================================================================ +// TypeScript Types (derived from Zod schemas) +// ============================================================================ + +export type JobStatus = z.infer +export type RawJobListItem = z.infer +/** Job list item with priority always set (server-provided or synthetic) */ +export type JobListItem = RawJobListItem & { priority: number } +export type JobDetail = z.infer diff --git a/tests-ui/tests/platform/remote/comfyui/jobs/fetchers/fetchJobs.test.ts b/tests-ui/tests/platform/remote/comfyui/jobs/fetchers/fetchJobs.test.ts new file mode 100644 index 0000000000..5b961f8cf9 --- /dev/null +++ b/tests-ui/tests/platform/remote/comfyui/jobs/fetchers/fetchJobs.test.ts @@ -0,0 +1,260 @@ +import { describe, expect, it, vi } from 'vitest' + +import { + extractWorkflow, + fetchHistory, + fetchJobDetail, + fetchQueue +} from '@/platform/remote/comfyui/jobs' + +// Helper to create a mock job +function createMockJob( + id: string, + status: 'pending' | 'in_progress' | 'completed' = 'completed', + overrides: Record = {} +) { + return { + id, + status, + create_time: Date.now(), + execution_start_time: null, + execution_end_time: null, + preview_output: null, + outputs_count: 0, + ...overrides + } +} + +// Helper to create mock API response +function createMockResponse( + jobs: ReturnType[], + total: number = jobs.length +) { + return { + jobs, + pagination: { + offset: 0, + limit: 200, + total, + has_more: false + } + } +} + +describe('fetchJobs', () => { + describe('fetchHistory', () => { + it('fetches completed jobs', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => + Promise.resolve( + createMockResponse([ + createMockJob('job1', 'completed'), + createMockJob('job2', 'completed') + ]) + ) + }) + + const result = await fetchHistory(mockFetch) + + expect(mockFetch).toHaveBeenCalledWith( + '/jobs?status=completed&limit=200&offset=0' + ) + expect(result).toHaveLength(2) + expect(result[0].id).toBe('job1') + expect(result[1].id).toBe('job2') + }) + + it('assigns synthetic priorities', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => + Promise.resolve( + createMockResponse( + [ + createMockJob('job1', 'completed'), + createMockJob('job2', 'completed'), + createMockJob('job3', 'completed') + ], + 3 + ) + ) + }) + + const result = await fetchHistory(mockFetch) + + // Priority should be assigned from total down + expect(result[0].priority).toBe(3) // total - 0 - 0 + expect(result[1].priority).toBe(2) // total - 0 - 1 + expect(result[2].priority).toBe(1) // total - 0 - 2 + }) + + it('preserves server-provided priority', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => + Promise.resolve( + createMockResponse([ + createMockJob('job1', 'completed', { priority: 999 }) + ]) + ) + }) + + const result = await fetchHistory(mockFetch) + + expect(result[0].priority).toBe(999) + }) + + it('returns empty array on error', async () => { + const mockFetch = vi.fn().mockRejectedValue(new Error('Network error')) + + const result = await fetchHistory(mockFetch) + + expect(result).toEqual([]) + }) + + it('returns empty array on non-ok response', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: false, + status: 500 + }) + + const result = await fetchHistory(mockFetch) + + expect(result).toEqual([]) + }) + }) + + describe('fetchQueue', () => { + it('fetches running and pending jobs', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => + Promise.resolve( + createMockResponse([ + createMockJob('running1', 'in_progress'), + createMockJob('pending1', 'pending'), + createMockJob('pending2', 'pending') + ]) + ) + }) + + const result = await fetchQueue(mockFetch) + + expect(mockFetch).toHaveBeenCalledWith( + '/jobs?status=in_progress,pending&limit=200&offset=0' + ) + expect(result.Running).toHaveLength(1) + expect(result.Pending).toHaveLength(2) + expect(result.Running[0].id).toBe('running1') + expect(result.Pending[0].id).toBe('pending1') + }) + + it('assigns queue priorities above history', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => + Promise.resolve( + createMockResponse([ + createMockJob('running1', 'in_progress'), + createMockJob('pending1', 'pending') + ]) + ) + }) + + const result = await fetchQueue(mockFetch) + + // Queue priorities should be above 1_000_000 (QUEUE_PRIORITY_BASE) + expect(result.Running[0].priority).toBeGreaterThan(1_000_000) + expect(result.Pending[0].priority).toBeGreaterThan(1_000_000) + // Pending should have higher priority than running + expect(result.Pending[0].priority).toBeGreaterThan( + result.Running[0].priority + ) + }) + + it('returns empty arrays on error', async () => { + const mockFetch = vi.fn().mockRejectedValue(new Error('Network error')) + + const result = await fetchQueue(mockFetch) + + expect(result).toEqual({ Running: [], Pending: [] }) + }) + }) + + describe('fetchJobDetail', () => { + it('fetches job detail by id', async () => { + const jobDetail = { + ...createMockJob('job1', 'completed'), + workflow: { extra_data: { extra_pnginfo: { workflow: {} } } }, + outputs: { + '1': { + images: [{ filename: 'test.png', subfolder: '', type: 'output' }] + } + } + } + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => Promise.resolve(jobDetail) + }) + + const result = await fetchJobDetail(mockFetch, 'job1') + + expect(mockFetch).toHaveBeenCalledWith('/jobs/job1') + expect(result?.id).toBe('job1') + expect(result?.outputs).toBeDefined() + }) + + it('returns undefined for non-ok response', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: false, + status: 404 + }) + + const result = await fetchJobDetail(mockFetch, 'nonexistent') + + expect(result).toBeUndefined() + }) + + it('returns undefined on error', async () => { + const mockFetch = vi.fn().mockRejectedValue(new Error('Network error')) + + const result = await fetchJobDetail(mockFetch, 'job1') + + expect(result).toBeUndefined() + }) + }) + + describe('extractWorkflow', () => { + it('extracts workflow from nested structure', () => { + const jobDetail = { + ...createMockJob('job1', 'completed'), + workflow: { + extra_data: { + extra_pnginfo: { + workflow: { nodes: [], links: [] } + } + } + } + } + + const workflow = extractWorkflow(jobDetail) + + expect(workflow).toEqual({ nodes: [], links: [] }) + }) + + it('returns undefined if workflow not present', () => { + const jobDetail = createMockJob('job1', 'completed') + + const workflow = extractWorkflow(jobDetail) + + expect(workflow).toBeUndefined() + }) + + it('returns undefined for undefined input', () => { + const workflow = extractWorkflow(undefined) + + expect(workflow).toBeUndefined() + }) + }) +})