Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"ackmode",
"ampq",
"amqpvalue",
"dste",
"dstq",
"fanout",
"perftest",
"prefetch",
Expand Down
27 changes: 27 additions & 0 deletions src/binding.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { Exchange } from "./exchange.js"
import { Queue } from "./queue.js"

export type BindingOptions = {
source: Exchange
destination: Exchange | Queue
arguments?: Record<string, string>
}

export type BindingInfo = {
id: string
source: string
destination: string
arguments: Record<string, string>
}

export interface Binding {
getInfo: BindingInfo
}

export class AmqpBinding implements Binding {
constructor(private readonly info: BindingInfo) {}

public get getInfo(): BindingInfo {
return this.info
}
}
108 changes: 99 additions & 9 deletions src/management.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ import {
} from "rhea"
import { AmqpEndpoints, AmqpMethods, MessageBuilder, ME } from "./message_builder.js"
import {
CreateBindingResponseDecoder,
CreateExchangeResponseDecoder,
CreateQueueResponseDecoder,
DeleteBindingResponseDecoder,
DeleteExchangeResponseDecoder,
DeleteQueueResponseDecoder,
} from "./response_decoder.js"
import { AmqpBinding, Binding, BindingInfo, BindingOptions } from "./binding.js"
import { randomUUID } from "crypto"

type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen
type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError
Expand All @@ -36,8 +40,10 @@ const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
export interface Management {
declareQueue: (queueName: string, options?: Partial<QueueOptions>) => Promise<Queue>
deleteQueue: (queueName: string) => Promise<boolean>
declareExchange: (exchangeName: string, options: Partial<ExchangeOptions>) => Promise<Exchange>
declareExchange: (exchangeName: string, options?: Partial<ExchangeOptions>) => Promise<Exchange>
deleteExchange: (exchangeName: string) => Promise<boolean>
bind: (key: string, options: BindingOptions) => Promise<Binding>
unbind: (key: string, options: BindingOptions) => Promise<boolean>
close: () => void
}

Expand All @@ -52,9 +58,7 @@ export class AmqpManagement implements Management {
private readonly connection: RheaConnection,
private senderLink: Sender,
private receiverLink: Receiver
) {
console.log(this.receiverLink.is_open())
}
) {}

private static async openReceiver(connection: RheaConnection): Promise<Receiver> {
return AmqpManagement.openLink<Receiver>(
Expand Down Expand Up @@ -163,12 +167,12 @@ export class AmqpManagement implements Management {
})
}

declareExchange(exchangeName: string, options: Partial<ExchangeOptions> = {}): Promise<Exchange> {
async declareExchange(exchangeName: string, options: Partial<ExchangeOptions> = {}): Promise<Exchange> {
const exchangeInfo: ExchangeInfo = {
type: options.type ?? "direct",
arguments: options.arguments ?? {},
autoDelete: options.auto_delete ?? false,
durable: options.durable ?? false,
durable: options.durable ?? true,
name: exchangeName,
}
return new Promise((res, rej) => {
Expand All @@ -190,8 +194,8 @@ export class AmqpManagement implements Management {
.setReplyTo(ME)
.setAmqpMethod(AmqpMethods.PUT)
.setBody({
type: options.type,
durable: options.durable ?? false,
type: options.type ?? "direct",
durable: options.durable ?? true,
auto_delete: options.auto_delete ?? false,
})
.build()
Expand All @@ -200,7 +204,7 @@ export class AmqpManagement implements Management {
})
}

deleteExchange(exchangeName: string): Promise<boolean> {
async deleteExchange(exchangeName: string): Promise<boolean> {
return new Promise((res, rej) => {
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
if (!context.message) {
Expand All @@ -223,8 +227,94 @@ export class AmqpManagement implements Management {
this.senderLink.send(message)
})
}

async bind(key: string, options: BindingOptions): Promise<Binding> {
const bindingInfo: BindingInfo = {
id: randomUUID(),
source: options.source.getInfo.name,
destination: options.destination.getInfo.name,
arguments: options.arguments ?? {},
}
return new Promise((res, rej) => {
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
if (!context.message) {
return rej(new Error("Receiver has not received any message"))
}

const response = new CreateBindingResponseDecoder().decodeFrom(context.message, String(message.message_id))
if (response.status === "error") {
return rej(response.error)
}

return res(new AmqpBinding(bindingInfo))
})

const message = new MessageBuilder()
.sendTo(`/${AmqpEndpoints.Bindings}`)
.setReplyTo(ME)
.setAmqpMethod(AmqpMethods.POST)
.setBody({
source: options.source.getInfo.name,
binding_key: key,
arguments: options.arguments ?? {},
...buildBindingDestinationFrom(options.destination),
})
.build()
this.senderLink.send(message)
})
}

async unbind(key: string, options: BindingOptions): Promise<boolean> {
return new Promise((res, rej) => {
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
if (!context.message) {
return rej(new Error("Receiver has not received any message"))
}

const response = new DeleteBindingResponseDecoder().decodeFrom(context.message, String(message.message_id))
if (response.status === "error") {
return rej(response.error)
}

return res(true)
})

const message = new MessageBuilder()
.sendTo(
`/${AmqpEndpoints.Bindings}/${buildUnbindEndpointFrom({ source: options.source, destination: options.destination, key })}`
)
.setReplyTo(ME)
.setAmqpMethod(AmqpMethods.DELETE)
.build()
this.senderLink.send(message)
})
}
}

function buildArgumentsFrom(queueType?: QueueType, queueOptions?: Record<string, string>) {
return { ...(queueOptions ?? {}), ...(queueType ? { "x-queue-type": queueType } : {}) }
}

function buildUnbindEndpointFrom({
source,
destination,
key,
}: {
source: Exchange
destination: Exchange | Queue
key: string
}): string {
if (destination instanceof AmqpExchange) {
return `src=${encodeURIComponent(source.getInfo.name)};dste=${encodeURIComponent(destination.getInfo.name)};key=${encodeURIComponent(key)};args=`
}

return `src=${encodeURIComponent(source.getInfo.name)};dstq=${encodeURIComponent(destination.getInfo.name)};key=${encodeURIComponent(key)};args=`
}

function buildBindingDestinationFrom(destination: Exchange | Queue) {
if (destination instanceof AmqpExchange) {
return { destination_exchange: destination.getInfo.name }
}

return { destination_queue: destination.getInfo.name }
}
2 changes: 2 additions & 0 deletions src/message_builder.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { generate_uuid, Message } from "rhea"

export enum AmqpMethods {
POST = "POST",
PUT = "PUT",
DELETE = "DELETE",
GET = "GET",
Expand All @@ -9,6 +10,7 @@ export enum AmqpMethods {
export enum AmqpEndpoints {
Queues = "queues",
Exchanges = "exchanges",
Bindings = "bindings",
}

export const ME = "$me"
Expand Down
34 changes: 19 additions & 15 deletions src/response_decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,6 @@ interface ResponseDecoder {
decodeFrom: (receivedMessage: Message, sentMessageId: string) => Result<unknown, Error>
}

class VoidResponseDecoder implements ResponseDecoder {
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<void, Error> {
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
return { status: "error", error: new Error(`Message Error: ${receivedMessage.subject}`) }
}

return {
status: "ok",
body: undefined,
}
}
}

export class CreateQueueResponseDecoder implements ResponseDecoder {
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<QueueInfo, Error> {
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
Expand Down Expand Up @@ -59,6 +46,23 @@ export class DeleteQueueResponseDecoder implements ResponseDecoder {
}
}

export class CreateExchangeResponseDecoder extends VoidResponseDecoder {}
class EmptyBodyResponseDecoder implements ResponseDecoder {
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<void, Error> {
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
return { status: "error", error: new Error(`Message Error: ${receivedMessage.subject}`) }
}

return {
status: "ok",
body: undefined,
}
}
}

export class CreateExchangeResponseDecoder extends EmptyBodyResponseDecoder {}

export class DeleteExchangeResponseDecoder extends EmptyBodyResponseDecoder {}

export class CreateBindingResponseDecoder extends EmptyBodyResponseDecoder {}

export class DeleteExchangeResponseDecoder extends VoidResponseDecoder {}
export class DeleteBindingResponseDecoder extends EmptyBodyResponseDecoder {}
76 changes: 68 additions & 8 deletions test/e2e/management.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import {
eventually,
createExchange,
existsExchange,
deleteExchange,
getQueueInfo,
host,
password,
port,
username,
getExchangeInfo,
existsBinding,
cleanRabbit,
} from "../support/util.js"
import { createEnvironment, Environment } from "../../src/environment.js"
import { Connection } from "../../src/connection.js"
Expand All @@ -23,6 +24,8 @@ describe("Management", () => {
let management: Management

const exchangeName = "test-exchange"
const queueName = "test-queue"
const bindingKey = "test-binding"

beforeEach(async () => {
environment = createEnvironment({
Expand All @@ -33,22 +36,22 @@ describe("Management", () => {
})
connection = await environment.createConnection()
management = connection.management()
await deleteExchange(exchangeName)
await cleanRabbit({ match: /test-/ })
})

afterEach(async () => {
try {
await management.close()
await cleanRabbit({ match: /test-/ })
management.close()
await connection.close()
await environment.close()
await deleteExchange(exchangeName)
} catch (error) {
console.error(error)
}
})

test("create a queue through the management", async () => {
const queue = await management.declareQueue("test-queue")
const queue = await management.declareQueue(queueName)

await eventually(async () => {
const queueInfo = await getQueueInfo(queue.getInfo.name)
Expand All @@ -65,12 +68,13 @@ describe("Management", () => {
})

test("delete a queue through the management", async () => {
await createQueue("test-queue")
await createQueue(queueName)

await management.deleteQueue("test-queue")
const result = await management.deleteQueue(queueName)

await eventually(async () => {
expect(await existsQueue("test-queue")).to.eql(false)
expect(await existsQueue(queueName)).to.eql(false)
expect(result).to.eql(true)
})
})

Expand Down Expand Up @@ -105,4 +109,60 @@ describe("Management", () => {
expect(result).eql(true)
})
})

test("create a binding from exchange to queue through the management", async () => {
const exchange = await management.declareExchange(exchangeName)
const queue = await management.declareQueue(queueName)

await management.bind(bindingKey, { source: exchange, destination: queue })

await eventually(async () => {
expect(await existsBinding({ source: exchangeName, destination: queueName, type: "exchangeToQueue" })).to.eql(
true
)
})
})

test("create a binding from exchange to exchange through the management", async () => {
const exchange1 = await management.declareExchange(exchangeName)
const exchange2 = await management.declareExchange(exchangeName + "-2")

await management.bind(bindingKey, { source: exchange1, destination: exchange2 })

await eventually(async () => {
expect(
await existsBinding({ source: exchangeName, destination: exchangeName + "-2", type: "exchangeToExchange" })
).to.eql(true)
})
})

test("delete a binding from exchange to queue with no arguments through the management", async () => {
const exchange = await management.declareExchange(exchangeName)
const queue = await management.declareQueue(queueName)

await management.unbind(bindingKey, { source: exchange, destination: queue })

await eventually(async () => {
expect(await existsBinding({ source: exchangeName, destination: queueName, type: "exchangeToQueue" })).to.eql(
false
)
expect(await existsQueue(queue.getInfo.name)).to.eql(true)
expect(await existsExchange(exchange.getInfo.name)).to.eql(true)
})
})

test("delete a binding from exchange to exchange with no arguments through the management", async () => {
const exchange = await management.declareExchange(exchangeName)
const exchange2 = await management.declareExchange(exchangeName + "-2")

await management.unbind(bindingKey, { source: exchange, destination: exchange2 })

await eventually(async () => {
expect(
await existsBinding({ source: exchangeName, destination: exchangeName + "-2", type: "exchangeToExchange" })
).to.eql(false)
expect(await existsExchange(exchange.getInfo.name)).to.eql(true)
expect(await existsExchange(exchangeName + "-2")).to.eql(true)
})
})
})
Loading