Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 55 additions & 12 deletions packages/payload/src/queues/localAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import { jobAfterRead, jobsCollectionSlug } from './config/index.js'
import { runJobs } from './operations/runJobs/index.js'
import { updateJob, updateJobs } from './utilities/updateJob.js'

export const getJobsLocalAPI = (payload: Payload) => ({
queue: async <
export const getJobsLocalAPI = (payload: Payload) => {
const queue = async <
// eslint-disable-next-line @typescript-eslint/no-duplicate-type-constituents
TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] | keyof TypedJobs['workflows'],
>(
Expand Down Expand Up @@ -95,9 +95,9 @@ export const getJobsLocalAPI = (payload: Payload) => ({
}),
}) as unknown as ReturnType
}
},
}

run: async (args?: {
const run = async (args?: {
limit?: number
overrideAccess?: boolean
/**
Expand Down Expand Up @@ -126,23 +126,25 @@ export const getJobsLocalAPI = (payload: Payload) => ({
sequential: args?.sequential,
where: args?.where,
})
},
}

runByID: async (args: {
const runByID = async (args: {
id: number | string
overrideAccess?: boolean
req?: PayloadRequest
returnJob?: boolean
}): Promise<ReturnType<typeof runJobs>> => {
const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload))

return await runJobs({
id: args.id,
overrideAccess: args.overrideAccess !== false,
req: newReq,
returnJobs: args.returnJob,
})
},
}

cancel: async (args: {
const cancel = async (args: {
overrideAccess?: boolean
queue?: string
req?: PayloadRequest
Expand Down Expand Up @@ -191,9 +193,9 @@ export const getJobsLocalAPI = (payload: Payload) => ({
returning: false,
where: { and },
})
},
}

cancelByID: async (args: {
const cancelByID = async (args: {
id: number | string
overrideAccess?: boolean
req?: PayloadRequest
Expand All @@ -219,5 +221,46 @@ export const getJobsLocalAPI = (payload: Payload) => ({
req: newReq,
returning: false,
})
},
})
}

const queueAndRun = async <
// eslint-disable-next-line @typescript-eslint/no-duplicate-type-constituents
TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] | keyof TypedJobs['workflows'],
>(
args:
| {
input: TypedJobs['tasks'][TTaskOrWorkflowSlug]['input']
queue?: string
req?: PayloadRequest
task: TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] ? TTaskOrWorkflowSlug : never
waitUntil?: Date
workflow?: never
}
| {
input: TypedJobs['workflows'][TTaskOrWorkflowSlug]['input']
queue?: string
req?: PayloadRequest
task?: never
waitUntil?: Date
workflow: TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
? TTaskOrWorkflowSlug
: never
},
): Promise<
TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
? RunningJob<TTaskOrWorkflowSlug>
: RunningJobFromTask<TTaskOrWorkflowSlug>
> => {
const queued = await queue<TTaskOrWorkflowSlug>(args as Parameters<typeof queue>[0])

const result = await runByID({ id: queued.id, req: args.req, returnJob: true })

type ReturnType = TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
? RunningJob<TTaskOrWorkflowSlug>
: RunningJobFromTask<TTaskOrWorkflowSlug>

return result.jobs?.[0] as ReturnType
}

return { cancel, cancelByID, queue, queueAndRun, run, runByID }
}
32 changes: 30 additions & 2 deletions packages/payload/src/queues/operations/runJobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ export type RunJobsArgs = {
processingOrder?: Sort
queue?: string
req: PayloadRequest
/**
* Return the processed job documents. Defaults to `false` to maintain
* backward compatibility.
*/
returnJobs?: boolean
/**
* By default, jobs are run in parallel.
* If you want to run them in sequence, set this to true.
Expand All @@ -44,6 +49,10 @@ export type RunJobsArgs = {
}

export type RunJobsResult = {
/**
* The jobs that were processed. Only returned when `returnJobs` is `true`.
*/
jobs?: BaseJob[]
jobStatus?: Record<string, RunJobResult>
/**
* If this is false, there for sure are no jobs remaining, regardless of the limit
Expand All @@ -63,6 +72,7 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
processingOrder,
queue,
req,
returnJobs,
sequential,
where: whereFromProps,
} = args
Expand Down Expand Up @@ -125,6 +135,8 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
docs: BaseJob[]
} = { docs: [] }

const processedJobs: BaseJob[] = []

if (id) {
// Only one job to run
jobsQuery.docs = [
Expand Down Expand Up @@ -190,10 +202,16 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
)

if (!jobsQuery.docs.length) {
return {
const result: RunJobsResult = {
noJobsRemaining: true,
remainingJobsFromQueried: 0,
}

if (returnJobs) {
result.jobs = []
}

return result
}

if (jobsQuery?.docs?.length) {
Expand Down Expand Up @@ -273,6 +291,8 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
jobsToDelete.push(job.id)
}

processedJobs.push(job)

return { id: job.id, result }
} else {
const result = await runJSONJob({
Expand All @@ -287,6 +307,8 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
jobsToDelete.push(job.id)
}

processedJobs.push(job)

return { id: job.id, result }
}
}
Expand Down Expand Up @@ -343,8 +365,14 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
}
}

return {
const returnValue: RunJobsResult = {
jobStatus: resultsObject,
remainingJobsFromQueried,
}

if (returnJobs) {
returnValue.jobs = processedJobs
}

return returnValue
}