Skip to content

Commit 09246a4

Browse files
authored
feat: add payload.jobs.runByID (#9875)
1 parent da6bc55 commit 09246a4

File tree

4 files changed

+99
-12
lines changed

4 files changed

+99
-12
lines changed

docs/jobs-queue/queues.mdx

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,23 @@ After the project is deployed to Vercel, the Vercel Cron job will automatically
9898

9999
If you want to process jobs programmatically from your server-side code, you can use the Local API:
100100

101+
**Run all jobs:**
102+
101103
```ts
102104
const results = await payload.jobs.run()
103105

104106
// You can customize the queue name and limit by passing them as arguments:
105107
await payload.jobs.run({ queue: 'nightly', limit: 100 })
106108
```
107109

110+
**Run a single job:**
111+
112+
```ts
113+
const results = await payload.jobs.runByID({
114+
id: myJobID
115+
})
116+
```
117+
108118
#### Bin script
109119

110120
Finally, you can process jobs via the bin script that comes with Payload out of the box.

packages/payload/src/queues/localAPI.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,18 @@ export const getJobsLocalAPI = (payload: Payload) => ({
8080
})
8181
return result
8282
},
83+
84+
runByID: async (args: {
85+
id: number | string
86+
overrideAccess?: boolean
87+
req?: PayloadRequest
88+
}): Promise<ReturnType<typeof runJobs>> => {
89+
const newReq: PayloadRequest = args?.req ?? (await createLocalReq({}, payload))
90+
const result = await runJobs({
91+
id: args.id,
92+
overrideAccess: args?.overrideAccess !== false,
93+
req: newReq,
94+
})
95+
return result
96+
},
8397
})

packages/payload/src/queues/operations/runJobs/index.ts

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ import { runJob } from './runJob/index.js'
1717
import { runJSONJob } from './runJSONJob/index.js'
1818

1919
export type RunJobsArgs = {
20+
/**
21+
* ID of the job to run
22+
*/
23+
id?: number | string
2024
limit?: number
2125
overrideAccess?: boolean
2226
queue?: string
@@ -36,6 +40,7 @@ export type RunJobsResult = {
3640
}
3741

3842
export const runJobs = async ({
43+
id,
3944
limit = 10,
4045
overrideAccess,
4146
queue,
@@ -91,18 +96,36 @@ export const runJobs = async ({
9196

9297
// Find all jobs and ensure we set job to processing: true as early as possible to reduce the chance of
9398
// the same job being picked up by another worker
94-
const jobsQuery = (await req.payload.update({
95-
collection: 'payload-jobs',
96-
data: {
97-
processing: true,
98-
seenByWorker: true,
99-
},
100-
depth: req.payload.config.jobs.depth,
101-
disableTransaction: true,
102-
limit,
103-
showHiddenFields: true,
104-
where,
105-
})) as unknown as PaginatedDocs<BaseJob>
99+
const jobsQuery: {
100+
docs: BaseJob[]
101+
} = id
102+
? {
103+
docs: [
104+
(await req.payload.update({
105+
id,
106+
collection: 'payload-jobs',
107+
data: {
108+
processing: true,
109+
seenByWorker: true,
110+
},
111+
depth: req.payload.config.jobs.depth,
112+
disableTransaction: true,
113+
showHiddenFields: true,
114+
})) as BaseJob,
115+
],
116+
}
117+
: ((await req.payload.update({
118+
collection: 'payload-jobs',
119+
data: {
120+
processing: true,
121+
seenByWorker: true,
122+
},
123+
depth: req.payload.config.jobs.depth,
124+
disableTransaction: true,
125+
limit,
126+
showHiddenFields: true,
127+
where,
128+
})) as unknown as PaginatedDocs<BaseJob>)
106129

107130
/**
108131
* Just for logging purposes, we want to know how many jobs are new and how many are existing (= already been tried).

test/queues/int.spec.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,4 +925,44 @@ describe('Queues', () => {
925925
expect(allSimples.totalDocs).toBe(1)
926926
expect(allSimples.docs[0].title).toBe('externalWorkflow')
927927
})
928+
929+
it('ensure payload.jobs.runByID works and only runs the specified job', async () => {
930+
payload.config.jobs.deleteJobOnComplete = false
931+
932+
let lastJobID: string = null
933+
for (let i = 0; i < 3; i++) {
934+
const job = await payload.jobs.queue({
935+
task: 'CreateSimple',
936+
input: {
937+
message: 'from single task',
938+
},
939+
})
940+
lastJobID = job.id
941+
}
942+
943+
await payload.jobs.runByID({
944+
id: lastJobID,
945+
})
946+
947+
const allSimples = await payload.find({
948+
collection: 'simple',
949+
limit: 100,
950+
})
951+
952+
expect(allSimples.totalDocs).toBe(1)
953+
expect(allSimples.docs[0].title).toBe('from single task')
954+
955+
const allCompletedJobs = await payload.find({
956+
collection: 'payload-jobs',
957+
limit: 100,
958+
where: {
959+
completedAt: {
960+
exists: true,
961+
},
962+
},
963+
})
964+
965+
expect(allCompletedJobs.totalDocs).toBe(1)
966+
expect(allCompletedJobs.docs[0].id).toBe(lastJobID)
967+
})
928968
})

0 commit comments

Comments
 (0)