Skip to content

Commit 5cdee4e

Browse files
authored
Merge pull request #16 from coders51/2-as-a-library-user-i-want-to-create-an-exchange-without-using-the-management-ui
2# as a library user i want to create an exchange without using the management UI
2 parents 1a9a836 + 28cc367 commit 5cdee4e

File tree

7 files changed

+244
-2
lines changed

7 files changed

+244
-2
lines changed

cspell.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"ackmode",
55
"ampq",
66
"amqpvalue",
7+
"fanout",
78
"perftest",
89
"prefetch",
910
"RABBITMQ",

src/exchange.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
export type ExchangeType = "direct" | "fanout" | "topic" | "headers"
2+
3+
export type ExchangeOptions = {
4+
arguments: Record<string, string>
5+
auto_delete: boolean
6+
durable: boolean
7+
type: ExchangeType
8+
}
9+
10+
export interface ExchangeInfo {
11+
name: string
12+
arguments: Record<string, string>
13+
autoDelete: boolean
14+
durable: boolean
15+
type: ExchangeType
16+
}
17+
18+
export interface Exchange {
19+
getInfo: ExchangeInfo
20+
}
21+
22+
export type DeletedExchangeInfo = {
23+
name: string
24+
deleted: boolean
25+
}
26+
27+
export class AmqpExchange implements Exchange {
28+
constructor(private readonly info: ExchangeInfo) {}
29+
30+
public get getInfo(): ExchangeInfo {
31+
return this.info
32+
}
33+
}

src/management.ts

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { AmqpExchange, Exchange, ExchangeInfo, ExchangeOptions } from "./exchange.js"
12
import { AmqpQueue, Queue, QueueOptions, QueueType } from "./queue.js"
23
import {
34
EventContext,
@@ -10,7 +11,12 @@ import {
1011
SenderOptions,
1112
} from "rhea"
1213
import { AmqpEndpoints, AmqpMethods, MessageBuilder, ME } from "./message_builder.js"
13-
import { CreateQueueResponseDecoder, DeleteQueueResponseDecoder } from "./response_decoder.js"
14+
import {
15+
CreateExchangeResponseDecoder,
16+
CreateQueueResponseDecoder,
17+
DeleteExchangeResponseDecoder,
18+
DeleteQueueResponseDecoder,
19+
} from "./response_decoder.js"
1420

1521
type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen
1622
type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError
@@ -30,6 +36,8 @@ const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
3036
export interface Management {
3137
declareQueue: (queueName: string, options?: Partial<QueueOptions>) => Promise<Queue>
3238
deleteQueue: (queueName: string) => Promise<boolean>
39+
declareExchange: (exchangeName: string, options: Partial<ExchangeOptions>) => Promise<Exchange>
40+
deleteExchange: (exchangeName: string) => Promise<boolean>
3341
close: () => void
3442
}
3543

@@ -154,6 +162,67 @@ export class AmqpManagement implements Management {
154162
this.senderLink.send(message)
155163
})
156164
}
165+
166+
declareExchange(exchangeName: string, options: Partial<ExchangeOptions> = {}): Promise<Exchange> {
167+
const exchangeInfo: ExchangeInfo = {
168+
type: options.type ?? "direct",
169+
arguments: options.arguments ?? {},
170+
autoDelete: options.auto_delete ?? false,
171+
durable: options.durable ?? false,
172+
name: exchangeName,
173+
}
174+
return new Promise((res, rej) => {
175+
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
176+
if (!context.message) {
177+
return rej(new Error("Receiver has not received any message"))
178+
}
179+
180+
const response = new CreateExchangeResponseDecoder().decodeFrom(context.message, String(message.message_id))
181+
if (response.status === "error") {
182+
return rej(response.error)
183+
}
184+
185+
return res(new AmqpExchange(exchangeInfo))
186+
})
187+
188+
const message = new MessageBuilder()
189+
.sendTo(`/${AmqpEndpoints.Exchanges}/${encodeURIComponent(exchangeName)}`)
190+
.setReplyTo(ME)
191+
.setAmqpMethod(AmqpMethods.PUT)
192+
.setBody({
193+
type: options.type,
194+
durable: options.durable ?? false,
195+
auto_delete: options.auto_delete ?? false,
196+
})
197+
.build()
198+
199+
this.senderLink.send(message)
200+
})
201+
}
202+
203+
deleteExchange(exchangeName: string): Promise<boolean> {
204+
return new Promise((res, rej) => {
205+
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
206+
if (!context.message) {
207+
return rej(new Error("Receiver has not received any message"))
208+
}
209+
210+
const response = new DeleteExchangeResponseDecoder().decodeFrom(context.message, String(message.message_id))
211+
if (response.status === "error") {
212+
return rej(response.error)
213+
}
214+
215+
return res(true)
216+
})
217+
218+
const message = new MessageBuilder()
219+
.sendTo(`/${AmqpEndpoints.Exchanges}/${encodeURIComponent(exchangeName)}`)
220+
.setReplyTo(ME)
221+
.setAmqpMethod(AmqpMethods.DELETE)
222+
.build()
223+
this.senderLink.send(message)
224+
})
225+
}
157226
}
158227

159228
function buildArgumentsFrom(queueType?: QueueType, queueOptions?: Record<string, string>) {

src/message_builder.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export enum AmqpMethods {
88

99
export enum AmqpEndpoints {
1010
Queues = "queues",
11+
Exchanges = "exchanges",
1112
}
1213

1314
export const ME = "$me"

src/response_decoder.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,19 @@ interface ResponseDecoder {
66
decodeFrom: (receivedMessage: Message, sentMessageId: string) => Result<unknown, Error>
77
}
88

9+
class VoidResponseDecoder implements ResponseDecoder {
10+
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<void, Error> {
11+
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
12+
return { status: "error", error: new Error(`Message Error: ${receivedMessage.subject}`) }
13+
}
14+
15+
return {
16+
status: "ok",
17+
body: undefined,
18+
}
19+
}
20+
}
21+
922
export class CreateQueueResponseDecoder implements ResponseDecoder {
1023
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<QueueInfo, Error> {
1124
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
@@ -45,3 +58,7 @@ export class DeleteQueueResponseDecoder implements ResponseDecoder {
4558
}
4659
}
4760
}
61+
62+
export class CreateExchangeResponseDecoder extends VoidResponseDecoder {}
63+
64+
export class DeleteExchangeResponseDecoder extends VoidResponseDecoder {}

test/e2e/management.test.ts

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,19 @@
11
import { Management } from "../../src/index.js"
22
import { afterEach, beforeEach, describe, expect, test } from "vitest"
3-
import { createQueue, eventually, existsQueue, getQueueInfo, host, password, port, username } from "../support/util.js"
3+
import {
4+
createQueue,
5+
existsQueue,
6+
eventually,
7+
createExchange,
8+
existsExchange,
9+
deleteExchange,
10+
getQueueInfo,
11+
host,
12+
password,
13+
port,
14+
username,
15+
getExchangeInfo,
16+
} from "../support/util.js"
417
import { createEnvironment, Environment } from "../../src/environment.js"
518
import { Connection } from "../../src/connection.js"
619

@@ -9,6 +22,8 @@ describe("Management", () => {
922
let connection: Connection
1023
let management: Management
1124

25+
const exchangeName = "test-exchange"
26+
1227
beforeEach(async () => {
1328
environment = createEnvironment({
1429
host,
@@ -18,13 +33,15 @@ describe("Management", () => {
1833
})
1934
connection = await environment.createConnection()
2035
management = connection.management()
36+
await deleteExchange(exchangeName)
2137
})
2238

2339
afterEach(async () => {
2440
try {
2541
await management.close()
2642
await connection.close()
2743
await environment.close()
44+
await deleteExchange(exchangeName)
2845
} catch (error) {
2946
console.error(error)
3047
}
@@ -56,4 +73,36 @@ describe("Management", () => {
5673
expect(await existsQueue("test-queue")).to.eql(false)
5774
})
5875
})
76+
77+
test("create an exchange through the management", async () => {
78+
const exchange = await management.declareExchange(exchangeName, {
79+
type: "headers",
80+
auto_delete: true,
81+
durable: false,
82+
})
83+
84+
await eventually(async () => {
85+
const exchangeInfo = await getExchangeInfo(exchange.getInfo.name)
86+
expect(exchangeInfo.ok).to.eql(true)
87+
expect(exchange.getInfo.name).to.eql(exchangeInfo.body.name)
88+
expect(exchange.getInfo.arguments).to.eql(exchangeInfo.body.arguments)
89+
expect(exchange.getInfo.autoDelete).to.eql(exchangeInfo.body.auto_delete)
90+
expect(exchange.getInfo.durable).to.eql(exchangeInfo.body.durable)
91+
expect(exchange.getInfo.type).to.eql(exchangeInfo.body.type)
92+
})
93+
})
94+
95+
test("delete an exchange through the management", async () => {
96+
await createExchange(exchangeName)
97+
await eventually(async () => {
98+
expect(await existsExchange(exchangeName)).to.eql(true)
99+
})
100+
101+
const result = await management.deleteExchange(exchangeName)
102+
103+
await eventually(async () => {
104+
expect(await existsExchange(exchangeName)).to.eql(false)
105+
expect(result).eql(true)
106+
})
107+
})
59108
})

test/support/util.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@ export type QueueInfoResponse = {
1919
type: string
2020
}
2121

22+
export type ExchangeInfoResponse = {
23+
name: string
24+
arguments: Record<string, string>
25+
auto_delete: boolean
26+
durable: boolean
27+
type: string
28+
}
29+
2230
export const host = process.env.RABBITMQ_HOSTNAME ?? "localhost"
2331
export const port = parseInt(process.env.RABBITMQ_PORT ?? "5672")
2432
export const managementPort = 15672
@@ -74,6 +82,70 @@ export async function createQueue(queue: string): Promise<boolean> {
7482
return response.ok
7583
}
7684

85+
export async function existsExchange(exchangeName: string): Promise<boolean> {
86+
const response = await getExchangeInfo(exchangeName)
87+
88+
if (!response.ok) {
89+
if (response.statusCode === 404) return false
90+
91+
throw new Error(`HTTPError: ${inspect(response)}`)
92+
}
93+
94+
return response.ok
95+
}
96+
97+
export async function getExchangeInfo(exchange: string): Promise<Response<ExchangeInfoResponse>> {
98+
const response = await got.get<ExchangeInfoResponse>(
99+
`http://${host}:${managementPort}/api/exchanges/${vhost}/${exchange}`,
100+
{
101+
headers: {
102+
Authorization: `Basic ${Buffer.from(`${username}:${password}`).toString("base64")}`,
103+
},
104+
responseType: "json",
105+
throwHttpErrors: false,
106+
}
107+
)
108+
109+
console.log(response.body)
110+
return response
111+
}
112+
113+
export async function createExchange(exchange: string): Promise<Response<unknown>> {
114+
const response = await got.put(`http://${host}:${managementPort}/api/exchanges/${vhost}/${exchange}`, {
115+
headers: {
116+
Authorization: `Basic ${Buffer.from(`${username}:${password}`).toString("base64")}`,
117+
},
118+
responseType: "json",
119+
throwHttpErrors: false,
120+
json: {
121+
type: "direct",
122+
auto_delete: false,
123+
durable: false,
124+
internal: false,
125+
arguments: {},
126+
},
127+
})
128+
129+
console.log(response.body)
130+
return response
131+
}
132+
133+
export async function deleteExchange(exchange: string): Promise<Response<unknown>> {
134+
const response = await got.delete(`http://${host}:${managementPort}/api/exchanges/${vhost}/${exchange}`, {
135+
headers: {
136+
Authorization: `Basic ${Buffer.from(`${username}:${password}`).toString("base64")}`,
137+
},
138+
searchParams: {
139+
"if-unused": true,
140+
},
141+
responseType: "json",
142+
throwHttpErrors: false,
143+
})
144+
145+
console.log(response.body)
146+
return response
147+
}
148+
77149
export async function wait(ms: number) {
78150
return new Promise((res) => {
79151
setTimeout(() => res(true), ms)

0 commit comments

Comments
 (0)