Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions src/platform/remote/comfyui/jobs/fetchers/fetchJobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/**
* @fileoverview Jobs API Fetchers
* @module platform/remote/comfyui/jobs/fetchers/fetchJobs
*
* Unified jobs API fetcher for history, queue, and job details.
* All distributions use the /jobs endpoint.
*/

import type { ComfyWorkflowJSON } from '@/platform/workflow/validation/schemas/workflowSchema'
import type { PromptId } from '@/schemas/apiSchema'

import type {
JobDetail,
JobListItem,
JobStatus,
RawJobListItem
} from '../types/jobTypes'
import { zJobDetail, zJobsListResponse } from '../types/jobTypes'

// ============================================================================
// Job List Fetchers
// ============================================================================

interface FetchJobsRawResult {
jobs: RawJobListItem[]
total: number
offset: number
}

/**
* Fetches raw jobs from /jobs endpoint
* @internal
*/
async function fetchJobsRaw(
fetchApi: (url: string) => Promise<Response>,
statuses: JobStatus[],
maxItems: number = 200,
offset: number = 0
): Promise<FetchJobsRawResult> {
const statusParam = statuses.join(',')
const url = `/jobs?status=${statusParam}&limit=${maxItems}&offset=${offset}`
try {
const res = await fetchApi(url)
if (!res.ok) {
console.error(`[Jobs API] Failed to fetch jobs: ${res.status}`)
return { jobs: [], total: 0, offset: 0 }
}
const data = zJobsListResponse.parse(await res.json())
return { jobs: data.jobs, total: data.pagination.total, offset }
} catch (error) {
console.error('[Jobs API] Error fetching jobs:', error)
return { jobs: [], total: 0, offset: 0 }
}
}

// Large offset to ensure running/pending jobs sort above history
const QUEUE_PRIORITY_BASE = 1_000_000

/**
* Assigns synthetic priority to jobs.
* Only assigns if job doesn't already have a server-provided priority.
*/
function assignPriority(
jobs: RawJobListItem[],
basePriority: number
): JobListItem[] {
return jobs.map((job, index) => ({
...job,
priority: job.priority ?? basePriority - index
}))
}

/**
* Fetches history (completed jobs)
* Assigns synthetic priority starting from total (lower than queue jobs).
*/
export async function fetchHistory(
fetchApi: (url: string) => Promise<Response>,
maxItems: number = 200,
offset: number = 0
): Promise<JobListItem[]> {
const { jobs, total } = await fetchJobsRaw(
fetchApi,
['completed'],
maxItems,
offset
)
// History gets priority based on total count (lower than queue)
return assignPriority(jobs, total - offset)
}

/**
* Fetches queue (in_progress + pending jobs)
* Pending jobs get highest priority, then running jobs.
*/
export async function fetchQueue(
fetchApi: (url: string) => Promise<Response>
): Promise<{ Running: JobListItem[]; Pending: JobListItem[] }> {
const { jobs } = await fetchJobsRaw(
fetchApi,
['in_progress', 'pending'],
200,
0
)

const running = jobs.filter((j) => j.status === 'in_progress')
const pending = jobs.filter((j) => j.status === 'pending')

// Pending gets highest priority, then running
// Both are above any history job due to QUEUE_PRIORITY_BASE
return {
Running: assignPriority(running, QUEUE_PRIORITY_BASE + running.length),
Pending: assignPriority(
pending,
QUEUE_PRIORITY_BASE + running.length + pending.length
)
}
}

// ============================================================================
// Job Detail Fetcher
// ============================================================================

/**
* Fetches full job details from /jobs/{job_id}
*/
export async function fetchJobDetail(
fetchApi: (url: string) => Promise<Response>,
promptId: PromptId
): Promise<JobDetail | undefined> {
try {
const res = await fetchApi(`/jobs/${promptId}`)

if (!res.ok) {
console.warn(`Job not found for prompt ${promptId}`)
return undefined
}

return zJobDetail.parse(await res.json())
} catch (error) {
console.error(`Failed to fetch job detail for prompt ${promptId}:`, error)
return undefined
}
}

/**
* Extracts workflow from job detail response.
* The workflow is nested at: workflow.extra_data.extra_pnginfo.workflow
*/
export function extractWorkflow(
job: JobDetail | undefined
): ComfyWorkflowJSON | undefined {
// Cast is safe - workflow will be validated by loadGraphData -> validateComfyWorkflow
const workflowData = job?.workflow as
| { extra_data?: { extra_pnginfo?: { workflow?: unknown } } }
| undefined
return workflowData?.extra_data?.extra_pnginfo?.workflow as
| ComfyWorkflowJSON
| undefined
}
Comment on lines +150 to +160
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider type guards or Zod for safer workflow extraction.

The type casts bypass TypeScript's type checking. While the comment explains downstream validation will catch issues, a runtime type check here would provide earlier error detection and better type safety.

Consider using a type guard or Zod schema:

+import { zComfyWorkflow } from '@/platform/workflow/validation/schemas/workflowSchema'
+
 export function extractWorkflow(
   job: JobDetail | undefined
 ): ComfyWorkflowJSON | undefined {
-  // Cast is safe - workflow will be validated by loadGraphData -> validateComfyWorkflow
-  const workflowData = job?.workflow as
-    | { extra_data?: { extra_pnginfo?: { workflow?: unknown } } }
-    | undefined
-  return workflowData?.extra_data?.extra_pnginfo?.workflow as
-    | ComfyWorkflowJSON
-    | undefined
+  const workflowData = job?.workflow
+  if (
+    typeof workflowData === 'object' &&
+    workflowData !== null &&
+    'extra_data' in workflowData
+  ) {
+    const extraData = (workflowData as Record<string, unknown>).extra_data
+    if (typeof extraData === 'object' && extraData !== null && 'extra_pnginfo' in extraData) {
+      const pnginfo = (extraData as Record<string, unknown>).extra_pnginfo
+      if (typeof pnginfo === 'object' && pnginfo !== null && 'workflow' in pnginfo) {
+        return (pnginfo as Record<string, unknown>).workflow as ComfyWorkflowJSON | undefined
+      }
+    }
+  }
+  return undefined
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +24 to +160
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Fetchers, priority logic, and error handling look solid with a couple of trade‑offs to be aware of

Overall the implementation matches the design goals well:

  • fetchJobsRaw centralizes URL building and response validation, and safely degrades to empty results on any HTTP/parse error.
  • assignPriority’s “only synthesize when priority is nullish” behavior cleanly preserves server ordering while still giving you a total/offset‑based fallback.
  • fetchHistory and fetchQueue cooperate to ensure the intended ordering (Pending > Running > History), and the total - offset base in history should keep priorities consistent across pages.
  • fetchJobDetail and extractWorkflow give callers a simple undefined contract on failure while logging enough context for debugging.

Two non‑blocking considerations:

  • QUEUE_PRIORITY_BASE = 1_000_000 assumes the maximum history priority will remain below that; if very large installations could exceed this, you might eventually want to derive the base from an upper bound (e.g., a config or Number.MAX_SAFE_INTEGER margin).
  • All failures in fetchJobsRaw surface to callers as “no jobs” rather than a hard error; if future UX needs to distinguish “empty history/queue” from “failed to load”, you may want a variant that returns an error flag alongside the jobs.

Nothing here looks blocking; the current behavior is coherent and in line with the PR scope.

🤖 Prompt for AI Agents
In src/platform/remote/comfyui/jobs/fetchers/fetchJobs.ts around lines 24-160,
address the two reviewer suggestions: (1) Replace the hardcoded
QUEUE_PRIORITY_BASE with a configurable or derived value — e.g., read from
config or compute from a safe upper bound (Number.MAX_SAFE_INTEGER minus a
margin) and use that variable where QUEUE_PRIORITY_BASE is currently referenced;
(2) Make fetchJobsRaw distinguish between “empty result” and “fetch/parse error”
by returning an additional error flag or an Error object (or by throwing) so
callers can opt into treating failures differently from empty lists; update
callers (fetchHistory/fetchQueue) to handle the new return shape or catch the
thrown error accordingly.

13 changes: 13 additions & 0 deletions src/platform/remote/comfyui/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/**
* @fileoverview Jobs API module
* @module platform/remote/comfyui/jobs
*
* Unified jobs API for history, queue, and job details.
*/

export {
extractWorkflow,
fetchHistory,
fetchJobDetail,
fetchQueue
} from './fetchers/fetchJobs'
114 changes: 114 additions & 0 deletions src/platform/remote/comfyui/jobs/types/jobTypes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* @fileoverview Jobs API types - Backend job API format
* @module platform/remote/comfyui/jobs/types/jobTypes
*
* These types represent the jobs API format returned by the backend.
* Jobs API provides a memory-optimized alternative to history API.
*/

import { z } from 'zod'

import { resultItemType, zTaskOutput } from '@/schemas/apiSchema'

// ============================================================================
// Zod Schemas
// ============================================================================

const zJobStatus = z.enum([
'pending',
'in_progress',
'completed',
'failed',
'cancelled'
])

const zPreviewOutput = z
.object({
filename: z.string(),
subfolder: z.string(),
type: resultItemType
})
.passthrough() // Allow extra fields like nodeId, mediaType

/**
* Execution error details for error jobs.
* Contains the same structure as ExecutionErrorWsMessage from WebSocket.
*/
const zExecutionError = z
.object({
prompt_id: z.string().optional(),
timestamp: z.number().optional(),
node_id: z.string(),
node_type: z.string(),
executed: z.array(z.string()).optional(),
exception_message: z.string(),
exception_type: z.string(),
traceback: z.array(z.string()),
current_inputs: z.unknown(),
current_outputs: z.unknown()
})
.passthrough()

/**
* Raw job from API - uses passthrough to allow extra fields
*/
const zRawJobListItem = z
.object({
id: z.string(),
status: zJobStatus,
create_time: z.number(),
execution_start_time: z.number().nullable().optional(),
execution_end_time: z.number().nullable().optional(),
preview_output: zPreviewOutput.nullable().optional(),
outputs_count: z.number().optional(),
execution_error: zExecutionError.nullable().optional(),
workflow_id: z.string().nullable().optional(),
priority: z.number().optional()
})
.passthrough()

/**
* Job detail - returned by GET /api/jobs/{job_id} (detail endpoint)
* Includes full workflow and outputs for re-execution and downloads
*/
export const zJobDetail = zRawJobListItem
.extend({
workflow: z.unknown().optional(),
outputs: zTaskOutput.optional(),
update_time: z.number().optional(),
execution_status: z.unknown().optional(),
execution_meta: z.unknown().optional()
})
.passthrough()

/**
* Pagination info from API
*/
const zPaginationInfo = z
.object({
offset: z.number(),
limit: z.number(),
total: z.number(),
has_more: z.boolean()
})
.passthrough()

/**
* Jobs list response structure
*/
export const zJobsListResponse = z
.object({
jobs: z.array(zRawJobListItem),
pagination: zPaginationInfo
})
.passthrough()

// ============================================================================
// TypeScript Types (derived from Zod schemas)
// ============================================================================

export type JobStatus = z.infer<typeof zJobStatus>
export type RawJobListItem = z.infer<typeof zRawJobListItem>
/** Job list item with priority always set (server-provided or synthetic) */
export type JobListItem = RawJobListItem & { priority: number }
export type JobDetail = z.infer<typeof zJobDetail>
Loading