Skip to content

Commit 1a9a836

Browse files
authored
Merge pull request #17 from coders51/4-as-a-library-user-i-want-to-create-a-queue-without-using-the-management-ui
[IS-4/feat]: add creation and deletion of queues
2 parents 406be3a + 6f1235c commit 1a9a836

File tree

12 files changed

+630
-195
lines changed

12 files changed

+630
-195
lines changed

src/connection.ts

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,57 @@
1-
import { ConnectionEvents, Connection as RheaConnection } from "rhea"
1+
import { ConnectionEvents, create_container, Connection as RheaConnection } from "rhea"
2+
import { AmqpManagement, Management } from "./management.js"
3+
import { EnvironmentParams } from "./environment.js"
24

35
export interface Connection {
46
close(): Promise<boolean>
57
isOpen(): boolean
8+
management(): Management
69
}
710

811
export class AmqpConnection implements Connection {
9-
private readonly rheaConnection: RheaConnection
12+
static async create(params: EnvironmentParams) {
13+
const connection = await AmqpConnection.open(params)
14+
const topologyManagement = await AmqpManagement.create(connection)
15+
return new AmqpConnection(connection, topologyManagement)
16+
}
17+
18+
private static async open(params: EnvironmentParams): Promise<RheaConnection> {
19+
return new Promise((res, rej) => {
20+
const container = create_container()
21+
container.once(ConnectionEvents.connectionOpen, (context) => {
22+
return res(context.connection)
23+
})
24+
container.once(ConnectionEvents.error, (context) => {
25+
return rej(context.error ?? new Error("Connection error occurred"))
26+
})
1027

11-
constructor(connection: RheaConnection) {
12-
this.rheaConnection = connection
28+
container.connect(params)
29+
})
1330
}
1431

32+
constructor(
33+
private readonly connection: RheaConnection,
34+
private readonly topologyManagement: Management
35+
) {}
36+
1537
async close(): Promise<boolean> {
1638
return new Promise((res, rej) => {
17-
this.rheaConnection.once(ConnectionEvents.connectionClose, () => {
39+
this.connection.once(ConnectionEvents.connectionClose, () => {
1840
return res(true)
1941
})
20-
this.rheaConnection.once(ConnectionEvents.connectionError, (context) => {
42+
this.connection.once(ConnectionEvents.connectionError, (context) => {
2143
return rej(new Error("Connection error: " + context.connection.error))
2244
})
2345

24-
this.rheaConnection.close()
46+
this.connection.close()
2547
})
2648
}
2749

50+
management(): Management {
51+
return this.topologyManagement
52+
}
53+
2854
public isOpen(): boolean {
29-
return this.rheaConnection.is_open()
55+
return this.connection ? this.connection.is_open() : false
3056
}
3157
}

src/environment.ts

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
import { ConnectionEvents, Container, create_container } from "rhea"
21
import { AmqpConnection, Connection } from "./connection.js"
3-
import { Connection as RheaConnection } from "rhea"
42

53
export interface Environment {
64
createConnection(): Promise<Connection>
@@ -15,42 +13,17 @@ export type EnvironmentParams = {
1513
}
1614

1715
export class AmqpEnvironment implements Environment {
18-
private readonly host: string
19-
private readonly port: number
20-
private readonly username: string
21-
private readonly password: string
22-
private readonly container: Container
23-
private readonly connections: Connection[] = []
24-
25-
constructor({ host, port, username, password }: EnvironmentParams) {
26-
this.host = host
27-
this.port = port
28-
this.username = username
29-
this.password = password
30-
this.container = create_container()
31-
}
16+
constructor(
17+
private readonly params: EnvironmentParams,
18+
private readonly connections: Connection[] = []
19+
) {}
3220

3321
async createConnection(): Promise<Connection> {
34-
const rheaConnection = await this.openConnection()
35-
const connection = new AmqpConnection(rheaConnection)
22+
const connection = await AmqpConnection.create(this.params)
3623
this.connections.push(connection)
37-
3824
return connection
3925
}
4026

41-
private async openConnection(): Promise<RheaConnection> {
42-
return new Promise((res, rej) => {
43-
this.container.once(ConnectionEvents.connectionOpen, (context) => {
44-
return res(context.connection)
45-
})
46-
this.container.once(ConnectionEvents.error, (context) => {
47-
return rej(context.error ?? new Error("Connection error occurred"))
48-
})
49-
50-
this.container.connect({ host: this.host, port: this.port, username: this.username, password: this.password })
51-
})
52-
}
53-
5427
async close(): Promise<void> {
5528
await this.closeConnections()
5629
this.connections.length = 0

src/management.ts

Lines changed: 151 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,161 @@
1-
import { AmqpQueueSpec, QueueSpec } from "./queue.js"
1+
import { AmqpQueue, Queue, QueueOptions, QueueType } from "./queue.js"
2+
import {
3+
EventContext,
4+
Receiver,
5+
ReceiverEvents,
6+
ReceiverOptions,
7+
Connection as RheaConnection,
8+
Sender,
9+
SenderEvents,
10+
SenderOptions,
11+
} from "rhea"
12+
import { AmqpEndpoints, AmqpMethods, MessageBuilder, ME } from "./message_builder.js"
13+
import { CreateQueueResponseDecoder, DeleteQueueResponseDecoder } from "./response_decoder.js"
14+
15+
type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen
16+
type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError
17+
type OpenLinkMethods =
18+
| ((options?: SenderOptions | string) => Sender)
19+
| ((options?: ReceiverOptions | string) => Receiver)
20+
21+
const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
22+
snd_settle_mode: 1,
23+
rcv_settle_mode: 0,
24+
name: "management-link-pair",
25+
target: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false },
26+
source: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false, durable: 0 },
27+
properties: { paired: true },
28+
}
229

330
export interface Management {
4-
queue: (queueName: string) => QueueSpec
31+
declareQueue: (queueName: string, options?: Partial<QueueOptions>) => Promise<Queue>
32+
deleteQueue: (queueName: string) => Promise<boolean>
533
close: () => void
634
}
735

836
export class AmqpManagement implements Management {
9-
constructor() {}
37+
static async create(connection: RheaConnection): Promise<AmqpManagement> {
38+
const senderLink = await AmqpManagement.openSender(connection)
39+
const receiverLink = await AmqpManagement.openReceiver(connection)
40+
return new AmqpManagement(connection, senderLink, receiverLink)
41+
}
42+
43+
constructor(
44+
private readonly connection: RheaConnection,
45+
private senderLink: Sender,
46+
private receiverLink: Receiver
47+
) {
48+
console.log(this.receiverLink.is_open())
49+
}
50+
51+
private static async openReceiver(connection: RheaConnection): Promise<Receiver> {
52+
return AmqpManagement.openLink<Receiver>(
53+
connection,
54+
ReceiverEvents.receiverOpen,
55+
ReceiverEvents.receiverError,
56+
connection.open_receiver.bind(connection),
57+
MANAGEMENT_NODE_CONFIGURATION
58+
)
59+
}
60+
61+
private static async openSender(connection: RheaConnection): Promise<Sender> {
62+
return AmqpManagement.openLink<Sender>(
63+
connection,
64+
SenderEvents.senderOpen,
65+
SenderEvents.senderError,
66+
connection.open_sender.bind(connection),
67+
MANAGEMENT_NODE_CONFIGURATION
68+
)
69+
}
70+
71+
private static async openLink<T extends Sender | Receiver>(
72+
connection: RheaConnection,
73+
successEvent: LinkOpenEvents,
74+
errorEvent: LinkErrorEvents,
75+
openMethod: OpenLinkMethods,
76+
config?: SenderOptions | ReceiverOptions | string
77+
): Promise<T> {
78+
return new Promise((res, rej) => {
79+
connection.once(successEvent, (context) => {
80+
return res(context.receiver || context.sender)
81+
})
82+
connection.once(errorEvent, (context) => {
83+
return rej(context.connection.error)
84+
})
85+
openMethod(config)
86+
})
87+
}
88+
89+
close(): void {
90+
if (this.connection.is_closed()) return
91+
92+
this.closeSender()
93+
this.closeReceiver()
94+
}
95+
96+
private closeSender(): void {
97+
this.senderLink.close()
98+
}
99+
100+
private closeReceiver(): void {
101+
this.senderLink.close()
102+
}
103+
104+
async declareQueue(queueName: string, options: Partial<QueueOptions> = {}): Promise<Queue> {
105+
return new Promise((res, rej) => {
106+
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
107+
if (!context.message) {
108+
return rej(new Error("Receiver has not received any message"))
109+
}
110+
111+
const response = new CreateQueueResponseDecoder().decodeFrom(context.message, String(message.message_id))
112+
if (response.status === "error") {
113+
return rej(response.error)
114+
}
10115

11-
close() {}
116+
return res(new AmqpQueue(response.body))
117+
})
12118

13-
queue(queueName: string): QueueSpec {
14-
return new AmqpQueueSpec().name(queueName)
119+
const message = new MessageBuilder()
120+
.sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`)
121+
.setReplyTo(ME)
122+
.setAmqpMethod(AmqpMethods.PUT)
123+
.setBody({
124+
exclusive: options.exclusive ?? false,
125+
durable: options.durable ?? false,
126+
auto_delete: options.autoDelete ?? false,
127+
arguments: buildArgumentsFrom(options.type, options.arguments),
128+
})
129+
.build()
130+
this.senderLink.send(message)
131+
})
15132
}
133+
134+
async deleteQueue(queueName: string): Promise<boolean> {
135+
return new Promise((res, rej) => {
136+
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
137+
if (!context.message) {
138+
return rej(new Error("Receiver has not received any message"))
139+
}
140+
141+
const response = new DeleteQueueResponseDecoder().decodeFrom(context.message, String(message.message_id))
142+
if (response.status === "error") {
143+
return rej(response.error)
144+
}
145+
146+
return res(true)
147+
})
148+
149+
const message = new MessageBuilder()
150+
.sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`)
151+
.setReplyTo(ME)
152+
.setAmqpMethod(AmqpMethods.DELETE)
153+
.build()
154+
this.senderLink.send(message)
155+
})
156+
}
157+
}
158+
159+
function buildArgumentsFrom(queueType?: QueueType, queueOptions?: Record<string, string>) {
160+
return { ...(queueOptions ?? {}), ...(queueType ? { "x-queue-type": queueType } : {}) }
16161
}

src/message_builder.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { generate_uuid, Message } from "rhea"
2+
3+
export enum AmqpMethods {
4+
PUT = "PUT",
5+
DELETE = "DELETE",
6+
GET = "GET",
7+
}
8+
9+
export enum AmqpEndpoints {
10+
Queues = "queues",
11+
}
12+
13+
export const ME = "$me"
14+
15+
export class MessageBuilder {
16+
private messageId: string = generate_uuid()
17+
private to: string = ""
18+
private replyTo: string = ME
19+
private method: AmqpMethods = AmqpMethods.GET
20+
private body: unknown
21+
22+
constructor() {}
23+
24+
setMessageId(id: string) {
25+
this.messageId = id
26+
return this
27+
}
28+
29+
sendTo(to: string) {
30+
this.to = to
31+
return this
32+
}
33+
34+
setReplyTo(replyTo: string) {
35+
this.replyTo = replyTo
36+
return this
37+
}
38+
39+
setAmqpMethod(method: AmqpMethods) {
40+
this.method = method
41+
return this
42+
}
43+
44+
setBody(body: unknown) {
45+
this.body = body
46+
return this
47+
}
48+
49+
build(): Message {
50+
return {
51+
message_id: this.messageId,
52+
to: this.to,
53+
reply_to: this.replyTo,
54+
subject: this.method,
55+
body: this.body,
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)