Skip to content

Commit d4715e2

Browse files
l4mbymagne
andauthored
[IS-44/feat]: add context to consumer message handler (#51)
* wip: annotations still missing * wip: tests for modified delivery * wip: rebase pr * feat: add context in consumer message handler * chore: update examples accordingly * chore: add error if receiver link is closed --------- Co-authored-by: magne <[email protected]>
1 parent f2d7864 commit d4715e2

File tree

7 files changed

+226
-22
lines changed

7 files changed

+226
-22
lines changed

conf/enabled_plugins

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
[rabbit_stream,rabbitmq_stream_management,rabbitmq_consistent_hash_exchange].
1+
[rabbit_stream,rabbitmq_stream_management,rabbitmq_management,rabbitmq_top,rabbitmq_consistent_hash_exchange].

conf/rabbitmq.conf

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
loopback_users = none
2+
loopback_users.guest = true
3+
4+
log.console = true
5+
log.console.level = debug
6+
log.exchange = true
7+
8+
listeners.tcp.default = 5672
9+
10+
deprecated_features.permit.amqp_address_v1 = false

examples/index.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,12 @@ async function main() {
4949
publisher.close()
5050

5151
console.log("Opening a consumer and consuming messages...")
52-
const consumer = await connection.createConsumer(testQueue, {
53-
messageHandler: (msg) => console.log(`MessageId: ${msg.message_id}; Payload: ${msg.body}`),
52+
const consumer = await connection.createConsumer({
53+
queue: { name: testQueue },
54+
messageHandler: (context, msg) => {
55+
context.accept()
56+
console.log(`MessageId: ${msg.message_id}; Payload: ${msg.body}`)
57+
},
5458
})
5559
consumer.start()
5660
await sleep(5000)

src/consumer.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ import {
1919
} from "./utils.js"
2020
import { createConsumerAddressFrom } from "./message.js"
2121
import { QueueOptions } from "./message.js"
22+
import { AmqpDeliveryContext, DeliveryContext } from "./delivery_context.js"
2223

23-
export type ConsumerMessageHandler = (message: Message) => void
24+
export type ConsumerMessageHandler = (context: DeliveryContext, message: Message) => void
2425

2526
export type StreamOptions = {
2627
name: string
@@ -103,12 +104,8 @@ export class AmqpConsumer implements Consumer {
103104
start() {
104105
this.receiverLink.on(ReceiverEvents.message, (context: EventContext) => {
105106
if (context.message && context.delivery) {
106-
try {
107-
this.params.messageHandler(context.message)
108-
context.delivery.accept()
109-
} catch (e) {
110-
context.delivery.reject({ condition: "Message Handler error", info: e })
111-
}
107+
const deliveryContext = new AmqpDeliveryContext(context.delivery, this.receiverLink)
108+
this.params.messageHandler(deliveryContext, context.message)
112109
}
113110
})
114111
}

src/delivery_context.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { Delivery, Receiver } from "rhea"
2+
3+
export interface DeliveryContext {
4+
accept(): void
5+
discard(): void
6+
requeue(): void
7+
}
8+
9+
export class AmqpDeliveryContext implements DeliveryContext {
10+
constructor(
11+
private readonly delivery: Delivery,
12+
private readonly receiverLink: Receiver
13+
) {}
14+
15+
accept(): void {
16+
if (this.receiverLink.is_closed()) throw new Error("Receiver link is closed")
17+
18+
this.delivery.accept()
19+
}
20+
21+
discard(): void {
22+
if (this.receiverLink.is_closed()) throw new Error("Receiver link is closed")
23+
24+
this.delivery.reject()
25+
}
26+
27+
requeue(): void {
28+
if (this.receiverLink.is_closed()) throw new Error("Receiver link is closed")
29+
30+
this.delivery.release()
31+
}
32+
}

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ export { Connection, AmqpConnection } from "./connection.js"
44
export { Publisher, AmqpPublisher } from "./publisher.js"
55
export { Consumer, AmqpConsumer } from "./consumer.js"
66
export { createAmqpMessage } from "./message.js"
7-
export { OutcomeState } from "./utils.js"
7+
export { OutcomeState, Offset, OffsetType } from "./utils.js"

test/e2e/consumer.test.ts

Lines changed: 172 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,32 @@
11
import { Management } from "../../src/index.js"
22
import { afterEach, beforeEach, describe, expect, test } from "vitest"
3-
import { eventually, host, password, port, username, cleanRabbit } from "../support/util.js"
3+
import { eventually, host, password, port, username, cleanRabbit, wait } from "../support/util.js"
44
import { createEnvironment, Environment } from "../../src/environment.js"
55
import { Connection } from "../../src/connection.js"
66
import { Queue } from "../../src/queue.js"
77
import { Exchange } from "../../src/exchange.js"
88
import { createAmqpMessage } from "../../src/message.js"
99
import { Offset } from "../../src/utils.js"
10+
import { Message } from "rhea"
1011

1112
describe("Consumer", () => {
1213
let environment: Environment
1314
let connection: Connection
1415
let management: Management
1516
let queue: Queue
17+
let deadLetterQueue: Queue
1618
let exchange: Exchange
19+
let deadLetterExchange: Exchange
1720

1821
const exchangeName = "test-exchange"
1922
const queueName = "test-queue"
23+
const discardQueueName = "test-discard-queue"
24+
const requeueQueueName = "test-requeue-queue"
2025
const bindingKey = "test-binding"
2126
const streamName = "test-stream"
27+
const deadLetterExchangeName = "test-dead-letter-exchange"
28+
const deadLetterQueueName = "test-dead-letter-queue"
29+
const deadLetterBindingKey = "test-dead-letter-binding"
2230

2331
beforeEach(async () => {
2432
environment = createEnvironment({
@@ -31,8 +39,23 @@ describe("Consumer", () => {
3139
management = connection.management()
3240
queue = await management.declareQueue(queueName)
3341
await management.declareQueue(streamName, { type: "stream" })
42+
await management.declareQueue(discardQueueName, {
43+
type: "quorum",
44+
durable: true,
45+
arguments: {
46+
"x-dead-letter-exchange": deadLetterExchangeName,
47+
"x-dead-letter-routing-key": deadLetterBindingKey,
48+
},
49+
})
50+
await management.declareQueue(requeueQueueName, {
51+
type: "quorum",
52+
durable: true,
53+
})
54+
deadLetterQueue = await management.declareQueue(deadLetterQueueName, { exclusive: true })
3455
exchange = await management.declareExchange(exchangeName)
56+
deadLetterExchange = await management.declareExchange(deadLetterExchangeName, { type: "fanout", auto_delete: true })
3557
await management.bind(bindingKey, { source: exchange, destination: queue })
58+
await management.bind(deadLetterBindingKey, { source: deadLetterExchange, destination: deadLetterQueue })
3659
})
3760

3861
afterEach(async () => {
@@ -45,26 +68,27 @@ describe("Consumer", () => {
4568
}
4669
})
4770

48-
test("consumer can handle message on exchange", async () => {
71+
test("consumer can handle a message published to an exchange", async () => {
4972
const publisher = await connection.createPublisher({ exchange: { name: exchangeName, routingKey: bindingKey } })
5073
const expectedBody = "ciao"
5174
await publisher.publish(createAmqpMessage({ body: expectedBody }))
5275
let received: string = ""
5376

5477
const consumer = await connection.createConsumer({
5578
queue: { name: queueName },
56-
messageHandler: (message) => {
79+
messageHandler: (context, message) => {
80+
context.accept()
5781
received = message.body
5882
},
5983
})
6084
consumer.start()
6185

62-
await eventually(() => {
86+
await eventually(async () => {
6387
expect(received).to.be.eql(expectedBody)
6488
})
6589
})
6690

67-
test("consumer can handle message on exchange, destination on message", async () => {
91+
test("consumer can handle a message published to an exchange with the destination directly on the message", async () => {
6892
const publisher = await connection.createPublisher()
6993
const expectedBody = "ciao"
7094
await publisher.publish(
@@ -77,7 +101,8 @@ describe("Consumer", () => {
77101

78102
const consumer = await connection.createConsumer({
79103
queue: { name: queueName },
80-
messageHandler: (message) => {
104+
messageHandler: (context, message) => {
105+
context.accept()
81106
received = message.body
82107
},
83108
})
@@ -88,7 +113,7 @@ describe("Consumer", () => {
88113
})
89114
})
90115

91-
test("consumer can handle message on queue", async () => {
116+
test("consumer can handle a message published to a queue", async () => {
92117
const publisher = await connection.createPublisher({ queue: { name: queueName } })
93118
const expectedBody = "ciao"
94119
await publisher.publish(
@@ -100,7 +125,8 @@ describe("Consumer", () => {
100125

101126
const consumer = await connection.createConsumer({
102127
queue: { name: queueName },
103-
messageHandler: (message) => {
128+
messageHandler: (context, message) => {
129+
context.accept()
104130
received = message.body
105131
},
106132
})
@@ -126,7 +152,8 @@ describe("Consumer", () => {
126152
name: streamName,
127153
offset: Offset.first(),
128154
},
129-
messageHandler: (message) => {
155+
messageHandler: (context, message) => {
156+
context.discard()
130157
received = message.body
131158
},
132159
})
@@ -158,9 +185,14 @@ describe("Consumer", () => {
158185
matchUnfiltered: true,
159186
filterValues: ["invoices"],
160187
},
161-
messageHandler: (message) => {
162-
if (message.message_annotations && ["invoices"].includes(message.message_annotations["x-stream-filter-value"]))
188+
messageHandler: (context, message) => {
189+
if (
190+
message.message_annotations &&
191+
["invoices"].includes(message.message_annotations["x-stream-filter-value"])
192+
) {
163193
received = message.body
194+
}
195+
context.accept()
164196
},
165197
})
166198
consumer.start()
@@ -169,4 +201,133 @@ describe("Consumer", () => {
169201
expect(received).to.be.eql("filtered")
170202
})
171203
})
204+
205+
test("consumer can discard a message published to a queue", async () => {
206+
const publisher = await connection.createPublisher({ queue: { name: discardQueueName } })
207+
const expectedBody = "ciao"
208+
await publisher.publish(
209+
createAmqpMessage({
210+
body: expectedBody,
211+
})
212+
)
213+
let received: string = ""
214+
215+
const consumer = await connection.createConsumer({
216+
queue: { name: discardQueueName },
217+
messageHandler: (context, message) => {
218+
context.discard()
219+
received = message.body
220+
},
221+
})
222+
consumer.start()
223+
224+
await eventually(async () => {
225+
expect(received).to.be.eql(expectedBody)
226+
const deadLetterInfo = await management.getQueueInfo(deadLetterQueueName)
227+
expect(deadLetterInfo.getInfo.messageCount).eql(1)
228+
})
229+
})
230+
231+
test.skip("consumer can discard a message with annotations in a queue", async () => {
232+
const publisher = await connection.createPublisher({ queue: { name: discardQueueName } })
233+
const expectedBody = "ciao"
234+
await publisher.publish(
235+
createAmqpMessage({
236+
body: expectedBody,
237+
})
238+
)
239+
let receivedAnnotationValue: string | undefined = ""
240+
const consumer = await connection.createConsumer({
241+
queue: { name: discardQueueName },
242+
messageHandler: (context) => {
243+
context.discard()
244+
},
245+
})
246+
consumer.start()
247+
await wait(2000)
248+
consumer.close()
249+
await wait(3000)
250+
251+
const consumerDeadLetter = await connection.createConsumer({
252+
queue: { name: deadLetterQueueName },
253+
messageHandler: (context, message) => {
254+
receivedAnnotationValue = message.message_annotations
255+
? message.message_annotations["x-opt-annotation-key"]
256+
: undefined
257+
context.accept()
258+
},
259+
})
260+
consumerDeadLetter.start()
261+
await wait(3000)
262+
263+
await eventually(() => {
264+
expect(receivedAnnotationValue).eql("annotation-value")
265+
})
266+
}, 15000)
267+
268+
test("consumer can requeue a message in a queue", async () => {
269+
let toRequeue = true
270+
const messages: Message[] = []
271+
const consumer = await connection.createConsumer({
272+
queue: { name: requeueQueueName },
273+
messageHandler: (context, message) => {
274+
messages.push(message)
275+
if (toRequeue) {
276+
toRequeue = false
277+
context.requeue()
278+
return
279+
}
280+
context.accept()
281+
},
282+
})
283+
284+
consumer.start()
285+
const publisher = await connection.createPublisher({ queue: { name: requeueQueueName } })
286+
const expectedBody = "ciao"
287+
await publisher.publish(
288+
createAmqpMessage({
289+
body: expectedBody,
290+
})
291+
)
292+
293+
await eventually(async () => {
294+
expect(toRequeue).eql(false)
295+
expect(messages).lengthOf(2)
296+
})
297+
})
298+
299+
test.skip("consumer can requeue a message with annotations in a queue", async () => {
300+
let toRequeue = true
301+
const messages: Message[] = []
302+
const consumer = await connection.createConsumer({
303+
queue: { name: requeueQueueName },
304+
messageHandler: (context, message) => {
305+
messages.push(message)
306+
if (toRequeue) {
307+
toRequeue = false
308+
context.requeue()
309+
return
310+
}
311+
context.accept()
312+
},
313+
})
314+
315+
consumer.start()
316+
const publisher = await connection.createPublisher({ queue: { name: requeueQueueName } })
317+
const expectedBody = "ciao"
318+
await publisher.publish(
319+
createAmqpMessage({
320+
body: expectedBody,
321+
})
322+
)
323+
324+
await eventually(async () => {
325+
expect(toRequeue).eql(false)
326+
expect(messages).lengthOf(2)
327+
expect(messages[0].message_annotations!["x-opt-annotation-key"]).toBeUndefined()
328+
expect(messages[0].message_annotations!["x-delivery-count"]).toBeUndefined()
329+
expect(messages[1].message_annotations!["x-opt-annotation-key"]).toEqual("annotation-value")
330+
expect(messages[1].message_annotations!["x-delivery-count"]).toEqual("1")
331+
})
332+
})
172333
})

0 commit comments

Comments
 (0)