Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ import { AmqpManagement, Management } from "./management.js"
import { EnvironmentParams } from "./environment.js"
import { AmqpPublisher, Publisher } from "./publisher.js"
import { DestinationOptions } from "./message.js"
import { AmqpConsumer, Consumer, CreateConsumerParams } from "./consumer.js"

export interface Connection {
close(): Promise<boolean>
isOpen(): boolean
management(): Management
createPublisher(options?: DestinationOptions): Promise<Publisher>
get publishers(): Map<string, Publisher>
get consumers(): Map<string, Consumer>
createConsumer(queueName: string, params: CreateConsumerParams): Promise<Consumer>
}

export class AmqpConnection implements Connection {
private _publishers: Map<string, Publisher> = new Map<string, Publisher>()
private _consumers: Map<string, Consumer> = new Map<string, Consumer>()

static async create(params: EnvironmentParams) {
const connection = await AmqpConnection.open(params)
Expand Down Expand Up @@ -50,10 +54,17 @@ export class AmqpConnection implements Connection {
})

this._publishers.forEach((p) => p.close())
this._consumers.forEach((p) => p.close())
this.connection.close()
})
}

async createConsumer(queueName: string, params: CreateConsumerParams): Promise<Consumer> {
const consumer = await AmqpConsumer.createFrom(this.connection, this._consumers, queueName, params)
this._consumers.set(consumer.id, consumer)
return consumer
}

management(): Management {
return this.topologyManagement
}
Expand All @@ -68,6 +79,10 @@ export class AmqpConnection implements Connection {
return this._publishers
}

public get consumers(): Map<string, Consumer> {
return this._consumers
}

public isOpen(): boolean {
return this.connection ? this.connection.is_open() : false
}
Expand Down
102 changes: 102 additions & 0 deletions src/consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import {
generate_uuid,
Receiver,
ReceiverEvents,
ReceiverOptions,
Connection,
SenderOptions,
EventContext,
Message,
} from "rhea"
import { openLink } from "./utils.js"
import { createAddressFrom } from "./message.js"

export type ConsumerMessageHandler = (message: Message) => void

export type CreateConsumerParams = {
messageHandler: ConsumerMessageHandler
}

const getConsumerReceiverLinkConfigurationFrom = (
address: string,
consumerId: string
): SenderOptions | ReceiverOptions => ({
snd_settle_mode: 0,
rcv_settle_mode: 0,
name: consumerId,
target: { address, expiry_policy: "SESSION_END", durable: 0, dynamic: false },
source: {
address,
expiry_policy: "LINK_DETACH",
timeout: 0,
dynamic: false,
durable: 0,
},
})

export interface Consumer {
start(): void
close(): void
get id(): string
}

export class AmqpConsumer implements Consumer {
static async createFrom(
connection: Connection,
consumersList: Map<string, Consumer>,
queueName: string,
params: CreateConsumerParams
) {
const id = generate_uuid()
const address = createAddressFrom({ queue: { name: queueName } })
const receiverLink = await AmqpConsumer.openReceiver(connection, address, id)
return new AmqpConsumer(id, connection, consumersList, receiverLink, params)
}

private static async openReceiver(connection: Connection, address: string, consumerId: string): Promise<Receiver> {
return openLink<Receiver>(
connection,
ReceiverEvents.receiverOpen,
ReceiverEvents.receiverError,
connection.open_receiver.bind(connection),
getConsumerReceiverLinkConfigurationFrom(address, consumerId)
)
}

constructor(
private readonly _id: string,
private readonly connection: Connection,
private readonly consumersList: Map<string, Consumer>,
private readonly receiverLink: Receiver,
private readonly params: CreateConsumerParams
) {
console.log(this.connection.container_id)
}

get id() {
return this._id
}

start() {
this.receiverLink.on(ReceiverEvents.message, (context: EventContext) => {
console.log("message received", context.message?.body)
if (context.message && context.delivery) {
console.log("message accepted")
try {
this.params.messageHandler(context.message)
context.delivery.accept()
console.log("message consumed")
} catch (e) {
context.delivery.reject({ condition: "Message Handler error", info: e })
console.log("message rejected")
}
}
})
}

close() {
this.receiverLink.removeAllListeners()
if (this.receiverLink.is_open()) this.receiverLink.close()
if (this.consumersList.has(this._id)) this.consumersList.delete(this._id)
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export { Management, AmqpManagement } from "./management.js"
export { Environment, AmqpEnvironment } from "./environment.js"
export { Connection, AmqpConnection } from "./connection.js"
export { Publisher, AmqpPublisher } from "./publisher.js"
export { Consumer, AmqpConsumer } from "./consumer.js"
export { createAmqpMessage } from "./message.js"
35 changes: 35 additions & 0 deletions test/e2e/connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,39 @@ describe("Connection", () => {

expect(newConnection.publishers.size).eql(0)
})

test("create a consumer linked to a queue", async () => {
await connection.createConsumer(queueName, {
messageHandler: async (msg) => {
console.log(msg)
},
})

expect(connection.consumers.size).eql(1)
})

test("close a consumer", async () => {
const consumer = await connection.createConsumer(queueName, {
messageHandler: async (msg) => {
console.log(msg)
},
})

consumer.close()

expect(connection.consumers.size).eql(0)
})

test("closing the connection also closes the consumer", async () => {
const newConnection = await environment.createConnection()
await newConnection.createConsumer(queueName, {
messageHandler: async (msg) => {
console.log(msg)
},
})

await newConnection.close()

expect(newConnection.consumers.size).eql(0)
})
})
107 changes: 107 additions & 0 deletions test/e2e/consumer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { Management } from "../../src/index.js"
import { afterEach, beforeEach, describe, expect, test } from "vitest"
import { eventually, host, password, port, username, cleanRabbit } from "../support/util.js"
import { createEnvironment, Environment } from "../../src/environment.js"
import { Connection } from "../../src/connection.js"
import { Queue } from "../../src/queue.js"
import { Exchange } from "../../src/exchange.js"
import { createAmqpMessage } from "../../src/message.js"

describe("Consumer", () => {
let environment: Environment
let connection: Connection
let management: Management
let queue: Queue
let exchange: Exchange

const exchangeName = "test-exchange"
const queueName = "test-queue"
const bindingKey = "test-binding"

beforeEach(async () => {
environment = createEnvironment({
host,
port,
username,
password,
})
connection = await environment.createConnection()
management = connection.management()
queue = await management.declareQueue(queueName)
exchange = await management.declareExchange(exchangeName)
await management.bind(bindingKey, { source: exchange, destination: queue })
})

afterEach(async () => {
try {
await cleanRabbit({ match: /test-/ })
await connection.close()
await environment.close()
} catch (error) {
console.error(error)
}
})

test("consumer can handle message on exchange", async () => {
const publisher = await connection.createPublisher({ exchange: { name: exchangeName, routingKey: bindingKey } })
const expectedBody = "ciao"
await publisher.publish(createAmqpMessage({ body: expectedBody }))
let received: string = ""

const consumer = await connection.createConsumer(queueName, {
messageHandler: (message) => {
received = message.body
},
})
consumer.start()

await eventually(() => {
expect(received).to.be.eql(expectedBody)
})
})

test.skip("consumer can handle message on exchange, destination on message", async () => {
const publisher = await connection.createPublisher()
const expectedBody = "ciao"
await publisher.publish(
createAmqpMessage({
body: expectedBody,
destination: { exchange: { name: exchangeName, routingKey: bindingKey } },
})
)
let received: string = ""

const consumer = await connection.createConsumer(queueName, {
messageHandler: (message) => {
received = message.body
},
})
consumer.start()

await eventually(() => {
expect(received).to.be.eql(expectedBody)
})
})

test("consumer can handle message on queue", async () => {
const publisher = await connection.createPublisher({ queue: { name: queueName } })
const expectedBody = "ciao"
await publisher.publish(
createAmqpMessage({
body: expectedBody,
})
)
let received: string = ""

const consumer = await connection.createConsumer(queueName, {
messageHandler: (message) => {
received = message.body
},
})
consumer.start()

await eventually(() => {
expect(received).to.be.eql(expectedBody)
})
})
})