Skip to content

Commit 835e204

Browse files
l4mbymagne
andauthored
feat: add get queue info method (#43)
Co-authored-by: magne <[email protected]>
1 parent d2d7ed1 commit 835e204

File tree

3 files changed

+46
-0
lines changed

3 files changed

+46
-0
lines changed

src/management.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
DeleteBindingResponseDecoder,
1919
DeleteExchangeResponseDecoder,
2020
DeleteQueueResponseDecoder,
21+
GetQueueInfoResponseDecoder,
2122
} from "./response_decoder.js"
2223
import { AmqpBinding, Binding, BindingInfo, BindingOptions } from "./binding.js"
2324
import { randomUUID } from "crypto"
@@ -35,6 +36,7 @@ const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
3536
export interface Management {
3637
declareQueue: (queueName: string, options?: Partial<QueueOptions>) => Promise<Queue>
3738
deleteQueue: (queueName: string) => Promise<boolean>
39+
getQueueInfo: (queueName: string) => Promise<Queue>
3840
declareExchange: (exchangeName: string, options?: Partial<ExchangeOptions>) => Promise<Exchange>
3941
deleteExchange: (exchangeName: string) => Promise<boolean>
4042
bind: (key: string, options: BindingOptions) => Promise<Binding>
@@ -144,6 +146,30 @@ export class AmqpManagement implements Management {
144146
})
145147
}
146148

149+
async getQueueInfo(queueName: string): Promise<Queue> {
150+
return new Promise((res, rej) => {
151+
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
152+
if (!context.message) {
153+
return rej(new Error("Receiver has not received any message"))
154+
}
155+
156+
const response = new GetQueueInfoResponseDecoder().decodeFrom(context.message, String(message.message_id))
157+
if (response.status === "error") {
158+
return rej(response.error)
159+
}
160+
161+
return res(new AmqpQueue(response.body))
162+
})
163+
164+
const message = new LinkMessageBuilder()
165+
.sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`)
166+
.setReplyTo(ME)
167+
.setAmqpMethod(AmqpMethods.GET)
168+
.build()
169+
this.senderLink.send(message)
170+
})
171+
}
172+
147173
async declareExchange(exchangeName: string, options: Partial<ExchangeOptions> = {}): Promise<Exchange> {
148174
const exchangeInfo: ExchangeInfo = {
149175
type: options.type ?? "direct",

src/response_decoder.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ export class CreateQueueResponseDecoder implements ResponseDecoder {
3030
}
3131
}
3232

33+
export class GetQueueInfoResponseDecoder extends CreateQueueResponseDecoder {}
34+
3335
export class DeleteQueueResponseDecoder implements ResponseDecoder {
3436
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<DeletedQueueInfo, Error> {
3537
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {

test/e2e/management.test.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,24 @@ describe("Management", () => {
7878
})
7979
})
8080

81+
test("get info of a queue through the management", async () => {
82+
await createQueue(queueName)
83+
84+
const result = await management.getQueueInfo(queueName)
85+
86+
await eventually(async () => {
87+
const queueInfo = await getQueueInfo(queueName)
88+
expect(result.getInfo.arguments).to.eql(queueInfo.body.arguments)
89+
expect(result.getInfo.autoDelete).to.eql(queueInfo.body.auto_delete)
90+
expect(result.getInfo.durable).to.eql(queueInfo.body.durable)
91+
expect(result.getInfo.exclusive).to.eql(queueInfo.body.exclusive)
92+
expect(result.getInfo.consumerCount).to.eql(queueInfo.body.consumers)
93+
expect(result.getInfo.messageCount).to.eql(queueInfo.body.messages)
94+
expect(result.getInfo.type).to.eql(queueInfo.body.type)
95+
expect(result.getInfo.leader).to.eql(queueInfo.body.node)
96+
})
97+
})
98+
8199
test("create an exchange through the management", async () => {
82100
const exchange = await management.declareExchange(exchangeName, {
83101
type: "headers",

0 commit comments

Comments
 (0)