Skip to content

Commit b94bcf8

Browse files
authored
Cron error recovery (#544)
2 parents 5a305b9 + 6421b78 commit b94bcf8

File tree

3 files changed

+109
-47
lines changed

3 files changed

+109
-47
lines changed

src/cron.ts

Lines changed: 76 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,27 @@ import {
1717
WorkerEvents,
1818
} from "./interfaces";
1919
import {
20+
calculateDelay,
2021
coerceError,
2122
CompiledOptions,
2223
CompiledSharedOptions,
2324
Releasers,
25+
RetryOptions,
26+
sleep,
2427
} from "./lib";
2528

2629
interface CronRequirements {
2730
pgPool: Pool;
2831
events: WorkerEvents;
2932
}
3033

34+
const CRON_RETRY: RetryOptions = {
35+
maxAttempts: Infinity,
36+
minDelay: 200,
37+
maxDelay: 60_000,
38+
multiplier: 1.5,
39+
};
40+
3141
/**
3242
* This function looks through all the cron items we have (e.g. from our
3343
* crontab file) and compares them to the items we already know about. If the
@@ -117,13 +127,15 @@ async function scheduleCronJobs(
117127
jobsAndIdentifiers: JobAndCronIdentifier[],
118128
ts: string,
119129
useNodeTime: boolean,
130+
workerSchema: string,
131+
preparedStatements: boolean,
120132
) {
121133
// TODO: refactor this to use `add_jobs`
122134

123135
// Note that `identifier` is guaranteed to be unique for every record
124136
// in `specs`.
125-
await pgPool.query(
126-
`
137+
await pgPool.query({
138+
text: `
127139
with specs as (
128140
select
129141
index,
@@ -166,26 +178,45 @@ async function scheduleCronJobs(
166178
inner join locks on (locks.identifier = specs.identifier)
167179
order by specs.index asc
168180
`,
169-
[
181+
values: [
170182
JSON.stringify(jobsAndIdentifiers),
171183
ts,
172184
useNodeTime ? new Date().toISOString() : null,
173185
],
174-
);
186+
name: !preparedStatements
187+
? undefined
188+
: `cron${useNodeTime ? "N" : ""}/${workerSchema}`,
189+
});
175190
}
176191

177192
/**
178193
* Marks any previously unknown crontab identifiers as now being known. Then
179194
* performs backfilling on any crontab tasks that need it.
180195
*/
181-
async function registerAndBackfillItems(
182-
ctx: CompiledSharedOptions,
183-
{ pgPool, events, cron }: { pgPool: Pool; events: WorkerEvents; cron: Cron },
184-
escapedWorkerSchema: string,
185-
parsedCronItems: ParsedCronItem[],
186-
startTime: Date,
187-
useNodeTime: boolean,
188-
) {
196+
async function registerAndBackfillItems(details: {
197+
ctx: CompiledSharedOptions;
198+
pgPool: Pool;
199+
events: WorkerEvents;
200+
cron: Cron;
201+
workerSchema: string;
202+
preparedStatements: boolean;
203+
escapedWorkerSchema: string;
204+
parsedCronItems: ParsedCronItem[];
205+
startTime: Date;
206+
useNodeTime: boolean;
207+
}) {
208+
const {
209+
ctx,
210+
pgPool,
211+
events,
212+
cron,
213+
workerSchema,
214+
preparedStatements,
215+
escapedWorkerSchema,
216+
parsedCronItems,
217+
startTime,
218+
useNodeTime,
219+
} = details;
189220
// First, scan the DB to get our starting point.
190221
const { rows } = await pgPool.query<KnownCrontab>(
191222
`SELECT * FROM ${escapedWorkerSchema}._private_known_crontabs as known_crontabs`,
@@ -273,6 +304,8 @@ async function registerAndBackfillItems(
273304
itemsToBackfill,
274305
ts,
275306
useNodeTime,
307+
workerSchema,
308+
preparedStatements,
276309
);
277310
}
278311

@@ -305,9 +338,10 @@ export const runCron = (
305338
const {
306339
logger,
307340
escapedWorkerSchema,
341+
workerSchema,
308342
events,
309343
resolvedPreset: {
310-
worker: { useNodeTime },
344+
worker: { useNodeTime, preparedStatements = true },
311345
},
312346
} = compiledSharedOptions;
313347

@@ -323,7 +357,7 @@ export const runCron = (
323357
if (!stopCalled) {
324358
stopCalled = true;
325359
if (e) {
326-
promise.reject(e);
360+
promise.reject(coerceError(e));
327361
} else {
328362
promise.resolve();
329363
}
@@ -334,6 +368,17 @@ export const runCron = (
334368
}
335369
}
336370

371+
let attempts = 0;
372+
function restartCronAfterDelay(error: Error) {
373+
++attempts;
374+
const delay = calculateDelay(attempts - 1, CRON_RETRY);
375+
logger.error(
376+
`Cron hit an error; restarting in ${delay}ms (attempt ${attempts}): ${error}`,
377+
{ error, attempts },
378+
);
379+
sleep(delay).then(cronMain).catch(restartCronAfterDelay);
380+
}
381+
337382
async function cronMain() {
338383
if (!cron._active) {
339384
return stop();
@@ -345,14 +390,18 @@ export const runCron = (
345390

346391
// We must backfill BEFORE scheduling any new jobs otherwise backfill won't
347392
// work due to known_crontabs.last_execution having been updated.
348-
await registerAndBackfillItems(
393+
await registerAndBackfillItems({
349394
ctx,
350-
{ pgPool, events, cron },
395+
pgPool,
396+
events,
397+
cron,
398+
workerSchema,
399+
preparedStatements,
351400
escapedWorkerSchema,
352401
parsedCronItems,
353-
new Date(+start),
402+
startTime: new Date(+start),
354403
useNodeTime,
355-
);
404+
});
356405

357406
events.emit("cron:started", { ctx, cron, start });
358407

@@ -383,6 +432,8 @@ export const runCron = (
383432
if (!cron._active) {
384433
return stop();
385434
}
435+
// Healthy!
436+
attempts = 0;
386437

387438
// THIS MUST COME BEFORE nextTimestamp IS MUTATED
388439
const digest = digestTimestamp(nextTimestamp);
@@ -466,6 +517,8 @@ export const runCron = (
466517
jobsAndIdentifiers,
467518
ts,
468519
useNodeTime,
520+
workerSchema,
521+
preparedStatements,
469522
);
470523
events.emit("cron:scheduled", {
471524
ctx,
@@ -486,9 +539,10 @@ export const runCron = (
486539
// timestamps on error).
487540
scheduleNextLoop();
488541
} catch (e) {
489-
// If something goes wrong; abort. The calling code should re-schedule
490-
// which will re-trigger the backfilling code.
491-
return stop(coerceError(e));
542+
// If something goes wrong; abort the current loop and restart cron
543+
// after an exponential back-off. This is essential because we need to
544+
// re-trigger the backfilling code.
545+
return restartCronAfterDelay(coerceError(e));
492546
}
493547
}
494548

@@ -510,7 +564,7 @@ export const runCron = (
510564
promise,
511565
};
512566

513-
cronMain().catch(stop);
567+
cronMain().catch(restartCronAfterDelay);
514568

515569
return cron;
516570
};

src/interfaces.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -568,9 +568,9 @@ export interface WorkerPool {
568568

569569
export interface Runner {
570570
/** Attempts to cleanly shut down the runner */
571-
stop: () => Promise<void>;
571+
stop: (reason?: string) => Promise<void>;
572572
/** Use .stop() instead, unless you know what you're doing */
573-
kill: () => Promise<void>;
573+
kill: (reason?: string) => Promise<void>;
574574
addJob: AddJobFunction;
575575
promise: Promise<void>;
576576
events: WorkerEvents;

src/runner.ts

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,10 @@ function buildRunner(input: {
164164
});
165165

166166
let running = true;
167-
const stop = async () => {
168-
compiledOptions.logger.debug("Runner stopping");
167+
const stop = async (reason: string | null, itsFine = reason === null) => {
168+
compiledOptions.logger[itsFine ? "debug" : "warn"](
169+
`Runner stopping${reason ? ` (reason: ${reason})` : ""}`,
170+
);
169171
if (running) {
170172
running = false;
171173
events.emit("stop", { ctx });
@@ -190,25 +192,19 @@ function buildRunner(input: {
190192
throw new Error("Runner is already stopped");
191193
}
192194
};
193-
const kill = async () => {
194-
if (running) {
195-
stop().catch(() => {});
196-
}
197-
if (workerPool._active) {
198-
await workerPool.forcefulShutdown(`Terminated through .kill() command`);
199-
}
200-
};
201195

202-
const wp = workerPool.promise.finally(() => {
203-
if (running) {
204-
stop();
205-
}
206-
});
207-
const cp = cron.promise.finally(() => {
208-
if (running) {
209-
stop();
210-
}
211-
});
196+
const wp = workerPool.promise
197+
.then(
198+
() => (running ? stop("worker pool exited cleanly", true) : void 0),
199+
(e) => (running ? stop(`worker pool exited with error: ${e}`) : void 0),
200+
)
201+
.catch(noop);
202+
const cp = cron.promise
203+
.then(
204+
() => (running ? stop("cron exited cleanly", true) : void 0),
205+
(e) => (running ? stop(`cron exited with error: ${e}`) : void 0),
206+
)
207+
.catch(noop);
212208

213209
const promise = Promise.all([cp, wp]).then(
214210
() => {
@@ -217,7 +213,7 @@ function buildRunner(input: {
217213
async (error) => {
218214
if (running) {
219215
logger.error(`Stopping worker due to an error: ${error}`, { error });
220-
await stop();
216+
await stop(String(error));
221217
} else {
222218
logger.error(
223219
`Error occurred, but worker is already stopping: ${error}`,
@@ -229,10 +225,22 @@ function buildRunner(input: {
229225
);
230226

231227
return {
232-
stop,
233-
kill,
228+
// It's fine - the user told us to exit
229+
stop: (reason) => stop(reason ?? null, true),
230+
async kill(reason?: string) {
231+
if (running) {
232+
stop(`runner.kill() called${reason ? `: ${reason}` : ""}`).catch(noop);
233+
}
234+
if (workerPool._active) {
235+
await workerPool.forcefulShutdown(`Terminated through .kill() command`);
236+
}
237+
},
234238
addJob,
235239
promise,
236240
events,
237241
};
238242
}
243+
244+
function noop() {
245+
/* NOOP */
246+
}

0 commit comments

Comments
 (0)