Skip to content

Commit 8b8b1eb

Browse files
authored
Merge pull request #20 from coders51/3-as-a-library-user-i-want-to-create-a-binding-without-using-the-management-ui
[IS-3/feat]: add bind and unbind
2 parents d01efd7 + e3288a5 commit 8b8b1eb

File tree

8 files changed

+364
-44
lines changed

8 files changed

+364
-44
lines changed

cspell.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
"ackmode",
55
"ampq",
66
"amqpvalue",
7+
"dste",
8+
"dstq",
79
"fanout",
810
"perftest",
911
"prefetch",

src/binding.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import { Exchange } from "./exchange.js"
2+
import { Queue } from "./queue.js"
3+
4+
export type BindingOptions = {
5+
source: Exchange
6+
destination: Exchange | Queue
7+
arguments?: Record<string, string>
8+
}
9+
10+
export type BindingInfo = {
11+
id: string
12+
source: string
13+
destination: string
14+
arguments: Record<string, string>
15+
}
16+
17+
export interface Binding {
18+
getInfo: BindingInfo
19+
}
20+
21+
export class AmqpBinding implements Binding {
22+
constructor(private readonly info: BindingInfo) {}
23+
24+
public get getInfo(): BindingInfo {
25+
return this.info
26+
}
27+
}

src/management.ts

Lines changed: 99 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@ import {
1212
} from "rhea"
1313
import { AmqpEndpoints, AmqpMethods, MessageBuilder, ME } from "./message_builder.js"
1414
import {
15+
CreateBindingResponseDecoder,
1516
CreateExchangeResponseDecoder,
1617
CreateQueueResponseDecoder,
18+
DeleteBindingResponseDecoder,
1719
DeleteExchangeResponseDecoder,
1820
DeleteQueueResponseDecoder,
1921
} from "./response_decoder.js"
22+
import { AmqpBinding, Binding, BindingInfo, BindingOptions } from "./binding.js"
23+
import { randomUUID } from "crypto"
2024

2125
type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen
2226
type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError
@@ -36,8 +40,10 @@ const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
3640
export interface Management {
3741
declareQueue: (queueName: string, options?: Partial<QueueOptions>) => Promise<Queue>
3842
deleteQueue: (queueName: string) => Promise<boolean>
39-
declareExchange: (exchangeName: string, options: Partial<ExchangeOptions>) => Promise<Exchange>
43+
declareExchange: (exchangeName: string, options?: Partial<ExchangeOptions>) => Promise<Exchange>
4044
deleteExchange: (exchangeName: string) => Promise<boolean>
45+
bind: (key: string, options: BindingOptions) => Promise<Binding>
46+
unbind: (key: string, options: BindingOptions) => Promise<boolean>
4147
close: () => void
4248
}
4349

@@ -52,9 +58,7 @@ export class AmqpManagement implements Management {
5258
private readonly connection: RheaConnection,
5359
private senderLink: Sender,
5460
private receiverLink: Receiver
55-
) {
56-
console.log(this.receiverLink.is_open())
57-
}
61+
) {}
5862

5963
private static async openReceiver(connection: RheaConnection): Promise<Receiver> {
6064
return AmqpManagement.openLink<Receiver>(
@@ -163,12 +167,12 @@ export class AmqpManagement implements Management {
163167
})
164168
}
165169

166-
declareExchange(exchangeName: string, options: Partial<ExchangeOptions> = {}): Promise<Exchange> {
170+
async declareExchange(exchangeName: string, options: Partial<ExchangeOptions> = {}): Promise<Exchange> {
167171
const exchangeInfo: ExchangeInfo = {
168172
type: options.type ?? "direct",
169173
arguments: options.arguments ?? {},
170174
autoDelete: options.auto_delete ?? false,
171-
durable: options.durable ?? false,
175+
durable: options.durable ?? true,
172176
name: exchangeName,
173177
}
174178
return new Promise((res, rej) => {
@@ -190,8 +194,8 @@ export class AmqpManagement implements Management {
190194
.setReplyTo(ME)
191195
.setAmqpMethod(AmqpMethods.PUT)
192196
.setBody({
193-
type: options.type,
194-
durable: options.durable ?? false,
197+
type: options.type ?? "direct",
198+
durable: options.durable ?? true,
195199
auto_delete: options.auto_delete ?? false,
196200
})
197201
.build()
@@ -200,7 +204,7 @@ export class AmqpManagement implements Management {
200204
})
201205
}
202206

203-
deleteExchange(exchangeName: string): Promise<boolean> {
207+
async deleteExchange(exchangeName: string): Promise<boolean> {
204208
return new Promise((res, rej) => {
205209
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
206210
if (!context.message) {
@@ -223,8 +227,94 @@ export class AmqpManagement implements Management {
223227
this.senderLink.send(message)
224228
})
225229
}
230+
231+
async bind(key: string, options: BindingOptions): Promise<Binding> {
232+
const bindingInfo: BindingInfo = {
233+
id: randomUUID(),
234+
source: options.source.getInfo.name,
235+
destination: options.destination.getInfo.name,
236+
arguments: options.arguments ?? {},
237+
}
238+
return new Promise((res, rej) => {
239+
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
240+
if (!context.message) {
241+
return rej(new Error("Receiver has not received any message"))
242+
}
243+
244+
const response = new CreateBindingResponseDecoder().decodeFrom(context.message, String(message.message_id))
245+
if (response.status === "error") {
246+
return rej(response.error)
247+
}
248+
249+
return res(new AmqpBinding(bindingInfo))
250+
})
251+
252+
const message = new MessageBuilder()
253+
.sendTo(`/${AmqpEndpoints.Bindings}`)
254+
.setReplyTo(ME)
255+
.setAmqpMethod(AmqpMethods.POST)
256+
.setBody({
257+
source: options.source.getInfo.name,
258+
binding_key: key,
259+
arguments: options.arguments ?? {},
260+
...buildBindingDestinationFrom(options.destination),
261+
})
262+
.build()
263+
this.senderLink.send(message)
264+
})
265+
}
266+
267+
async unbind(key: string, options: BindingOptions): Promise<boolean> {
268+
return new Promise((res, rej) => {
269+
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
270+
if (!context.message) {
271+
return rej(new Error("Receiver has not received any message"))
272+
}
273+
274+
const response = new DeleteBindingResponseDecoder().decodeFrom(context.message, String(message.message_id))
275+
if (response.status === "error") {
276+
return rej(response.error)
277+
}
278+
279+
return res(true)
280+
})
281+
282+
const message = new MessageBuilder()
283+
.sendTo(
284+
`/${AmqpEndpoints.Bindings}/${buildUnbindEndpointFrom({ source: options.source, destination: options.destination, key })}`
285+
)
286+
.setReplyTo(ME)
287+
.setAmqpMethod(AmqpMethods.DELETE)
288+
.build()
289+
this.senderLink.send(message)
290+
})
291+
}
226292
}
227293

228294
function buildArgumentsFrom(queueType?: QueueType, queueOptions?: Record<string, string>) {
229295
return { ...(queueOptions ?? {}), ...(queueType ? { "x-queue-type": queueType } : {}) }
230296
}
297+
298+
function buildUnbindEndpointFrom({
299+
source,
300+
destination,
301+
key,
302+
}: {
303+
source: Exchange
304+
destination: Exchange | Queue
305+
key: string
306+
}): string {
307+
if (destination instanceof AmqpExchange) {
308+
return `src=${encodeURIComponent(source.getInfo.name)};dste=${encodeURIComponent(destination.getInfo.name)};key=${encodeURIComponent(key)};args=`
309+
}
310+
311+
return `src=${encodeURIComponent(source.getInfo.name)};dstq=${encodeURIComponent(destination.getInfo.name)};key=${encodeURIComponent(key)};args=`
312+
}
313+
314+
function buildBindingDestinationFrom(destination: Exchange | Queue) {
315+
if (destination instanceof AmqpExchange) {
316+
return { destination_exchange: destination.getInfo.name }
317+
}
318+
319+
return { destination_queue: destination.getInfo.name }
320+
}

src/message_builder.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { generate_uuid, Message } from "rhea"
22

33
export enum AmqpMethods {
4+
POST = "POST",
45
PUT = "PUT",
56
DELETE = "DELETE",
67
GET = "GET",
@@ -9,6 +10,7 @@ export enum AmqpMethods {
910
export enum AmqpEndpoints {
1011
Queues = "queues",
1112
Exchanges = "exchanges",
13+
Bindings = "bindings",
1214
}
1315

1416
export const ME = "$me"

src/response_decoder.ts

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,6 @@ interface ResponseDecoder {
66
decodeFrom: (receivedMessage: Message, sentMessageId: string) => Result<unknown, Error>
77
}
88

9-
class VoidResponseDecoder implements ResponseDecoder {
10-
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<void, Error> {
11-
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
12-
return { status: "error", error: new Error(`Message Error: ${receivedMessage.subject}`) }
13-
}
14-
15-
return {
16-
status: "ok",
17-
body: undefined,
18-
}
19-
}
20-
}
21-
229
export class CreateQueueResponseDecoder implements ResponseDecoder {
2310
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<QueueInfo, Error> {
2411
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
@@ -59,6 +46,23 @@ export class DeleteQueueResponseDecoder implements ResponseDecoder {
5946
}
6047
}
6148

62-
export class CreateExchangeResponseDecoder extends VoidResponseDecoder {}
49+
class EmptyBodyResponseDecoder implements ResponseDecoder {
50+
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<void, Error> {
51+
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
52+
return { status: "error", error: new Error(`Message Error: ${receivedMessage.subject}`) }
53+
}
54+
55+
return {
56+
status: "ok",
57+
body: undefined,
58+
}
59+
}
60+
}
61+
62+
export class CreateExchangeResponseDecoder extends EmptyBodyResponseDecoder {}
63+
64+
export class DeleteExchangeResponseDecoder extends EmptyBodyResponseDecoder {}
65+
66+
export class CreateBindingResponseDecoder extends EmptyBodyResponseDecoder {}
6367

64-
export class DeleteExchangeResponseDecoder extends VoidResponseDecoder {}
68+
export class DeleteBindingResponseDecoder extends EmptyBodyResponseDecoder {}

test/e2e/management.test.ts

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ import {
66
eventually,
77
createExchange,
88
existsExchange,
9-
deleteExchange,
109
getQueueInfo,
1110
host,
1211
password,
1312
port,
1413
username,
1514
getExchangeInfo,
15+
existsBinding,
16+
cleanRabbit,
1617
} from "../support/util.js"
1718
import { createEnvironment, Environment } from "../../src/environment.js"
1819
import { Connection } from "../../src/connection.js"
@@ -23,6 +24,8 @@ describe("Management", () => {
2324
let management: Management
2425

2526
const exchangeName = "test-exchange"
27+
const queueName = "test-queue"
28+
const bindingKey = "test-binding"
2629

2730
beforeEach(async () => {
2831
environment = createEnvironment({
@@ -33,22 +36,22 @@ describe("Management", () => {
3336
})
3437
connection = await environment.createConnection()
3538
management = connection.management()
36-
await deleteExchange(exchangeName)
39+
await cleanRabbit({ match: /test-/ })
3740
})
3841

3942
afterEach(async () => {
4043
try {
41-
await management.close()
44+
await cleanRabbit({ match: /test-/ })
45+
management.close()
4246
await connection.close()
4347
await environment.close()
44-
await deleteExchange(exchangeName)
4548
} catch (error) {
4649
console.error(error)
4750
}
4851
})
4952

5053
test("create a queue through the management", async () => {
51-
const queue = await management.declareQueue("test-queue")
54+
const queue = await management.declareQueue(queueName)
5255

5356
await eventually(async () => {
5457
const queueInfo = await getQueueInfo(queue.getInfo.name)
@@ -65,12 +68,13 @@ describe("Management", () => {
6568
})
6669

6770
test("delete a queue through the management", async () => {
68-
await createQueue("test-queue")
71+
await createQueue(queueName)
6972

70-
await management.deleteQueue("test-queue")
73+
const result = await management.deleteQueue(queueName)
7174

7275
await eventually(async () => {
73-
expect(await existsQueue("test-queue")).to.eql(false)
76+
expect(await existsQueue(queueName)).to.eql(false)
77+
expect(result).to.eql(true)
7478
})
7579
})
7680

@@ -105,4 +109,60 @@ describe("Management", () => {
105109
expect(result).eql(true)
106110
})
107111
})
112+
113+
test("create a binding from exchange to queue through the management", async () => {
114+
const exchange = await management.declareExchange(exchangeName)
115+
const queue = await management.declareQueue(queueName)
116+
117+
await management.bind(bindingKey, { source: exchange, destination: queue })
118+
119+
await eventually(async () => {
120+
expect(await existsBinding({ source: exchangeName, destination: queueName, type: "exchangeToQueue" })).to.eql(
121+
true
122+
)
123+
})
124+
})
125+
126+
test("create a binding from exchange to exchange through the management", async () => {
127+
const exchange1 = await management.declareExchange(exchangeName)
128+
const exchange2 = await management.declareExchange(exchangeName + "-2")
129+
130+
await management.bind(bindingKey, { source: exchange1, destination: exchange2 })
131+
132+
await eventually(async () => {
133+
expect(
134+
await existsBinding({ source: exchangeName, destination: exchangeName + "-2", type: "exchangeToExchange" })
135+
).to.eql(true)
136+
})
137+
})
138+
139+
test("delete a binding from exchange to queue with no arguments through the management", async () => {
140+
const exchange = await management.declareExchange(exchangeName)
141+
const queue = await management.declareQueue(queueName)
142+
143+
await management.unbind(bindingKey, { source: exchange, destination: queue })
144+
145+
await eventually(async () => {
146+
expect(await existsBinding({ source: exchangeName, destination: queueName, type: "exchangeToQueue" })).to.eql(
147+
false
148+
)
149+
expect(await existsQueue(queue.getInfo.name)).to.eql(true)
150+
expect(await existsExchange(exchange.getInfo.name)).to.eql(true)
151+
})
152+
})
153+
154+
test("delete a binding from exchange to exchange with no arguments through the management", async () => {
155+
const exchange = await management.declareExchange(exchangeName)
156+
const exchange2 = await management.declareExchange(exchangeName + "-2")
157+
158+
await management.unbind(bindingKey, { source: exchange, destination: exchange2 })
159+
160+
await eventually(async () => {
161+
expect(
162+
await existsBinding({ source: exchangeName, destination: exchangeName + "-2", type: "exchangeToExchange" })
163+
).to.eql(false)
164+
expect(await existsExchange(exchange.getInfo.name)).to.eql(true)
165+
expect(await existsExchange(exchangeName + "-2")).to.eql(true)
166+
})
167+
})
108168
})

0 commit comments

Comments
 (0)