Skip to content

Commit a14ac83

Browse files
author
magne
committed
feat: add create and delete queue
1 parent 7266b43 commit a14ac83

File tree

7 files changed

+313
-39
lines changed

7 files changed

+313
-39
lines changed

src/management.ts

Lines changed: 76 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import { AmqpQueueInfo, QueueInfo } from "./queue.js"
1+
import { AmqpQueue, Queue, QueueOptions, QueueType } from "./queue.js"
22
import {
3-
generate_uuid,
3+
EventContext,
44
Receiver,
55
ReceiverEvents,
66
ReceiverOptions,
@@ -9,11 +9,8 @@ import {
99
SenderEvents,
1010
SenderOptions,
1111
} from "rhea"
12-
13-
type QueueOptions = {
14-
exclusive: boolean
15-
auto_delete: boolean
16-
}
12+
import { AmqpEndpoints, AmqpMethods, MessageBuilder, ME } from "./message_builder.js"
13+
import { CreateQueueResponseDecoder, DeleteQueueResponseDecoder } from "./response_decoder.js"
1714

1815
type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen
1916
type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError
@@ -31,27 +28,26 @@ const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
3128
}
3229

3330
export interface Management {
34-
declareQueue: (queueName: string, options: Partial<QueueOptions>) => QueueInfo
31+
declareQueue: (queueName: string, options?: Partial<QueueOptions>) => Promise<Queue>
32+
deleteQueue: (queueName: string) => Promise<boolean>
3533
close: () => void
3634
}
3735

3836
export class AmqpManagement implements Management {
3937
static async create(connection: RheaConnection): Promise<AmqpManagement> {
4038
const senderLink = await AmqpManagement.openSender(connection)
4139
const receiverLink = await AmqpManagement.openReceiver(connection)
42-
return new AmqpManagement(senderLink, receiverLink)
40+
return new AmqpManagement(connection, senderLink, receiverLink)
4341
}
4442

4543
constructor(
46-
// private readonly connection: RheaConnection,
44+
private readonly connection: RheaConnection,
4745
private senderLink: Sender,
4846
private receiverLink: Receiver
4947
) {
5048
console.log(this.receiverLink.is_open())
5149
}
5250

53-
async close() {}
54-
5551
private static async openReceiver(connection: RheaConnection): Promise<Receiver> {
5652
return AmqpManagement.openLink<Receiver>(
5753
connection,
@@ -90,18 +86,76 @@ export class AmqpManagement implements Management {
9086
})
9187
}
9288

93-
declareQueue(queueName: string, options: Partial<QueueOptions> = {}): QueueInfo {
94-
// decode the response
95-
// create queueInfo
89+
close(): void {
90+
if (this.connection.is_closed()) return
91+
92+
this.closeSender()
93+
this.closeReceiver()
94+
}
95+
96+
private closeSender(): void {
97+
this.senderLink.close()
98+
}
99+
100+
private closeReceiver(): void {
101+
this.senderLink.close()
102+
}
103+
104+
async declareQueue(queueName: string, options: Partial<QueueOptions> = {}): Promise<Queue> {
105+
return new Promise((res, rej) => {
106+
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
107+
if (!context.message) {
108+
return rej(new Error("Receiver has not received any message"))
109+
}
110+
111+
const response = new CreateQueueResponseDecoder().decodeFrom(context.message, String(message.message_id))
112+
if (response.status === "error") {
113+
return rej(response.error)
114+
}
96115

97-
this.senderLink.send({
98-
message_id: generate_uuid(),
99-
to: `/queues/${encodeURIComponent(queueName)}`,
100-
reply_to: "$me",
101-
subject: "PUT",
102-
body: options,
116+
return res(new AmqpQueue(response.body))
117+
})
118+
119+
const message = new MessageBuilder()
120+
.sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`)
121+
.setReplyTo(ME)
122+
.setAmqpMethod(AmqpMethods.PUT)
123+
.setBody({
124+
exclusive: options.exclusive ?? false,
125+
durable: options.durable ?? false,
126+
auto_delete: options.autoDelete ?? false,
127+
arguments: buildArgumentsFrom(options.type, options.arguments),
128+
})
129+
.build()
130+
this.senderLink.send(message)
103131
})
132+
}
133+
134+
async deleteQueue(queueName: string): Promise<boolean> {
135+
return new Promise((res, rej) => {
136+
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
137+
if (!context.message) {
138+
return rej(new Error("Receiver has not received any message"))
139+
}
140+
141+
const response = new DeleteQueueResponseDecoder().decodeFrom(context.message, String(message.message_id))
142+
if (response.status === "error") {
143+
return rej(response.error)
144+
}
104145

105-
return new AmqpQueueInfo({ name: queueName })
146+
return res(true)
147+
})
148+
149+
const message = new MessageBuilder()
150+
.sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`)
151+
.setReplyTo(ME)
152+
.setAmqpMethod(AmqpMethods.DELETE)
153+
.build()
154+
this.senderLink.send(message)
155+
})
106156
}
107157
}
158+
159+
function buildArgumentsFrom(queueType?: QueueType, queueOptions?: Record<string, string>) {
160+
return { ...(queueOptions ?? {}), ...(queueType ? { "x-queue-type": queueType } : {}) }
161+
}

src/message_builder.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { generate_uuid, Message } from "rhea"
2+
3+
export enum AmqpMethods {
4+
PUT = "PUT",
5+
DELETE = "DELETE",
6+
GET = "GET",
7+
}
8+
9+
export enum AmqpEndpoints {
10+
Queues = "queues",
11+
}
12+
13+
export const ME = "$me"
14+
15+
export class MessageBuilder {
16+
private messageId: string = generate_uuid()
17+
private to: string = ""
18+
private replyTo: string = ME
19+
private method: AmqpMethods = AmqpMethods.GET
20+
private body: unknown
21+
22+
constructor() {}
23+
24+
setMessageId(id: string) {
25+
this.messageId = id
26+
return this
27+
}
28+
29+
sendTo(to: string) {
30+
this.to = to
31+
return this
32+
}
33+
34+
setReplyTo(replyTo: string) {
35+
this.replyTo = replyTo
36+
return this
37+
}
38+
39+
setAmqpMethod(method: AmqpMethods) {
40+
this.method = method
41+
return this
42+
}
43+
44+
setBody(body: unknown) {
45+
this.body = body
46+
return this
47+
}
48+
49+
build(): Message {
50+
return {
51+
message_id: this.messageId,
52+
to: this.to,
53+
reply_to: this.replyTo,
54+
subject: this.method,
55+
body: this.body,
56+
}
57+
}
58+
}

src/queue.ts

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,39 @@
1-
export interface QueueInfo {
1+
export type QueueType = "classic" | "stream" | "quorum"
2+
3+
export type QueueOptions = {
4+
type: QueueType
5+
exclusive: boolean
6+
autoDelete: boolean
7+
durable: boolean
8+
arguments: Record<string, string>
9+
}
10+
11+
export type QueueInfo = {
12+
name: string
13+
durable: boolean
14+
autoDelete: boolean
15+
exclusive: boolean
16+
type: QueueType
17+
leader: string
18+
replicas: string[]
19+
messageCount: number
20+
consumerCount: number
21+
arguments: Record<string, string>
22+
}
23+
24+
export type DeletedQueueInfo = {
225
name: string
26+
deleted: boolean
327
}
428

5-
export class AmqpQueueInfo implements QueueInfo {
6-
private queueName: string
29+
export interface Queue {
30+
getInfo: QueueInfo
31+
}
732

8-
constructor(params: { name: string }) {
9-
this.queueName = params.name
10-
}
33+
export class AmqpQueue implements Queue {
34+
constructor(private readonly info: QueueInfo) {}
1135

12-
public get name(): string {
13-
return this.queueName
36+
public get getInfo(): QueueInfo {
37+
return this.info
1438
}
1539
}

src/response_decoder.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { Message } from "rhea"
2+
import { AUTO_DELETE, DURABLE, EXCLUSIVE, isError, queueTypeFromString, Result } from "./utils.js"
3+
import { DeletedQueueInfo, QueueInfo } from "./queue.js"
4+
5+
interface ResponseDecoder {
6+
decodeFrom: (receivedMessage: Message, sentMessageId: string) => unknown
7+
}
8+
9+
export class CreateQueueResponseDecoder implements ResponseDecoder {
10+
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<QueueInfo, Error> {
11+
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
12+
return { status: "error", error: new Error(`Message Error: ${receivedMessage.subject}`) }
13+
}
14+
15+
return {
16+
status: "ok",
17+
body: {
18+
name: receivedMessage.body.name,
19+
durable: receivedMessage.body.durable === DURABLE,
20+
autoDelete: receivedMessage.body.auto_delete === AUTO_DELETE,
21+
exclusive: receivedMessage.body.exclusive === EXCLUSIVE,
22+
type: queueTypeFromString(receivedMessage.body.type),
23+
arguments: receivedMessage.body.arguments ?? {},
24+
leader: receivedMessage.body.leader,
25+
replicas: receivedMessage.body.replicas,
26+
messageCount: receivedMessage.body.message_count,
27+
consumerCount: receivedMessage.body.consumer_count,
28+
},
29+
}
30+
}
31+
}
32+
33+
export class DeleteQueueResponseDecoder implements ResponseDecoder {
34+
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<DeletedQueueInfo, Error> {
35+
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
36+
return { status: "error", error: new Error(`Message Error: ${receivedMessage.subject}`) }
37+
}
38+
39+
return {
40+
status: "ok",
41+
body: {
42+
name: receivedMessage.body.name,
43+
deleted: true,
44+
},
45+
}
46+
}
47+
}

src/utils.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import { Message } from "rhea"
2+
import { QueueType } from "./queue.js"
3+
4+
export enum AmqpResponseCodes {
5+
OK = "200",
6+
CREATED = "201",
7+
NO_CONTENT = "204",
8+
BAD_REQUEST = "400",
9+
NOT_FOUND = "404",
10+
CONFLICT = "409",
11+
}
12+
13+
export const DURABLE = 1
14+
export const AUTO_DELETE = 1
15+
export const EXCLUSIVE = 1
16+
17+
export type Result<T, K> = OkResult<T> | ErrorResult<K>
18+
19+
type OkResult<T> = {
20+
status: "ok"
21+
body: T
22+
}
23+
24+
type ErrorResult<K> = {
25+
status: "error"
26+
error: K
27+
}
28+
29+
export function isError(message: Message): boolean {
30+
return (
31+
message.subject === AmqpResponseCodes.BAD_REQUEST ||
32+
message.subject === AmqpResponseCodes.NOT_FOUND ||
33+
message.subject === AmqpResponseCodes.CONFLICT
34+
)
35+
}
36+
37+
export function queueTypeFromString(queueType: string): QueueType {
38+
switch (queueType) {
39+
case "classic":
40+
return "classic"
41+
case "quorum":
42+
return "quorum"
43+
case "stream":
44+
return "stream"
45+
default:
46+
throw new Error(`Unsupported queue type: ${queueType}`)
47+
}
48+
}

test/e2e/management.test.ts

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Management } from "../../src/index.js"
22
import { afterEach, beforeEach, describe, expect, test } from "vitest"
3-
import { eventually, existsQueue, host, password, port, username } from "../support/util.js"
3+
import { createQueue, eventually, existsQueue, getQueueInfo, host, password, port, username } from "../support/util.js"
44
import { createEnvironment, Environment } from "../../src/environment.js"
55
import { Connection } from "../../src/connection.js"
66

@@ -21,17 +21,39 @@ describe("Management", () => {
2121
})
2222

2323
afterEach(async () => {
24-
await management.close()
25-
await connection.close()
26-
await environment.close()
24+
try {
25+
await management.close()
26+
await connection.close()
27+
await environment.close()
28+
} catch (error) {
29+
console.error(error)
30+
}
2731
})
2832

2933
test("create a queue through the management", async () => {
30-
const queueInfo = management.declareQueue("test-queue", { exclusive: true, auto_delete: false })
34+
const queue = await management.declareQueue("test-queue")
3135

32-
expect(queueInfo.name).to.eql("test-queue")
3336
await eventually(async () => {
34-
expect(await existsQueue(queueInfo.name)).to.eql(true)
37+
const queueInfo = await getQueueInfo(queue.getInfo.name)
38+
expect(queueInfo.ok).to.eql(true)
39+
expect(queue.getInfo.arguments).to.eql(queueInfo.body.arguments)
40+
expect(queue.getInfo.autoDelete).to.eql(queueInfo.body.auto_delete)
41+
expect(queue.getInfo.durable).to.eql(queueInfo.body.durable)
42+
expect(queue.getInfo.exclusive).to.eql(queueInfo.body.exclusive)
43+
expect(queue.getInfo.consumerCount).to.eql(queueInfo.body.consumers)
44+
expect(queue.getInfo.messageCount).to.eql(queueInfo.body.messages)
45+
expect(queue.getInfo.type).to.eql(queueInfo.body.type)
46+
expect(queue.getInfo.leader).to.eql(queueInfo.body.node)
47+
})
48+
})
49+
50+
test("delete a queue through the management", async () => {
51+
await createQueue("test-queue")
52+
53+
await management.deleteQueue("test-queue")
54+
55+
await eventually(async () => {
56+
expect(await existsQueue("test-queue")).to.eql(false)
3557
})
3658
})
3759
})

0 commit comments

Comments
 (0)