Skip to content

Commit bc42fc8

Browse files
committed
Refactor so registerAndBackfillItems can use prepared statement
1 parent 5a305b9 commit bc42fc8

File tree

1 file changed

+47
-17
lines changed

1 file changed

+47
-17
lines changed

src/cron.ts

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,15 @@ async function scheduleCronJobs(
117117
jobsAndIdentifiers: JobAndCronIdentifier[],
118118
ts: string,
119119
useNodeTime: boolean,
120+
workerSchema: string,
121+
preparedStatements: boolean,
120122
) {
121123
// TODO: refactor this to use `add_jobs`
122124

123125
// Note that `identifier` is guaranteed to be unique for every record
124126
// in `specs`.
125-
await pgPool.query(
126-
`
127+
await pgPool.query({
128+
text: `
127129
with specs as (
128130
select
129131
index,
@@ -166,26 +168,45 @@ async function scheduleCronJobs(
166168
inner join locks on (locks.identifier = specs.identifier)
167169
order by specs.index asc
168170
`,
169-
[
171+
values: [
170172
JSON.stringify(jobsAndIdentifiers),
171173
ts,
172174
useNodeTime ? new Date().toISOString() : null,
173175
],
174-
);
176+
name: !preparedStatements
177+
? undefined
178+
: `cron${useNodeTime ? "N" : ""}/${workerSchema}`,
179+
});
175180
}
176181

177182
/**
178183
* Marks any previously unknown crontab identifiers as now being known. Then
179184
* performs backfilling on any crontab tasks that need it.
180185
*/
181-
async function registerAndBackfillItems(
182-
ctx: CompiledSharedOptions,
183-
{ pgPool, events, cron }: { pgPool: Pool; events: WorkerEvents; cron: Cron },
184-
escapedWorkerSchema: string,
185-
parsedCronItems: ParsedCronItem[],
186-
startTime: Date,
187-
useNodeTime: boolean,
188-
) {
186+
async function registerAndBackfillItems(details: {
187+
ctx: CompiledSharedOptions;
188+
pgPool: Pool;
189+
events: WorkerEvents;
190+
cron: Cron;
191+
workerSchema: string;
192+
preparedStatements: boolean;
193+
escapedWorkerSchema: string;
194+
parsedCronItems: ParsedCronItem[];
195+
startTime: Date;
196+
useNodeTime: boolean;
197+
}) {
198+
const {
199+
ctx,
200+
pgPool,
201+
events,
202+
cron,
203+
workerSchema,
204+
preparedStatements,
205+
escapedWorkerSchema,
206+
parsedCronItems,
207+
startTime,
208+
useNodeTime,
209+
} = details;
189210
// First, scan the DB to get our starting point.
190211
const { rows } = await pgPool.query<KnownCrontab>(
191212
`SELECT * FROM ${escapedWorkerSchema}._private_known_crontabs as known_crontabs`,
@@ -273,6 +294,8 @@ async function registerAndBackfillItems(
273294
itemsToBackfill,
274295
ts,
275296
useNodeTime,
297+
workerSchema,
298+
preparedStatements,
276299
);
277300
}
278301

@@ -305,9 +328,10 @@ export const runCron = (
305328
const {
306329
logger,
307330
escapedWorkerSchema,
331+
workerSchema,
308332
events,
309333
resolvedPreset: {
310-
worker: { useNodeTime },
334+
worker: { useNodeTime, preparedStatements = true },
311335
},
312336
} = compiledSharedOptions;
313337

@@ -345,14 +369,18 @@ export const runCron = (
345369

346370
// We must backfill BEFORE scheduling any new jobs otherwise backfill won't
347371
// work due to known_crontabs.last_execution having been updated.
348-
await registerAndBackfillItems(
372+
await registerAndBackfillItems({
349373
ctx,
350-
{ pgPool, events, cron },
374+
pgPool,
375+
events,
376+
cron,
377+
workerSchema,
378+
preparedStatements,
351379
escapedWorkerSchema,
352380
parsedCronItems,
353-
new Date(+start),
381+
startTime: new Date(+start),
354382
useNodeTime,
355-
);
383+
});
356384

357385
events.emit("cron:started", { ctx, cron, start });
358386

@@ -466,6 +494,8 @@ export const runCron = (
466494
jobsAndIdentifiers,
467495
ts,
468496
useNodeTime,
497+
workerSchema,
498+
preparedStatements,
469499
);
470500
events.emit("cron:scheduled", {
471501
ctx,

0 commit comments

Comments
 (0)