Skip to content

Commit 90ea860

Browse files
authored
Use safeEmit() helper (#557)
2 parents 7c4cc12 + c9dd17d commit 90ea860

File tree

7 files changed

+112
-94
lines changed

7 files changed

+112
-94
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ Read more:
4646
- `backfillPeriod` is now marked as optional in TypeScript (defaults to 0).
4747
- Support for loading tasks from nested folders in crontab.
4848
- (`* * * * * nested/folder/task ?jobKey=my_key&jobKeyMode=preserve_run_at`)
49+
- Most of our event emitters now trap errors and output a log if such error were
50+
to occur - useful for debugging.
4951

5052
## v0.16.6
5153

src/cron.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
CompiledSharedOptions,
2424
Releasers,
2525
RetryOptions,
26+
safeEmit,
2627
sleep,
2728
} from "./lib";
2829

@@ -208,7 +209,6 @@ async function registerAndBackfillItems(details: {
208209
const {
209210
ctx,
210211
pgPool,
211-
events,
212212
cron,
213213
workerSchema,
214214
preparedStatements,
@@ -292,7 +292,7 @@ async function registerAndBackfillItems(details: {
292292
// the way the last_execution column of the known_crontabs table works.
293293
// At this time it's not expected that backfilling will be sufficiently
294294
// expensive to justify optimising this further.
295-
events.emit("cron:backfill", {
295+
safeEmit(ctx, "cron:backfill", {
296296
ctx,
297297
cron,
298298
itemsToBackfill,
@@ -386,7 +386,7 @@ export const runCron = (
386386

387387
const start = new Date();
388388
const ctx = compiledSharedOptions;
389-
events.emit("cron:starting", { ctx, cron, start });
389+
safeEmit(ctx, "cron:starting", { ctx, cron, start });
390390

391391
// We must backfill BEFORE scheduling any new jobs otherwise backfill won't
392392
// work due to known_crontabs.last_execution having been updated.
@@ -403,7 +403,7 @@ export const runCron = (
403403
useNodeTime,
404404
});
405405

406-
events.emit("cron:started", { ctx, cron, start });
406+
safeEmit(ctx, "cron:started", { ctx, cron, start });
407407

408408
if (!cron._active) {
409409
return stop();
@@ -465,7 +465,7 @@ export const runCron = (
465465
currentTimestamp,
466466
},
467467
);
468-
events.emit("cron:prematureTimer", {
468+
safeEmit(ctx, "cron:prematureTimer", {
469469
ctx,
470470
cron,
471471
currentTimestamp,
@@ -482,7 +482,7 @@ export const runCron = (
482482
((currentTimestamp - expectedTimestamp) % ONE_MINUTE) / 1000,
483483
)}s behind)`,
484484
);
485-
events.emit("cron:overdueTimer", {
485+
safeEmit(ctx, "cron:overdueTimer", {
486486
ctx,
487487
cron,
488488
currentTimestamp,
@@ -505,7 +505,7 @@ export const runCron = (
505505

506506
// Finally actually run the jobs.
507507
if (jobsAndIdentifiers.length) {
508-
events.emit("cron:schedule", {
508+
safeEmit(ctx, "cron:schedule", {
509509
ctx,
510510
cron,
511511
timestamp: expectedTimestamp,
@@ -520,7 +520,7 @@ export const runCron = (
520520
workerSchema,
521521
preparedStatements,
522522
);
523-
events.emit("cron:scheduled", {
523+
safeEmit(ctx, "cron:scheduled", {
524524
ctx,
525525
cron,
526526
timestamp: expectedTimestamp,

src/lib.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
RunOnceOptions,
2222
SharedOptions,
2323
WithPgClient,
24+
WorkerEventMap,
2425
WorkerOptions,
2526
WorkerPluginBaseContext,
2627
WorkerPluginContext,
@@ -632,3 +633,22 @@ export function calculateDelay(
632633
(0.5 + jitter)
633634
);
634635
}
636+
637+
export function safeEmit<TEvent extends keyof WorkerEventMap>(
638+
ctx: WorkerPluginContext,
639+
eventName: TEvent,
640+
payload: WorkerEventMap[TEvent],
641+
) {
642+
const { events, logger } = ctx;
643+
try {
644+
events.emit(eventName, payload);
645+
} catch (e) {
646+
const error = coerceError(e);
647+
logger.error(
648+
`Emitting ${eventName} raised an error - this should never happen: ${
649+
error.stack ?? error
650+
}`,
651+
{ error, eventName, payload },
652+
);
653+
}
654+
}

src/localQueue.ts

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
coerceError,
1616
RETRYABLE_ERROR_CODES,
1717
RetryOptions,
18+
safeEmit,
1819
sleep,
1920
} from "./lib";
2021
import { batchGetJobs } from "./sql/getJobs";
@@ -231,7 +232,7 @@ export class LocalQueue {
231232
private refetchDelayAbortThreshold: number = Infinity;
232233

233234
constructor(
234-
private readonly compiledSharedOptions: CompiledSharedOptions<WorkerPoolOptions>,
235+
private readonly ctx: CompiledSharedOptions<WorkerPoolOptions>,
235236
private readonly tasks: TaskList,
236237
private readonly withPgClient: EnhancedWithPgClient,
237238
public readonly workerPool: WorkerPool,
@@ -244,13 +245,10 @@ export class LocalQueue {
244245
private readonly continuous: boolean,
245246
private readonly onMajorError: (e: unknown) => void,
246247
) {
247-
this.ttl =
248-
compiledSharedOptions.resolvedPreset.worker.localQueue?.ttl ?? 5 * MINUTE;
249-
this.pollInterval =
250-
compiledSharedOptions.resolvedPreset.worker.pollInterval ?? 2 * SECOND;
248+
this.ttl = ctx.resolvedPreset.worker.localQueue?.ttl ?? 5 * MINUTE;
249+
this.pollInterval = ctx.resolvedPreset.worker.pollInterval ?? 2 * SECOND;
251250
const localQueueRefetchDelayDuration =
252-
compiledSharedOptions.resolvedPreset.worker.localQueue?.refetchDelay
253-
?.durationMs;
251+
ctx.resolvedPreset.worker.localQueue?.refetchDelay?.durationMs;
254252
if (
255253
localQueueRefetchDelayDuration != null &&
256254
localQueueRefetchDelayDuration > this.pollInterval
@@ -259,8 +257,8 @@ export class LocalQueue {
259257
`Invalid configuration; 'preset.worker.localQueue.refetchDelay.durationMs' (${localQueueRefetchDelayDuration}) must not be larger than 'preset.worker.pollInterval' (${this.pollInterval})`,
260258
);
261259
}
262-
compiledSharedOptions.events.emit("localQueue:init", {
263-
ctx: compiledSharedOptions,
260+
safeEmit(ctx, "localQueue:init", {
261+
ctx: ctx,
264262
localQueue: this,
265263
});
266264
// Immediately enter polling mode.
@@ -276,8 +274,8 @@ export class LocalQueue {
276274
const oldMode = this.mode;
277275
// Override the 'readonly'
278276
(this.mode as LocalQueueMode) = newMode;
279-
this.compiledSharedOptions.events.emit("localQueue:setMode", {
280-
ctx: this.compiledSharedOptions,
277+
safeEmit(this.ctx, "localQueue:setMode", {
278+
ctx: this.ctx,
281279
localQueue: this,
282280
oldMode,
283281
newMode,
@@ -321,7 +319,7 @@ export class LocalQueue {
321319
} else {
322320
// If we're not shutting down, view this as a temporary error (but give
323321
// Benjie a wrist slap anyway).
324-
this.compiledSharedOptions.logger.error(
322+
this.ctx.logger.error(
325323
`GraphileWorkerInternalError<cd483429-3372-42f0-bcf6-c78e045c760d>: Backgrounding should never yield errors when the queue is not RELEASED`,
326324
{ error: e },
327325
);
@@ -444,8 +442,8 @@ export class LocalQueue {
444442
}
445443
const jobsToReturn = this.jobQueue.splice(0, l);
446444

447-
this.compiledSharedOptions.events.emit("localQueue:returnJobs", {
448-
ctx: this.compiledSharedOptions,
445+
safeEmit(this.ctx, "localQueue:returnJobs", {
446+
ctx: this.ctx,
449447
localQueue: this,
450448
jobs: jobsToReturn,
451449
});
@@ -459,7 +457,7 @@ export class LocalQueue {
459457
initialError = lastError;
460458
}
461459

462-
this.compiledSharedOptions.logger.error(
460+
this.ctx.logger.error(
463461
`Failed to return jobs from local queue to database queue (attempt ${attempts}/${maxAttempts})`,
464462
{
465463
error: e,
@@ -502,7 +500,7 @@ export class LocalQueue {
502500
++attempts;
503501
return sleep(delay).then(() =>
504502
returnJobs(
505-
this.compiledSharedOptions,
503+
this.ctx,
506504
this.withPgClient, // We'll handle the retries via onError
507505
this.workerPool.id,
508506
jobsToReturn,
@@ -517,7 +515,7 @@ export class LocalQueue {
517515
// `onError` above, since `onError` returns the next promise each time.
518516
this.background(
519517
returnJobs(
520-
this.compiledSharedOptions,
518+
this.ctx,
521519
this.withPgClient, // We'll handle the retries via onError
522520
this.workerPool.id,
523521
jobsToReturn,
@@ -554,7 +552,7 @@ export class LocalQueue {
554552
this.background(
555553
this._fetch().catch((e) => {
556554
// This should not happen
557-
this.compiledSharedOptions.logger.error(`Error occurred during fetch`, {
555+
this.ctx.logger.error(`Error occurred during fetch`, {
558556
error: e,
559557
});
560558
}),
@@ -578,7 +576,7 @@ export class LocalQueue {
578576
/** How many jobs did we fetch? (Initialize to zero in case of error.) */
579577
let jobCount = 0;
580578
const refetchDelayOptions =
581-
this.compiledSharedOptions.resolvedPreset.worker.localQueue?.refetchDelay;
579+
this.ctx.resolvedPreset.worker.localQueue?.refetchDelay;
582580
try {
583581
assert.equal(this.mode, POLLING, "Can only fetch when in polling mode");
584582
assert.equal(
@@ -605,16 +603,16 @@ export class LocalQueue {
605603

606604
// The ONLY await in this function.
607605
const jobs = await batchGetJobs(
608-
this.compiledSharedOptions,
606+
this.ctx,
609607
this.withPgClient,
610608
this.tasks,
611609
this.workerPool.id,
612610
null, // `flagsToSkip` is not set, see `LocalQueue.getJob`
613611
this.getJobBatchSize,
614612
);
615613

616-
this.compiledSharedOptions.events.emit("localQueue:getJobs:complete", {
617-
ctx: this.compiledSharedOptions,
614+
safeEmit(this.ctx, "localQueue:getJobs:complete", {
615+
ctx: this.ctx,
618616
localQueue: this,
619617
jobs,
620618
});
@@ -634,7 +632,7 @@ export class LocalQueue {
634632
this.receivedJobs(jobs);
635633
} catch (e) {
636634
// Error happened; rely on poll interval.
637-
this.compiledSharedOptions.logger.error(
635+
this.ctx.logger.error(
638636
`Error occurred fetching jobs; will try again on next poll interval. Error: ${e}`,
639637
{ error: e },
640638
);
@@ -674,8 +672,8 @@ export class LocalQueue {
674672
this.refetchDelayCompleteOrAbort,
675673
refetchDelayMs,
676674
);
677-
this.compiledSharedOptions.events.emit("localQueue:refetchDelay:start", {
678-
ctx: this.compiledSharedOptions,
675+
safeEmit(this.ctx, "localQueue:refetchDelay:start", {
676+
ctx: this.ctx,
679677
localQueue: this,
680678
jobCount,
681679
threshold: refetchDelayOptions?.threshold ?? 0,
@@ -720,20 +718,17 @@ export class LocalQueue {
720718
// Force refetch because we've been notified of so many jobs!
721719
this.refetchDelayFetchOnComplete = true;
722720

723-
this.compiledSharedOptions.events.emit("localQueue:refetchDelay:abort", {
724-
ctx: this.compiledSharedOptions,
721+
safeEmit(this.ctx, "localQueue:refetchDelay:abort", {
722+
ctx: this.ctx,
725723
localQueue: this,
726724
count: this.refetchDelayCounter,
727725
abortThreshold: this.refetchDelayAbortThreshold,
728726
});
729727
} else {
730-
this.compiledSharedOptions.events.emit(
731-
"localQueue:refetchDelay:expired",
732-
{
733-
ctx: this.compiledSharedOptions,
734-
localQueue: this,
735-
},
736-
);
728+
safeEmit(this.ctx, "localQueue:refetchDelay:expired", {
729+
ctx: this.ctx,
730+
localQueue: this,
731+
});
737732
}
738733

739734
if (this.mode === POLLING && this.refetchDelayFetchOnComplete) {
@@ -789,7 +784,7 @@ export class LocalQueue {
789784
if (flagsToSkip !== null) {
790785
// PERF: we could actually batch for similar flags, I guess.
791786
const jobsPromise = batchGetJobs(
792-
this.compiledSharedOptions,
787+
this.ctx,
793788
this.withPgClient,
794789
this.tasks,
795790
this.workerPool.id,

0 commit comments

Comments
 (0)