Skip to content

Commit 3e911df

Browse files
committed
Ensure cron jobs get rescheduled even if the handler throws an error
1 parent 99c4a70 commit 3e911df

File tree

2 files changed

+72
-1
lines changed

2 files changed

+72
-1
lines changed

packages/redis-worker/src/cron.test.ts

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ describe("Worker with cron", () => {
5252

5353
await setTimeout(6_000);
5454

55-
expect(processedItems.length).toBe(2);
55+
expect(processedItems.length).toBeGreaterThanOrEqual(2);
5656

5757
const secondItem = processedItems[1];
5858
expect(secondItem?.timestamp).toBeGreaterThan(firstItem!.timestamp);
@@ -62,4 +62,69 @@ describe("Worker with cron", () => {
6262
await worker.stop();
6363
}
6464
);
65+
66+
redisTest(
67+
"continues processing cron items even when job handler throws errors",
68+
{ timeout: 180_000 },
69+
async ({ redisContainer }) => {
70+
const processedItems: CronSchema[] = [];
71+
let executionCount = 0;
72+
73+
const worker = new Worker({
74+
name: "test-worker-error",
75+
redisOptions: {
76+
host: redisContainer.getHost(),
77+
port: redisContainer.getPort(),
78+
password: redisContainer.getPassword(),
79+
},
80+
catalog: {
81+
cronJob: {
82+
cron: "*/3 * * * * *", // Every 3 seconds
83+
schema: CronSchema,
84+
visibilityTimeoutMs: 5000,
85+
retry: { maxAttempts: 1 }, // Only try once to fail faster
86+
jitter: 100,
87+
},
88+
},
89+
jobs: {
90+
cronJob: async ({ payload }) => {
91+
executionCount++;
92+
await setTimeout(30); // Simulate work
93+
94+
// Throw error on first and third execution
95+
if (executionCount === 1 || executionCount === 3) {
96+
throw new Error(`Simulated error on execution ${executionCount}`);
97+
}
98+
99+
processedItems.push(payload);
100+
},
101+
},
102+
concurrency: {
103+
workers: 2,
104+
tasksPerWorker: 3,
105+
},
106+
logger: new Logger("test", "debug"),
107+
}).start();
108+
109+
// Wait long enough for 4 executions (12 seconds + buffer)
110+
await setTimeout(14_000);
111+
112+
// Should have at least 4 executions total
113+
expect(executionCount).toBeGreaterThanOrEqual(4);
114+
115+
// Should have 2 successful items (executions 2 and 4)
116+
expect(processedItems.length).toBeGreaterThanOrEqual(2);
117+
118+
// Verify that some executions failed (execution count > successful count)
119+
// This proves that errors occurred but cron scheduling continued
120+
expect(executionCount).toBeGreaterThan(processedItems.length);
121+
122+
// Verify that successful executions still have correct structure
123+
const firstSuccessful = processedItems[0];
124+
expect(firstSuccessful?.timestamp).toBeGreaterThan(0);
125+
expect(firstSuccessful?.cron).toBe("*/3 * * * * *");
126+
127+
await worker.stop();
128+
}
129+
);
65130
});

packages/redis-worker/src/worker.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,13 @@ class Worker<TCatalog extends WorkerCatalog> {
590590
attempt: newAttempt,
591591
errorMessage,
592592
});
593+
593594
await this.queue.moveToDeadLetterQueue(id, errorMessage);
595+
596+
if (catalogItem.cron) {
597+
await this.rescheduleCronJob(job, catalogItem, item);
598+
}
599+
594600
return;
595601
}
596602

0 commit comments

Comments
 (0)