Skip to content

Commit ad2564e

Browse files
authored
fix: ensure scheduling by default only handles default queue, add allQueues config to autoRun (#13395)
By default, `payload.jobs.run` only runs jobs from the `default` queue (since #12799). It exposes an `allQueues` property to run jobs from all queues. For handling schedules (`payload.jobs.handleSchedules` and `config.jobs.autoRun`), this behaves differently - jobs are run from all queues by default, and no `allQueues` property exists. This PR adds an `allQueues` property to scheduling, as well as changes the default behavior to only handle schedules for the `default` queue. That way, the behavior of running and scheduling jobs matches. --- - To see the specific tasks where the Asana app for GitHub is being used, see below: - https://app.asana.com/0/0/1210982048221260
1 parent 995f96b commit ad2564e

File tree

9 files changed

+93
-15
lines changed

9 files changed

+93
-15
lines changed

packages/payload/src/fields/baseFields/baseIDField.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export const baseIDField: TextField = {
1414
defaultValue: () => new ObjectId().toHexString(),
1515
hooks: {
1616
beforeChange: [({ value }) => value || new ObjectId().toHexString()],
17+
// ID field values for arrays and blocks need to be unique when duplicating, as on postgres they are stored on the same table as primary keys.
1718
beforeDuplicate: [() => new ObjectId().toHexString()],
1819
},
1920
label: 'ID',

packages/payload/src/fields/hooks/beforeDuplicate/promise.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ export const promise = async <T>({
6363
let fieldData = siblingDoc?.[field.name!]
6464
const fieldIsLocalized = localization && fieldShouldBeLocalized({ field, parentIsLocalized })
6565

66-
// Run field beforeDuplicate hooks
66+
// Run field beforeDuplicate hooks.
67+
// These hooks are responsible for resetting the `id` field values of array and block rows. See `baseIDField`.
6768
if (Array.isArray(field.hooks?.beforeDuplicate)) {
6869
if (fieldIsLocalized) {
6970
const localeData: JsonObject = {}

packages/payload/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,7 @@ export class BasePayload {
873873
this.config.jobs.scheduling
874874
) {
875875
await this.jobs.handleSchedules({
876+
allQueues: cronConfig.allQueues,
876877
queue: cronConfig.queue,
877878
})
878879
}
@@ -891,6 +892,7 @@ export class BasePayload {
891892
}
892893

893894
await this.jobs.run({
895+
allQueues: cronConfig.allQueues,
894896
limit: cronConfig.limit ?? DEFAULT_LIMIT,
895897
queue: cronConfig.queue,
896898
silent: cronConfig.silent,

packages/payload/src/queues/config/types/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ import type { TaskConfig } from './taskTypes.js'
77
import type { WorkflowConfig } from './workflowTypes.js'
88

99
export type AutorunCronConfig = {
10+
/**
11+
* If you want to autoRUn jobs from all queues, set this to true.
12+
* If you set this to true, the `queue` property will be ignored.
13+
*
14+
* @default false
15+
*/
16+
allQueues?: boolean
1017
/**
1118
* The cron schedule for the job.
1219
* @default '* * * * *' (every minute).
@@ -43,6 +50,8 @@ export type AutorunCronConfig = {
4350
limit?: number
4451
/**
4552
* The queue name for the job.
53+
*
54+
* @default 'default'
4655
*/
4756
queue?: string
4857
/**

packages/payload/src/queues/endpoints/handleSchedules.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,18 @@ export const handleSchedulesJobsEndpoint: Endpoint = {
4545
)
4646
}
4747

48-
const { queue } = req.query as {
48+
const { allQueues, queue } = req.query as {
49+
allQueues?: 'false' | 'true'
4950
queue?: string
5051
}
5152

52-
const { errored, queued, skipped } = await handleSchedules({ queue, req })
53+
const runAllQueues = allQueues && !(typeof allQueues === 'string' && allQueues === 'false')
54+
55+
const { errored, queued, skipped } = await handleSchedules({
56+
allQueues: runAllQueues,
57+
queue,
58+
req,
59+
})
5360

5461
return Response.json(
5562
{

packages/payload/src/queues/endpoints/run.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export const runJobsEndpoint: Endpoint = {
5656

5757
if (shouldHandleSchedules && jobsConfig.scheduling) {
5858
// If should handle schedules and schedules are defined
59-
await req.payload.jobs.handleSchedules({ queue: runAllQueues ? undefined : queue, req })
59+
await req.payload.jobs.handleSchedules({ allQueues: runAllQueues, queue, req })
6060
}
6161

6262
const runJobsArgs: RunJobsArgs = {

packages/payload/src/queues/localAPI.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,28 @@ export type RunJobsSilent =
2222
| boolean
2323
export const getJobsLocalAPI = (payload: Payload) => ({
2424
handleSchedules: async (args?: {
25+
/**
26+
* If you want to schedule jobs from all queues, set this to true.
27+
* If you set this to true, the `queue` property will be ignored.
28+
*
29+
* @default false
30+
*/
31+
allQueues?: boolean
2532
// By default, schedule all queues - only scheduling jobs scheduled to be added to the `default` queue would not make sense
2633
// here, as you'd usually specify a different queue than `default` here, especially if this is used in combination with autorun.
2734
// The `queue` property for setting up schedules is required, and not optional.
2835
/**
2936
* If you want to only schedule jobs that are set to schedule in a specific queue, set this to the queue name.
3037
*
31-
* @default all jobs for all queues will be scheduled.
38+
* @default jobs from the `default` queue will be executed.
3239
*/
3340
queue?: string
3441
req?: PayloadRequest
3542
}): Promise<HandleSchedulesResult> => {
3643
const newReq: PayloadRequest = args?.req ?? (await createLocalReq({}, payload))
3744

3845
return await handleSchedules({
46+
allQueues: args?.allQueues,
3947
queue: args?.queue,
4048
req: newReq,
4149
})

packages/payload/src/queues/operations/handleSchedules/index.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,26 @@ export type HandleSchedulesResult = {
2323
* after they are scheduled
2424
*/
2525
export async function handleSchedules({
26-
queue,
26+
allQueues = false,
27+
queue: _queue,
2728
req,
2829
}: {
30+
/**
31+
* If you want to schedule jobs from all queues, set this to true.
32+
* If you set this to true, the `queue` property will be ignored.
33+
*
34+
* @default false
35+
*/
36+
allQueues?: boolean
2937
/**
3038
* If you want to only schedule jobs that are set to schedule in a specific queue, set this to the queue name.
3139
*
32-
* @default all jobs for all queues will be scheduled.
40+
* @default jobs from the `default` queue will be executed.
3341
*/
3442
queue?: string
3543
req: PayloadRequest
3644
}): Promise<HandleSchedulesResult> {
45+
const queue = _queue ?? 'default'
3746
const jobsConfig = req.payload.config.jobs
3847
const queuesWithSchedules = getQueuesWithSchedules({
3948
jobsConfig,
@@ -53,7 +62,7 @@ export async function handleSchedules({
5362
// Need to know when that particular job was last scheduled in that particular queue
5463

5564
for (const [queueName, { schedules }] of Object.entries(queuesWithSchedules)) {
56-
if (queue && queueName !== queue) {
65+
if (!allQueues && queueName !== queue) {
5766
// If a queue is specified, only schedule jobs for that queue
5867
continue
5968
}

test/queues/schedules.int.spec.ts

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,28 @@ describe('Queues - scheduling, without automatic scheduling handling', () => {
6969

7070
it('can auto-schedule through local API and autorun jobs', async () => {
7171
// Do not call payload.jobs.queue() - the `EverySecond` task should be scheduled here
72-
await payload.jobs.handleSchedules()
72+
await payload.jobs.handleSchedules({ queue: 'autorunSecond' })
73+
74+
// Do not call payload.jobs.run{silent: true})
75+
76+
await waitUntilAutorunIsDone({
77+
payload,
78+
queue: 'autorunSecond',
79+
onlyScheduled: true,
80+
})
81+
82+
const allSimples = await payload.find({
83+
collection: 'simple',
84+
limit: 100,
85+
})
86+
87+
expect(allSimples.totalDocs).toBe(1)
88+
expect(allSimples?.docs?.[0]?.title).toBe('This task runs every second')
89+
})
90+
91+
it('can auto-schedule through local API and autorun jobs when passing allQueues', async () => {
92+
// Do not call payload.jobs.queue() - the `EverySecond` task should be scheduled here
93+
await payload.jobs.handleSchedules({ queue: 'autorunSecond', allQueues: true })
7394

7495
// Do not call payload.jobs.run{silent: true})
7596

@@ -88,9 +109,29 @@ describe('Queues - scheduling, without automatic scheduling handling', () => {
88109
expect(allSimples?.docs?.[0]?.title).toBe('This task runs every second')
89110
})
90111

112+
it('should not auto-schedule through local API and autorun jobs when not passing queue and schedule is not set on the default queue', async () => {
113+
// Do not call payload.jobs.queue() - the `EverySecond` task should be scheduled here
114+
await payload.jobs.handleSchedules()
115+
116+
// Do not call payload.jobs.run{silent: true})
117+
118+
await waitUntilAutorunIsDone({
119+
payload,
120+
queue: 'autorunSecond',
121+
onlyScheduled: true,
122+
})
123+
124+
const allSimples = await payload.find({
125+
collection: 'simple',
126+
limit: 100,
127+
})
128+
129+
expect(allSimples.totalDocs).toBe(0)
130+
})
131+
91132
it('can auto-schedule through handleSchedules REST API and autorun jobs', async () => {
92133
// Do not call payload.jobs.queue() - the `EverySecond` task should be scheduled here
93-
await restClient.GET('/payload-jobs/handle-schedules', {
134+
await restClient.GET('/payload-jobs/handle-schedules?queue=autorunSecond', {
94135
headers: {
95136
Authorization: `JWT ${token}`,
96137
},
@@ -115,7 +156,7 @@ describe('Queues - scheduling, without automatic scheduling handling', () => {
115156

116157
it('can auto-schedule through run REST API and autorun jobs', async () => {
117158
// Do not call payload.jobs.queue() - the `EverySecond` task should be scheduled here
118-
await restClient.GET('/payload-jobs/run?silent=true', {
159+
await restClient.GET('/payload-jobs/run?silent=true&allQueues=true', {
119160
headers: {
120161
Authorization: `JWT ${token}`,
121162
},
@@ -161,7 +202,7 @@ describe('Queues - scheduling, without automatic scheduling handling', () => {
161202
it('ensure scheduler does not schedule more jobs than needed if executed sequentially', async () => {
162203
await withoutAutoRun(async () => {
163204
for (let i = 0; i < 3; i++) {
164-
await payload.jobs.handleSchedules()
205+
await payload.jobs.handleSchedules({ allQueues: true })
165206
}
166207
})
167208

@@ -192,7 +233,7 @@ describe('Queues - scheduling, without automatic scheduling handling', () => {
192233
})
193234
}
194235
for (let i = 0; i < 3; i++) {
195-
await payload.jobs.handleSchedules()
236+
await payload.jobs.handleSchedules({ allQueues: true })
196237
}
197238
})
198239

@@ -271,8 +312,8 @@ describe('Queues - scheduling, without automatic scheduling handling', () => {
271312
for (let i = 0; i < 3; i++) {
272313
await withoutAutoRun(async () => {
273314
// Call it twice to test that it only schedules one
274-
await payload.jobs.handleSchedules()
275-
await payload.jobs.handleSchedules()
315+
await payload.jobs.handleSchedules({ allQueues: true })
316+
await payload.jobs.handleSchedules({ allQueues: true })
276317
})
277318
// Advance time to satisfy the waitUntil of newly scheduled jobs
278319
timeTravel(20)

0 commit comments

Comments
 (0)