Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 8634653

Browse files
Add support for dead letter queues (#411)
* Add support for dead letter queues * fixup! Add support for dead letter queues Co-authored-by: Josh Wheeler <[email protected]>
1 parent b55ab40 commit 8634653

File tree

4 files changed

+128
-32
lines changed

4 files changed

+128
-32
lines changed

packages/queues/src/broker.ts

Lines changed: 52 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ export type QueueErrorCode = "ERR_CONSUMER_ALREADY_SET";
1818

1919
export class QueueError extends MiniflareError<QueueErrorCode> {}
2020

21-
export const MAX_ATTEMPTS = 3;
22-
const kShouldAttemptRetry = Symbol("kShouldAttemptRetry");
21+
const kGetPendingRetry = Symbol("kGetPendingRetry");
22+
const kPrepareForRetry = Symbol("kPrepareForRetry");
23+
const kGetFailedAttempts = Symbol("kGetFailedAttempts");
2324

2425
export class Message<Body = unknown> implements MessageInterface<Body> {
2526
readonly body: Body;
@@ -48,24 +49,17 @@ export class Message<Body = unknown> implements MessageInterface<Body> {
4849
this.#pendingRetry = true;
4950
}
5051

51-
[kShouldAttemptRetry](): boolean {
52-
if (!this.#pendingRetry) {
53-
return false;
54-
}
55-
52+
[kPrepareForRetry]() {
53+
this.#pendingRetry = false;
5654
this.#failedAttempts++;
57-
if (this.#failedAttempts >= MAX_ATTEMPTS) {
58-
this.#log?.warn(
59-
`Dropped message "${this.id}" after ${
60-
this.#failedAttempts
61-
} failed attempts!`
62-
);
63-
return false;
64-
}
55+
}
6556

66-
this.#log?.debug(`Retrying message "${this.id}"...`);
67-
this.#pendingRetry = false;
68-
return true;
57+
[kGetPendingRetry](): boolean {
58+
return this.#pendingRetry;
59+
}
60+
61+
[kGetFailedAttempts](): number {
62+
return this.#failedAttempts;
6963
}
7064
}
7165

@@ -96,6 +90,7 @@ enum FlushType {
9690
export const kSetFlushCallback = Symbol("kSetFlushCallback");
9791

9892
export class Queue<Body = unknown> implements QueueInterface<Body> {
93+
readonly #broker: QueueBroker;
9994
readonly #queueName: string;
10095
readonly #log?: Log;
10196

@@ -109,7 +104,8 @@ export class Queue<Body = unknown> implements QueueInterface<Body> {
109104
// A callback to run after a flush() has been executed: useful for testing.
110105
#flushCallback?: () => void;
111106

112-
constructor(queueName: string, log?: Log) {
107+
constructor(broker: QueueBroker, queueName: string, log?: Log) {
108+
this.#broker = broker;
113109
this.#queueName = queueName;
114110
this.#log = log;
115111

@@ -202,6 +198,8 @@ export class Queue<Body = unknown> implements QueueInterface<Body> {
202198
if (!this.#consumer) {
203199
return;
204200
}
201+
const maxAttempts = this.#consumer.maxRetries + 1;
202+
const deadLetterQueueName = this.#consumer.deadLetterQueue;
205203

206204
// Create a batch and execute the queue event handler
207205
const batch = new MessageBatch<Body>(this.#queueName, [...this.#messages]);
@@ -216,13 +214,42 @@ export class Queue<Body = unknown> implements QueueInterface<Body> {
216214
// Reset state and check for any messages to retry
217215
this.#pendingFlush = FlushType.NONE;
218216
this.#timeout = undefined;
219-
const messagesToRetry = batch.messages.filter((msg) =>
220-
msg[kShouldAttemptRetry]()
221-
);
222-
this.#messages.push(...messagesToRetry);
223-
if (this.#messages.length > 0) {
217+
218+
const toRetry: Message<Body>[] = [];
219+
const toDLQ: Message<Body>[] = [];
220+
batch.messages.forEach((msg) => {
221+
if (!msg[kGetPendingRetry]()) {
222+
return;
223+
}
224+
225+
msg[kPrepareForRetry]();
226+
if (msg[kGetFailedAttempts]() < maxAttempts) {
227+
this.#log?.debug(`Retrying message "${msg.id}"...`);
228+
toRetry.push(msg);
229+
} else if (deadLetterQueueName) {
230+
this.#log?.warn(
231+
`Moving message "${msg.id}" to dead letter queue "${deadLetterQueueName}"...`
232+
);
233+
toDLQ.push(msg);
234+
} else {
235+
this.#log?.warn(
236+
`Dropped message "${msg.id}" after ${maxAttempts} failed attempts!`
237+
);
238+
}
239+
});
240+
241+
if (toRetry.length) {
242+
this.#messages.push(...toRetry);
224243
this.#ensurePendingFlush();
225244
}
245+
246+
if (deadLetterQueueName) {
247+
const deadLetterQueue =
248+
this.#broker.getOrCreateQueue(deadLetterQueueName);
249+
toDLQ.forEach((msg) => {
250+
deadLetterQueue.send(msg.body);
251+
});
252+
}
226253
}
227254

228255
[kSetFlushCallback](callback: () => void) {
@@ -242,7 +269,7 @@ export class QueueBroker implements QueueBrokerInterface {
242269
getOrCreateQueue(name: string): Queue {
243270
let queue = this.#queues.get(name);
244271
if (queue === undefined) {
245-
this.#queues.set(name, (queue = new Queue(name, this.#log)));
272+
this.#queues.set(name, (queue = new Queue(this, name, this.#log)));
246273
}
247274
return queue;
248275
}

packages/queues/src/plugin.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111

1212
export const DEFAULT_BATCH_SIZE = 5;
1313
export const DEFAULT_WAIT_MS = 1000;
14+
export const DEFAULT_RETRIES = 2;
1415

1516
export interface BindingOptions {
1617
name: string;
@@ -106,6 +107,8 @@ export class QueuesPlugin
106107
queueName: opts.queueName,
107108
maxBatchSize: opts.maxBatchSize ?? DEFAULT_BATCH_SIZE,
108109
maxWaitMs: opts.maxWaitMs ?? DEFAULT_WAIT_MS,
110+
maxRetries: opts.maxRetries ?? DEFAULT_RETRIES,
111+
deadLetterQueue: opts.deadLetterQueue,
109112
dispatcher: this.ctx.queueEventDispatcher,
110113
};
111114

packages/queues/test/broker.spec.ts

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
import {
2-
MAX_ATTEMPTS,
3-
QueueBroker,
4-
kSetFlushCallback,
5-
} from "@miniflare/queues";
1+
import { QueueBroker, kSetFlushCallback } from "@miniflare/queues";
62
import {
73
Consumer,
84
LogLevel,
@@ -19,6 +15,7 @@ test("QueueBroker: flushes partial batches", async (t) => {
1915
queueName: "myQueue",
2016
maxBatchSize: 5,
2117
maxWaitMs: 1,
18+
maxRetries: 2,
2219
dispatcher: async (_batch) => {},
2320
};
2421
q[kSetConsumer](sub);
@@ -109,6 +106,7 @@ test("QueueBroker: flushes full batches", async (t) => {
109106
queueName: "myQueue",
110107
maxBatchSize: 5,
111108
maxWaitMs: 1,
109+
maxRetries: 2,
112110
dispatcher: async (_batch) => {},
113111
};
114112
q[kSetConsumer](sub);
@@ -193,6 +191,7 @@ test("QueueBroker: supports message retry()", async (t) => {
193191
queueName: "myQueue",
194192
maxBatchSize: 5,
195193
maxWaitMs: 1,
194+
maxRetries: 2,
196195
dispatcher: async (_batch) => {},
197196
};
198197
q[kSetConsumer](sub);
@@ -241,6 +240,7 @@ test("QueueBroker: automatic retryAll() on consumer error", async (t) => {
241240
queueName: "myQueue",
242241
maxBatchSize: 5,
243242
maxWaitMs: 1,
243+
maxRetries: 2,
244244
dispatcher: async (_batch) => {},
245245
};
246246
q[kSetConsumer](sub);
@@ -299,6 +299,7 @@ test("QueueBroker: drops messages after max retry()", async (t) => {
299299
queueName: "myQueue",
300300
maxBatchSize: 5,
301301
maxWaitMs: 1,
302+
maxRetries: 4,
302303
dispatcher: async (_batch) => {},
303304
};
304305
q[kSetConsumer](sub);
@@ -312,7 +313,7 @@ test("QueueBroker: drops messages after max retry()", async (t) => {
312313
// Expect the queue to flush() the maximum number of times
313314
q.send("message1");
314315

315-
for (let i = 0; i < MAX_ATTEMPTS; i++) {
316+
for (let i = 0; i < 5; i++) {
316317
const prom = new Promise<void>((resolve) => {
317318
q[kSetFlushCallback](() => resolve());
318319
});
@@ -323,7 +324,7 @@ test("QueueBroker: drops messages after max retry()", async (t) => {
323324
// Check last log message is warning that message dropped
324325
t.deepEqual(log.logs[log.logs.length - 1], [
325326
LogLevel.WARN,
326-
'Dropped message "myQueue-0" after 3 failed attempts!',
327+
'Dropped message "myQueue-0" after 5 failed attempts!',
327328
]);
328329

329330
// To check that "message1" is dropped:
@@ -338,3 +339,66 @@ test("QueueBroker: drops messages after max retry()", async (t) => {
338339
});
339340
await prom;
340341
});
342+
343+
test("QueueBroker: dead letter queue support", async (t) => {
344+
const log = new TestLog();
345+
log.error = (message) =>
346+
log.logWithLevel(LogLevel.ERROR, message?.stack ?? "");
347+
348+
const broker = new QueueBroker(log);
349+
350+
// Setup the original queue
351+
const q = broker.getOrCreateQueue("myQueue");
352+
const originalConsumer: Consumer = {
353+
queueName: "myQueue",
354+
maxBatchSize: 5,
355+
maxWaitMs: 1,
356+
maxRetries: 1,
357+
deadLetterQueue: "myDLQ",
358+
dispatcher: async (_batch) => {},
359+
};
360+
q[kSetConsumer](originalConsumer);
361+
362+
const dlq = broker.getOrCreateQueue("myDLQ");
363+
const dlqConsumer: Consumer = {
364+
queueName: "myDLQ",
365+
maxBatchSize: 5,
366+
maxWaitMs: 1,
367+
maxRetries: 0,
368+
dispatcher: async (_batch) => {},
369+
};
370+
dlq[kSetConsumer](dlqConsumer);
371+
372+
// Set up the consumer for the original queue
373+
let originalInvocations = 0;
374+
originalConsumer.dispatcher = async (batch: MessageBatch) => {
375+
batch.messages[0].retry();
376+
originalInvocations++;
377+
};
378+
379+
// Set up the consumer for the dead letter queue
380+
let dlqInvocations = 0;
381+
dlqConsumer.dispatcher = async (_batch: MessageBatch) => {
382+
dlqInvocations++;
383+
};
384+
385+
const originalQProm = new Promise<void>((resolve) => {
386+
q[kSetFlushCallback](() => resolve());
387+
});
388+
q.send("message1");
389+
await originalQProm;
390+
391+
const dlqProm = new Promise<void>((resolve) => {
392+
dlq[kSetFlushCallback](() => resolve());
393+
});
394+
await dlqProm;
395+
396+
t.deepEqual(originalInvocations, 2);
397+
t.deepEqual(dlqInvocations, 1);
398+
399+
// Check last log message is warning that message dropped
400+
t.deepEqual(log.logs[log.logs.length - 1], [
401+
LogLevel.WARN,
402+
'Moving message "myQueue-0" to dead letter queue "myDLQ"...',
403+
]);
404+
});

packages/shared/src/queues.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ export interface Consumer {
1515
queueName: string;
1616
maxBatchSize: number;
1717
maxWaitMs: number;
18+
maxRetries: number;
19+
deadLetterQueue?: string;
1820
dispatcher: QueueEventDispatcher;
1921
}
2022

0 commit comments

Comments
 (0)