Skip to content

Commit 5cb2579

Browse files
ric-yuclaude
andcommitted
[feat] Add Jobs API infrastructure (PR 1 of 3)
Adds Jobs API types, fetchers, and new API methods without breaking existing code. This is the foundation for migrating from legacy /history and /queue endpoints to the unified /jobs endpoint. New files: - src/platform/remote/comfyui/jobs/types/jobTypes.ts - Zod schemas for Jobs API - src/platform/remote/comfyui/jobs/fetchers/fetchJobs.ts - Fetchers for /jobs endpoint - src/platform/remote/comfyui/jobs/index.ts - Barrel exports - tests-ui/tests/platform/remote/comfyui/jobs/fetchers/fetchJobs.test.ts API additions (non-breaking): - api.getQueueFromJobsApi() - Queue from /jobs endpoint - api.getHistoryFromJobsApi() - History from /jobs endpoint - api.getJobDetail() - Full job details including workflow and outputs Part of Jobs API migration. See docs/JOBS_API_MIGRATION_PLAN.md for details. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 431fe33 commit 5cb2579

File tree

4 files changed

+550
-0
lines changed

4 files changed

+550
-0
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/**
2+
* @fileoverview Jobs API Fetchers
3+
* @module platform/remote/comfyui/jobs/fetchers/fetchJobs
4+
*
5+
* Unified jobs API fetcher for history, queue, and job details.
6+
* All distributions use the /jobs endpoint.
7+
*/
8+
9+
import type { ComfyWorkflowJSON } from '@/platform/workflow/validation/schemas/workflowSchema'
10+
import type { PromptId } from '@/schemas/apiSchema'
11+
12+
import type {
13+
JobDetail,
14+
JobListItem,
15+
JobStatus,
16+
RawJobListItem
17+
} from '../types/jobTypes'
18+
import { zJobDetail, zJobsListResponse } from '../types/jobTypes'
19+
20+
// ============================================================================
21+
// Job List Fetchers
22+
// ============================================================================
23+
24+
interface FetchJobsRawResult {
25+
jobs: RawJobListItem[]
26+
total: number
27+
offset: number
28+
}
29+
30+
/**
31+
* Fetches raw jobs from /jobs endpoint
32+
* @internal
33+
*/
34+
async function fetchJobsRaw(
35+
fetchApi: (url: string) => Promise<Response>,
36+
statuses: JobStatus[],
37+
maxItems: number = 200,
38+
offset: number = 0
39+
): Promise<FetchJobsRawResult> {
40+
const statusParam = statuses.join(',')
41+
const url = `/jobs?status=${statusParam}&limit=${maxItems}&offset=${offset}`
42+
try {
43+
const res = await fetchApi(url)
44+
if (!res.ok) {
45+
console.error(`[Jobs API] Failed to fetch jobs: ${res.status}`)
46+
return { jobs: [], total: 0, offset: 0 }
47+
}
48+
const data = zJobsListResponse.parse(await res.json())
49+
return { jobs: data.jobs, total: data.pagination.total, offset }
50+
} catch (error) {
51+
console.error('[Jobs API] Error fetching jobs:', error)
52+
return { jobs: [], total: 0, offset: 0 }
53+
}
54+
}
55+
56+
// Large offset to ensure running/pending jobs sort above history
57+
const QUEUE_PRIORITY_BASE = 1_000_000
58+
59+
/**
60+
* Assigns synthetic priority to jobs.
61+
* Only assigns if job doesn't already have a server-provided priority.
62+
*/
63+
function assignPriority(
64+
jobs: RawJobListItem[],
65+
basePriority: number
66+
): JobListItem[] {
67+
return jobs.map((job, index) => ({
68+
...job,
69+
priority: job.priority ?? basePriority - index
70+
}))
71+
}
72+
73+
/**
74+
* Fetches history (completed jobs)
75+
* Assigns synthetic priority starting from total (lower than queue jobs).
76+
*/
77+
export async function fetchHistory(
78+
fetchApi: (url: string) => Promise<Response>,
79+
maxItems: number = 200,
80+
offset: number = 0
81+
): Promise<JobListItem[]> {
82+
const { jobs, total } = await fetchJobsRaw(
83+
fetchApi,
84+
['completed'],
85+
maxItems,
86+
offset
87+
)
88+
// History gets priority based on total count (lower than queue)
89+
return assignPriority(jobs, total - offset)
90+
}
91+
92+
/**
93+
* Fetches queue (in_progress + pending jobs)
94+
* Pending jobs get highest priority, then running jobs.
95+
*/
96+
export async function fetchQueue(
97+
fetchApi: (url: string) => Promise<Response>
98+
): Promise<{ Running: JobListItem[]; Pending: JobListItem[] }> {
99+
const { jobs } = await fetchJobsRaw(
100+
fetchApi,
101+
['in_progress', 'pending'],
102+
200,
103+
0
104+
)
105+
106+
const running = jobs.filter((j) => j.status === 'in_progress')
107+
const pending = jobs.filter((j) => j.status === 'pending')
108+
109+
// Pending gets highest priority, then running
110+
// Both are above any history job due to QUEUE_PRIORITY_BASE
111+
return {
112+
Running: assignPriority(running, QUEUE_PRIORITY_BASE + running.length),
113+
Pending: assignPriority(
114+
pending,
115+
QUEUE_PRIORITY_BASE + running.length + pending.length
116+
)
117+
}
118+
}
119+
120+
// ============================================================================
121+
// Job Detail Fetcher
122+
// ============================================================================
123+
124+
/**
125+
* Fetches full job details from /jobs/{job_id}
126+
*/
127+
export async function fetchJobDetail(
128+
fetchApi: (url: string) => Promise<Response>,
129+
promptId: PromptId
130+
): Promise<JobDetail | undefined> {
131+
try {
132+
const res = await fetchApi(`/jobs/${promptId}`)
133+
134+
if (!res.ok) {
135+
console.warn(`Job not found for prompt ${promptId}`)
136+
return undefined
137+
}
138+
139+
return zJobDetail.parse(await res.json())
140+
} catch (error) {
141+
console.error(`Failed to fetch job detail for prompt ${promptId}:`, error)
142+
return undefined
143+
}
144+
}
145+
146+
/**
147+
* Extracts workflow from job detail response.
148+
* The workflow is nested at: workflow.extra_data.extra_pnginfo.workflow
149+
*/
150+
export function extractWorkflow(
151+
job: JobDetail | undefined
152+
): ComfyWorkflowJSON | undefined {
153+
// Cast is safe - workflow will be validated by loadGraphData -> validateComfyWorkflow
154+
const workflowData = job?.workflow as
155+
| { extra_data?: { extra_pnginfo?: { workflow?: unknown } } }
156+
| undefined
157+
return workflowData?.extra_data?.extra_pnginfo?.workflow as
158+
| ComfyWorkflowJSON
159+
| undefined
160+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/**
2+
* @fileoverview Jobs API module
3+
* @module platform/remote/comfyui/jobs
4+
*
5+
* Unified jobs API for history, queue, and job details.
6+
*/
7+
8+
export {
9+
extractWorkflow,
10+
fetchHistory,
11+
fetchJobDetail,
12+
fetchQueue
13+
} from './fetchers/fetchJobs'
14+
export type { JobDetail, JobListItem } from './types/jobTypes'
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/**
2+
* @fileoverview Jobs API types - Backend job API format
3+
* @module platform/remote/comfyui/jobs/types/jobTypes
4+
*
5+
* These types represent the jobs API format returned by the backend.
6+
* Jobs API provides a memory-optimized alternative to history API.
7+
*/
8+
9+
import { z } from 'zod'
10+
11+
import { resultItemType, zTaskOutput } from '@/schemas/apiSchema'
12+
13+
// ============================================================================
14+
// Zod Schemas
15+
// ============================================================================
16+
17+
const zJobStatus = z.enum([
18+
'pending',
19+
'in_progress',
20+
'completed',
21+
'failed',
22+
'cancelled'
23+
])
24+
25+
const zPreviewOutput = z
26+
.object({
27+
filename: z.string(),
28+
subfolder: z.string(),
29+
type: resultItemType
30+
})
31+
.passthrough() // Allow extra fields like nodeId, mediaType
32+
33+
/**
34+
* Execution error details for error jobs.
35+
* Contains the same structure as ExecutionErrorWsMessage from WebSocket.
36+
*/
37+
const zExecutionError = z
38+
.object({
39+
prompt_id: z.string().optional(),
40+
timestamp: z.number().optional(),
41+
node_id: z.string(),
42+
node_type: z.string(),
43+
executed: z.array(z.string()).optional(),
44+
exception_message: z.string(),
45+
exception_type: z.string(),
46+
traceback: z.array(z.string()),
47+
current_inputs: z.unknown(),
48+
current_outputs: z.unknown()
49+
})
50+
.passthrough()
51+
52+
export type ExecutionError = z.infer<typeof zExecutionError>
53+
54+
/**
55+
* Raw job from API - uses passthrough to allow extra fields
56+
*/
57+
const zRawJobListItem = z
58+
.object({
59+
id: z.string(),
60+
status: zJobStatus,
61+
create_time: z.number(),
62+
execution_start_time: z.number().nullable().optional(),
63+
execution_end_time: z.number().nullable().optional(),
64+
preview_output: zPreviewOutput.nullable().optional(),
65+
outputs_count: z.number().optional(),
66+
execution_error: zExecutionError.nullable().optional(),
67+
workflow_id: z.string().nullable().optional(),
68+
priority: z.number().optional()
69+
})
70+
.passthrough()
71+
72+
/**
73+
* Job detail - returned by GET /api/jobs/{job_id} (detail endpoint)
74+
* Includes full workflow and outputs for re-execution and downloads
75+
*/
76+
export const zJobDetail = zRawJobListItem
77+
.extend({
78+
workflow: z.unknown().optional(),
79+
outputs: zTaskOutput.optional(),
80+
update_time: z.number().optional(),
81+
execution_status: z.unknown().optional(),
82+
execution_meta: z.unknown().optional()
83+
})
84+
.passthrough()
85+
86+
/**
87+
* Pagination info from API
88+
*/
89+
const zPaginationInfo = z
90+
.object({
91+
offset: z.number(),
92+
limit: z.number(),
93+
total: z.number(),
94+
has_more: z.boolean()
95+
})
96+
.passthrough()
97+
98+
/**
99+
* Jobs list response structure
100+
*/
101+
export const zJobsListResponse = z
102+
.object({
103+
jobs: z.array(zRawJobListItem),
104+
pagination: zPaginationInfo
105+
})
106+
.passthrough()
107+
108+
// ============================================================================
109+
// TypeScript Types (derived from Zod schemas)
110+
// ============================================================================
111+
112+
export type JobStatus = z.infer<typeof zJobStatus>
113+
export type RawJobListItem = z.infer<typeof zRawJobListItem>
114+
/** Job list item with priority always set (server-provided or synthetic) */
115+
export type JobListItem = RawJobListItem & { priority: number }
116+
export type JobDetail = z.infer<typeof zJobDetail>

0 commit comments

Comments
 (0)