Skip to content

Commit 54b05aa

Browse files
author
magne
committed
feat: add discard and requeue with annotations
1 parent 027d664 commit 54b05aa

File tree

2 files changed

+37
-13
lines changed

2 files changed

+37
-13
lines changed

src/delivery_context.ts

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import { Delivery, Receiver } from "rhea"
1+
import { Delivery, MessageAnnotations, Receiver } from "rhea"
22

33
export interface DeliveryContext {
44
accept(): void
5-
discard(): void
6-
requeue(): void
5+
discard(annotations?: MessageAnnotations): void
6+
requeue(annotations?: MessageAnnotations): void
77
}
88

99
export class AmqpDeliveryContext implements DeliveryContext {
@@ -18,15 +18,39 @@ export class AmqpDeliveryContext implements DeliveryContext {
1818
this.delivery.accept()
1919
}
2020

21-
discard(): void {
21+
discard(annotations?: MessageAnnotations): void {
2222
if (this.receiverLink.is_closed()) throw new Error("Receiver link is closed")
23+
if (!annotations) {
24+
this.delivery.reject()
25+
return
26+
}
2327

24-
this.delivery.reject()
28+
this.discardWithAnnotations(annotations)
2529
}
2630

27-
requeue(): void {
31+
private discardWithAnnotations(annotations: MessageAnnotations): void {
32+
this.delivery.modified({
33+
delivery_failed: true,
34+
undeliverable_here: true,
35+
message_annotations: annotations,
36+
})
37+
}
38+
39+
requeue(annotations?: MessageAnnotations): void {
2840
if (this.receiverLink.is_closed()) throw new Error("Receiver link is closed")
41+
if (!annotations) {
42+
this.delivery.release()
43+
return
44+
}
45+
46+
this.requeueWithAnnotations(annotations)
47+
}
2948

30-
this.delivery.release()
49+
private requeueWithAnnotations(annotations: MessageAnnotations): void {
50+
this.delivery.modified({
51+
delivery_failed: false,
52+
undeliverable_here: false,
53+
message_annotations: annotations,
54+
})
3155
}
3256
}

test/e2e/consumer.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ describe("Consumer", () => {
228228
})
229229
})
230230

231-
test.skip("consumer can discard a message with annotations in a queue", async () => {
231+
test("consumer can discard a message with annotations in a queue", async () => {
232232
const publisher = await connection.createPublisher({ queue: { name: discardQueueName } })
233233
const expectedBody = "ciao"
234234
await publisher.publish(
@@ -240,7 +240,7 @@ describe("Consumer", () => {
240240
const consumer = await connection.createConsumer({
241241
queue: { name: discardQueueName },
242242
messageHandler: (context) => {
243-
context.discard()
243+
context.discard({ "x-opt-annotation-key": "annotation-value" })
244244
},
245245
})
246246
consumer.start()
@@ -296,7 +296,7 @@ describe("Consumer", () => {
296296
})
297297
})
298298

299-
test.skip("consumer can requeue a message with annotations in a queue", async () => {
299+
test("consumer can requeue a message with annotations in a queue", async () => {
300300
let toRequeue = true
301301
const messages: Message[] = []
302302
const consumer = await connection.createConsumer({
@@ -305,7 +305,7 @@ describe("Consumer", () => {
305305
messages.push(message)
306306
if (toRequeue) {
307307
toRequeue = false
308-
context.requeue()
308+
context.requeue({ "x-opt-annotation-key": "annotation-value" })
309309
return
310310
}
311311
context.accept()
@@ -327,7 +327,7 @@ describe("Consumer", () => {
327327
expect(messages[0].message_annotations!["x-opt-annotation-key"]).toBeUndefined()
328328
expect(messages[0].message_annotations!["x-delivery-count"]).toBeUndefined()
329329
expect(messages[1].message_annotations!["x-opt-annotation-key"]).toEqual("annotation-value")
330-
expect(messages[1].message_annotations!["x-delivery-count"]).toEqual("1")
330+
expect(messages[1].message_annotations!["x-delivery-count"]).toEqual(1)
331331
})
332-
})
332+
}, 15000)
333333
})

0 commit comments

Comments
 (0)