Skip to content

Commit 0dfee96

Browse files
committed
added response
1 parent 28afefd commit 0dfee96

File tree

6 files changed

+128
-35
lines changed

6 files changed

+128
-35
lines changed

src/exchange.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,33 @@
11
export type ExchangeType = "direct" | "fanout" | "topic" | "headers"
22

33
export type ExchangeOptions = {
4+
arguments: Record<string, string>
45
auto_delete: boolean
56
durable: boolean
67
type: ExchangeType
78
}
89

910
export interface ExchangeInfo {
1011
name: string
12+
arguments: Record<string, string>
13+
autoDelete: boolean
14+
durable: boolean
15+
type: string
1116
}
1217

13-
export class AmqpExchangeInfo implements ExchangeInfo {
14-
private exchangeName: string
18+
export interface Exchange {
19+
getInfo: ExchangeInfo
20+
}
1521

16-
constructor(params: { name: string }) {
17-
this.exchangeName = params.name
18-
}
22+
export type DeletedExchangeInfo = {
23+
name: string
24+
deleted: boolean
25+
}
26+
27+
export class AmqpExchange implements Exchange {
28+
constructor(private readonly info: ExchangeInfo) {}
1929

20-
public get name(): string {
21-
return this.exchangeName
30+
public get getInfo(): ExchangeInfo {
31+
return this.info
2232
}
2333
}

src/management.ts

Lines changed: 63 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { AmqpExchangeInfo, ExchangeInfo, ExchangeOptions } from "./exchange.js"
1+
import { AmqpExchange, Exchange, ExchangeInfo, ExchangeOptions } from "./exchange.js"
22
import { AmqpQueue, Queue, QueueOptions, QueueType } from "./queue.js"
33
import {
44
EventContext,
@@ -11,7 +11,12 @@ import {
1111
SenderOptions,
1212
} from "rhea"
1313
import { AmqpEndpoints, AmqpMethods, MessageBuilder, ME } from "./message_builder.js"
14-
import { CreateQueueResponseDecoder, DeleteQueueResponseDecoder } from "./response_decoder.js"
14+
import {
15+
CreateExchangeResponseDecoder,
16+
CreateQueueResponseDecoder,
17+
DeleteExchangeResponseDecoder,
18+
DeleteQueueResponseDecoder,
19+
} from "./response_decoder.js"
1520

1621
type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen
1722
type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError
@@ -31,8 +36,8 @@ const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
3136
export interface Management {
3237
declareQueue: (queueName: string, options?: Partial<QueueOptions>) => Promise<Queue>
3338
deleteQueue: (queueName: string) => Promise<boolean>
34-
declareExchange: (exchangeName: string, options: Partial<ExchangeOptions>) => ExchangeInfo
35-
deleteExchange: (exchangeName: string) => void
39+
declareExchange: (exchangeName: string, options: Partial<ExchangeOptions>) => Promise<Exchange>
40+
deleteExchange: (exchangeName: string) => Promise<boolean>
3641
close: () => void
3742
}
3843

@@ -158,28 +163,64 @@ export class AmqpManagement implements Management {
158163
})
159164
}
160165

161-
declareExchange(exchangeName: string, options: Partial<ExchangeOptions> = {}): ExchangeInfo {
162-
// decode the response
163-
// create queueInfo
166+
declareExchange(exchangeName: string, options: Partial<ExchangeOptions> = {}): Promise<Exchange> {
167+
const exchangeInfo: ExchangeInfo = {
168+
type: options.type ?? "direct",
169+
arguments: options.arguments ?? {},
170+
autoDelete: options.auto_delete ?? false,
171+
durable: options.durable ?? false,
172+
name: exchangeName,
173+
}
174+
return new Promise((res, rej) => {
175+
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
176+
if (!context.message) {
177+
return rej(new Error("Receiver has not received any message"))
178+
}
164179

165-
this.senderLink.send({
166-
message_id: generate_uuid(),
167-
to: `/exchanges/${encodeURIComponent(exchangeName)}`,
168-
reply_to: "$me",
169-
subject: "PUT",
170-
body: options,
171-
})
180+
const response = new CreateExchangeResponseDecoder().decodeFrom(context.message, String(message.message_id))
181+
if (response.status === "error") {
182+
return rej(response.error)
183+
}
184+
185+
return res(new AmqpExchange(exchangeInfo))
186+
})
187+
188+
const message = new MessageBuilder()
189+
.sendTo(`/${AmqpEndpoints.Exchanges}/${encodeURIComponent(exchangeName)}`)
190+
.setReplyTo(ME)
191+
.setAmqpMethod(AmqpMethods.PUT)
192+
.setBody({
193+
type: options.type,
194+
durable: options.durable ?? false,
195+
auto_delete: options.auto_delete ?? false,
196+
})
197+
.build()
172198

173-
return new AmqpExchangeInfo({ name: exchangeName })
199+
this.senderLink.send(message)
200+
})
174201
}
175202

176-
deleteExchange(exchangeName: string) {
177-
this.senderLink.send({
178-
message_id: generate_uuid(),
179-
to: `/exchanges/${encodeURIComponent(exchangeName)}`,
180-
reply_to: "$me",
181-
subject: "DELETE",
182-
body: null,
203+
deleteExchange(exchangeName: string): Promise<boolean> {
204+
return new Promise((res, rej) => {
205+
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
206+
if (!context.message) {
207+
return rej(new Error("Receiver has not received any message"))
208+
}
209+
210+
const response = new DeleteExchangeResponseDecoder().decodeFrom(context.message, String(message.message_id))
211+
if (response.status === "error") {
212+
return rej(response.error)
213+
}
214+
215+
return res(true)
216+
})
217+
218+
const message = new MessageBuilder()
219+
.sendTo(`/${AmqpEndpoints.Exchanges}/${encodeURIComponent(exchangeName)}`)
220+
.setReplyTo(ME)
221+
.setAmqpMethod(AmqpMethods.DELETE)
222+
.build()
223+
this.senderLink.send(message)
183224
})
184225
}
185226
}

src/message_builder.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export enum AmqpMethods {
88

99
export enum AmqpEndpoints {
1010
Queues = "queues",
11+
Exchanges = "exchanges",
1112
}
1213

1314
export const ME = "$me"

src/response_decoder.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,29 @@ export class DeleteQueueResponseDecoder implements ResponseDecoder {
4545
}
4646
}
4747
}
48+
49+
export class CreateExchangeResponseDecoder implements ResponseDecoder {
50+
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<void, Error> {
51+
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
52+
return { status: "error", error: new Error(`Message Error: ${receivedMessage.subject}`) }
53+
}
54+
55+
return {
56+
status: "ok",
57+
body: undefined,
58+
}
59+
}
60+
}
61+
62+
export class DeleteExchangeResponseDecoder implements ResponseDecoder {
63+
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<void, Error> {
64+
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
65+
return { status: "error", error: new Error(`Message Error: ${receivedMessage.subject}`) }
66+
}
67+
68+
return {
69+
status: "ok",
70+
body: undefined,
71+
}
72+
}
73+
}

test/e2e/management.test.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
password,
1313
port,
1414
username,
15+
getExchangeInfo,
1516
} from "../support/util.js"
1617
import { createEnvironment, Environment } from "../../src/environment.js"
1718
import { Connection } from "../../src/connection.js"
@@ -74,15 +75,20 @@ describe("Management", () => {
7475
})
7576

7677
test("create an exchange through the management", async () => {
77-
const exchangeInfo = management.declareExchange(exchangeName, {
78+
const exchange = await management.declareExchange(exchangeName, {
7879
type: "headers",
7980
auto_delete: true,
8081
durable: false,
8182
})
8283

83-
expect(exchangeInfo.name).to.eql(exchangeName)
8484
await eventually(async () => {
85-
expect(await existsExchange(exchangeInfo.name)).to.eql(true)
85+
const exchangeInfo = await getExchangeInfo(exchange.getInfo.name)
86+
expect(exchangeInfo.ok).to.eql(true)
87+
expect(exchange.getInfo.name).to.eql(exchangeInfo.body.name)
88+
expect(exchange.getInfo.arguments).to.eql(exchangeInfo.body.arguments)
89+
expect(exchange.getInfo.autoDelete).to.eql(exchangeInfo.body.auto_delete)
90+
expect(exchange.getInfo.durable).to.eql(exchangeInfo.body.durable)
91+
expect(exchange.getInfo.type).to.eql(exchangeInfo.body.type)
8692
})
8793
})
8894

@@ -92,10 +98,11 @@ describe("Management", () => {
9298
expect(await existsExchange(exchangeName)).to.eql(true)
9399
})
94100

95-
management.deleteExchange(exchangeName)
101+
const result = await management.deleteExchange(exchangeName)
96102

97103
await eventually(async () => {
98104
expect(await existsExchange(exchangeName)).to.eql(false)
105+
expect(result).eql(true)
99106
})
100107
})
101108
})

test/support/util.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@ export type QueueInfoResponse = {
1919
type: string
2020
}
2121

22+
export type ExchangeInfoResponse = {
23+
name: string
24+
arguments: Record<string, string>
25+
auto_delete: boolean
26+
durable: boolean
27+
type: string
28+
}
29+
2230
export const host = process.env.RABBITMQ_HOSTNAME ?? "localhost"
2331
export const port = parseInt(process.env.RABBITMQ_PORT ?? "5672")
2432
export const managementPort = 15672
@@ -86,8 +94,8 @@ export async function existsExchange(exchangeName: string): Promise<boolean> {
8694
return response.ok
8795
}
8896

89-
async function getExchangeInfo(exchange: string): Promise<Response<QueueInfoResponse>> {
90-
const response = await got.get<QueueInfoResponse>(
97+
export async function getExchangeInfo(exchange: string): Promise<Response<ExchangeInfoResponse>> {
98+
const response = await got.get<ExchangeInfoResponse>(
9199
`http://${host}:${managementPort}/api/exchanges/${vhost}/${exchange}`,
92100
{
93101
headers: {

0 commit comments

Comments
 (0)