Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/enabled_plugins
Original file line number Diff line number Diff line change
@@ -1 +1 @@
[rabbit_stream,rabbitmq_stream_management,rabbitmq_consistent_hash_exchange].
[rabbit_stream,rabbitmq_stream_management,rabbitmq_management,rabbitmq_top,rabbitmq_consistent_hash_exchange].
10 changes: 10 additions & 0 deletions conf/rabbitmq.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
loopback_users = none
loopback_users.guest = true

log.console = true
log.console.level = debug
log.exchange = true

listeners.tcp.default = 5672

deprecated_features.permit.amqp_address_v1 = false
8 changes: 6 additions & 2 deletions examples/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ async function main() {
publisher.close()

console.log("Opening a consumer and consuming messages...")
const consumer = await connection.createConsumer(testQueue, {
messageHandler: (msg) => console.log(`MessageId: ${msg.message_id}; Payload: ${msg.body}`),
const consumer = await connection.createConsumer({
queue: { name: testQueue },
messageHandler: (context, msg) => {
context.accept()
console.log(`MessageId: ${msg.message_id}; Payload: ${msg.body}`)
},
})
consumer.start()
await sleep(5000)
Expand Down
11 changes: 4 additions & 7 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import {
} from "./utils.js"
import { createConsumerAddressFrom } from "./message.js"
import { QueueOptions } from "./message.js"
import { AmqpDeliveryContext, DeliveryContext } from "./delivery_context.js"

export type ConsumerMessageHandler = (message: Message) => void
export type ConsumerMessageHandler = (context: DeliveryContext, message: Message) => void

export type StreamOptions = {
name: string
Expand Down Expand Up @@ -103,12 +104,8 @@ export class AmqpConsumer implements Consumer {
start() {
this.receiverLink.on(ReceiverEvents.message, (context: EventContext) => {
if (context.message && context.delivery) {
try {
this.params.messageHandler(context.message)
context.delivery.accept()
} catch (e) {
context.delivery.reject({ condition: "Message Handler error", info: e })
}
const deliveryContext = new AmqpDeliveryContext(context.delivery, this.receiverLink)
this.params.messageHandler(deliveryContext, context.message)
}
})
}
Expand Down
32 changes: 32 additions & 0 deletions src/delivery_context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Delivery, Receiver } from "rhea"

export interface DeliveryContext {
accept(): void
discard(): void
requeue(): void
}

export class AmqpDeliveryContext implements DeliveryContext {
constructor(
private readonly delivery: Delivery,
private readonly receiverLink: Receiver
) {}

accept(): void {
if (this.receiverLink.is_closed()) throw new Error("Receiver link is closed")

this.delivery.accept()
}

discard(): void {
if (this.receiverLink.is_closed()) throw new Error("Receiver link is closed")

this.delivery.reject()
}

requeue(): void {
if (this.receiverLink.is_closed()) throw new Error("Receiver link is closed")

this.delivery.release()
}
}
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ export { Connection, AmqpConnection } from "./connection.js"
export { Publisher, AmqpPublisher } from "./publisher.js"
export { Consumer, AmqpConsumer } from "./consumer.js"
export { createAmqpMessage } from "./message.js"
export { OutcomeState } from "./utils.js"
export { OutcomeState, Offset, OffsetType } from "./utils.js"
183 changes: 172 additions & 11 deletions test/e2e/consumer.test.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,32 @@
import { Management } from "../../src/index.js"
import { afterEach, beforeEach, describe, expect, test } from "vitest"
import { eventually, host, password, port, username, cleanRabbit } from "../support/util.js"
import { eventually, host, password, port, username, cleanRabbit, wait } from "../support/util.js"
import { createEnvironment, Environment } from "../../src/environment.js"
import { Connection } from "../../src/connection.js"
import { Queue } from "../../src/queue.js"
import { Exchange } from "../../src/exchange.js"
import { createAmqpMessage } from "../../src/message.js"
import { Offset } from "../../src/utils.js"
import { Message } from "rhea"

describe("Consumer", () => {
let environment: Environment
let connection: Connection
let management: Management
let queue: Queue
let deadLetterQueue: Queue
let exchange: Exchange
let deadLetterExchange: Exchange

const exchangeName = "test-exchange"
const queueName = "test-queue"
const discardQueueName = "test-discard-queue"
const requeueQueueName = "test-requeue-queue"
const bindingKey = "test-binding"
const streamName = "test-stream"
const deadLetterExchangeName = "test-dead-letter-exchange"
const deadLetterQueueName = "test-dead-letter-queue"
const deadLetterBindingKey = "test-dead-letter-binding"

beforeEach(async () => {
environment = createEnvironment({
Expand All @@ -31,8 +39,23 @@ describe("Consumer", () => {
management = connection.management()
queue = await management.declareQueue(queueName)
await management.declareQueue(streamName, { type: "stream" })
await management.declareQueue(discardQueueName, {
type: "quorum",
durable: true,
arguments: {
"x-dead-letter-exchange": deadLetterExchangeName,
"x-dead-letter-routing-key": deadLetterBindingKey,
},
})
await management.declareQueue(requeueQueueName, {
type: "quorum",
durable: true,
})
deadLetterQueue = await management.declareQueue(deadLetterQueueName, { exclusive: true })
exchange = await management.declareExchange(exchangeName)
deadLetterExchange = await management.declareExchange(deadLetterExchangeName, { type: "fanout", auto_delete: true })
await management.bind(bindingKey, { source: exchange, destination: queue })
await management.bind(deadLetterBindingKey, { source: deadLetterExchange, destination: deadLetterQueue })
})

afterEach(async () => {
Expand All @@ -45,26 +68,27 @@ describe("Consumer", () => {
}
})

test("consumer can handle message on exchange", async () => {
test("consumer can handle a message published to an exchange", async () => {
const publisher = await connection.createPublisher({ exchange: { name: exchangeName, routingKey: bindingKey } })
const expectedBody = "ciao"
await publisher.publish(createAmqpMessage({ body: expectedBody }))
let received: string = ""

const consumer = await connection.createConsumer({
queue: { name: queueName },
messageHandler: (message) => {
messageHandler: (context, message) => {
context.accept()
received = message.body
},
})
consumer.start()

await eventually(() => {
await eventually(async () => {
expect(received).to.be.eql(expectedBody)
})
})

test("consumer can handle message on exchange, destination on message", async () => {
test("consumer can handle a message published to an exchange with the destination directly on the message", async () => {
const publisher = await connection.createPublisher()
const expectedBody = "ciao"
await publisher.publish(
Expand All @@ -77,7 +101,8 @@ describe("Consumer", () => {

const consumer = await connection.createConsumer({
queue: { name: queueName },
messageHandler: (message) => {
messageHandler: (context, message) => {
context.accept()
received = message.body
},
})
Expand All @@ -88,7 +113,7 @@ describe("Consumer", () => {
})
})

test("consumer can handle message on queue", async () => {
test("consumer can handle a message published to a queue", async () => {
const publisher = await connection.createPublisher({ queue: { name: queueName } })
const expectedBody = "ciao"
await publisher.publish(
Expand All @@ -100,7 +125,8 @@ describe("Consumer", () => {

const consumer = await connection.createConsumer({
queue: { name: queueName },
messageHandler: (message) => {
messageHandler: (context, message) => {
context.accept()
received = message.body
},
})
Expand All @@ -126,7 +152,8 @@ describe("Consumer", () => {
name: streamName,
offset: Offset.first(),
},
messageHandler: (message) => {
messageHandler: (context, message) => {
context.discard()
received = message.body
},
})
Expand Down Expand Up @@ -158,9 +185,14 @@ describe("Consumer", () => {
matchUnfiltered: true,
filterValues: ["invoices"],
},
messageHandler: (message) => {
if (message.message_annotations && ["invoices"].includes(message.message_annotations["x-stream-filter-value"]))
messageHandler: (context, message) => {
if (
message.message_annotations &&
["invoices"].includes(message.message_annotations["x-stream-filter-value"])
) {
received = message.body
}
context.accept()
},
})
consumer.start()
Expand All @@ -169,4 +201,133 @@ describe("Consumer", () => {
expect(received).to.be.eql("filtered")
})
})

test("consumer can discard a message published to a queue", async () => {
const publisher = await connection.createPublisher({ queue: { name: discardQueueName } })
const expectedBody = "ciao"
await publisher.publish(
createAmqpMessage({
body: expectedBody,
})
)
let received: string = ""

const consumer = await connection.createConsumer({
queue: { name: discardQueueName },
messageHandler: (context, message) => {
context.discard()
received = message.body
},
})
consumer.start()

await eventually(async () => {
expect(received).to.be.eql(expectedBody)
const deadLetterInfo = await management.getQueueInfo(deadLetterQueueName)
expect(deadLetterInfo.getInfo.messageCount).eql(1)
})
})

test.skip("consumer can discard a message with annotations in a queue", async () => {
const publisher = await connection.createPublisher({ queue: { name: discardQueueName } })
const expectedBody = "ciao"
await publisher.publish(
createAmqpMessage({
body: expectedBody,
})
)
let receivedAnnotationValue: string | undefined = ""
const consumer = await connection.createConsumer({
queue: { name: discardQueueName },
messageHandler: (context) => {
context.discard()
},
})
consumer.start()
await wait(2000)
consumer.close()
await wait(3000)

const consumerDeadLetter = await connection.createConsumer({
queue: { name: deadLetterQueueName },
messageHandler: (context, message) => {
receivedAnnotationValue = message.message_annotations
? message.message_annotations["x-opt-annotation-key"]
: undefined
context.accept()
},
})
consumerDeadLetter.start()
await wait(3000)

await eventually(() => {
expect(receivedAnnotationValue).eql("annotation-value")
})
}, 15000)

test("consumer can requeue a message in a queue", async () => {
let toRequeue = true
const messages: Message[] = []
const consumer = await connection.createConsumer({
queue: { name: requeueQueueName },
messageHandler: (context, message) => {
messages.push(message)
if (toRequeue) {
toRequeue = false
context.requeue()
return
}
context.accept()
},
})

consumer.start()
const publisher = await connection.createPublisher({ queue: { name: requeueQueueName } })
const expectedBody = "ciao"
await publisher.publish(
createAmqpMessage({
body: expectedBody,
})
)

await eventually(async () => {
expect(toRequeue).eql(false)
expect(messages).lengthOf(2)
})
})

test.skip("consumer can requeue a message with annotations in a queue", async () => {
let toRequeue = true
const messages: Message[] = []
const consumer = await connection.createConsumer({
queue: { name: requeueQueueName },
messageHandler: (context, message) => {
messages.push(message)
if (toRequeue) {
toRequeue = false
context.requeue()
return
}
context.accept()
},
})

consumer.start()
const publisher = await connection.createPublisher({ queue: { name: requeueQueueName } })
const expectedBody = "ciao"
await publisher.publish(
createAmqpMessage({
body: expectedBody,
})
)

await eventually(async () => {
expect(toRequeue).eql(false)
expect(messages).lengthOf(2)
expect(messages[0].message_annotations!["x-opt-annotation-key"]).toBeUndefined()
expect(messages[0].message_annotations!["x-delivery-count"]).toBeUndefined()
expect(messages[1].message_annotations!["x-opt-annotation-key"]).toEqual("annotation-value")
expect(messages[1].message_annotations!["x-delivery-count"]).toEqual("1")
})
})
})