Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions src/management.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { AmqpExchange, Exchange, ExchangeInfo, ExchangeOptions } from "./exchange.js"
import { AmqpQueue, Queue, QueueOptions, QueueType } from "./queue.js"
import { AmqpQueue, Queue, QueueType, QueueOptions, QuorumQueueOptions, ClassicQueueOptions } from "./queue.js"
import {
EventContext,
Receiver,
Expand Down Expand Up @@ -111,12 +111,7 @@ export class AmqpManagement implements Management {
.sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`)
.setReplyTo(ME)
.setAmqpMethod(AmqpMethods.PUT)
.setBody({
exclusive: options.exclusive ?? false,
durable: options.durable ?? false,
auto_delete: options.autoDelete ?? false,
arguments: buildArgumentsFrom(options.type, options.arguments),
})
.setBody(buildDeclareQueueBody(options))
.build()
this.senderLink.send(message)
})
Expand Down Expand Up @@ -295,7 +290,46 @@ export class AmqpManagement implements Management {
}
}

function buildArgumentsFrom(queueType?: QueueType, queueOptions?: Record<string, string>) {
function buildDeclareQueueBody(options: Partial<QueueOptions>) {
const body = {
exclusive: options.exclusive ?? false,
durable: options.durable ?? true, // needed at least by quorum queue type
auto_delete: options.autoDelete ?? false,
arguments: buildArgumentsFrom(options.type, options.arguments),
}
switch (options.type) {
case "quorum":
body.arguments = addQuorumArgumentsFrom(body.arguments, options)
return body
case "classic":
body.arguments = addClassicArgumentsFrom(body.arguments, options)
return body
case "stream":
default:
return body
}
}

function addQuorumArgumentsFrom(args: Record<string, unknown>, options: Partial<QuorumQueueOptions>) {
return {
...args,
...(options.deadLetterStrategy ? { "x-dead-letter-strategy": options.deadLetterStrategy } : {}),
...(options.deliveryLimit ? { "x-max-delivery-limit": options.deliveryLimit } : {}),
...(options.initialGroupSize ? { "x-quorum-initial-group-size": options.initialGroupSize } : {}),
...(options.targetGroupSize ? { "x-quorum-target-group-size": options.targetGroupSize } : {}),
}
}

function addClassicArgumentsFrom(args: Record<string, unknown>, options: Partial<ClassicQueueOptions>) {
return {
...args,
...(options.maxPriority ? { "x-max-priority": options.maxPriority } : {}),
...(options.mode ? { "x-queue-mode": options.mode } : {}),
...(options.version ? { "x-queue-version": options.version } : {}),
}
}

function buildArgumentsFrom(queueType?: QueueType, queueOptions?: Record<string, string>): Record<string, unknown> {
return { ...(queueOptions ?? {}), ...(queueType ? { "x-queue-type": queueType } : {}) }
}

Expand Down
29 changes: 26 additions & 3 deletions src/queue.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,36 @@
export type QueueType = "classic" | "stream" | "quorum"

export type QueueOptions = {
type: QueueType
export type QuorumDeadLetterStrategy = "at-most-once" | "at-least-once"

export type ClassicQueueMode = "default" | "lazy"

export type ClassicQueueVersion = 1 | 2

type BaseQueueOptions = {
exclusive: boolean
autoDelete: boolean
durable: boolean
arguments: Record<string, string>
}

export type ClassicQueueOptions = BaseQueueOptions & {
type: Exclude<QueueType, "quorum">
durable: boolean
maxPriority: number
mode: ClassicQueueMode
version: ClassicQueueVersion
}

export type QuorumQueueOptions = BaseQueueOptions & {
type: "quorum"
durable: true
deadLetterStrategy: QuorumDeadLetterStrategy
deliveryLimit: number
initialGroupSize: number
targetGroupSize: number
}

export type QueueOptions = ClassicQueueOptions | QuorumQueueOptions

export type QueueInfo = {
name: string
durable: boolean
Expand Down
5 changes: 4 additions & 1 deletion src/response_decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ interface ResponseDecoder {
export class CreateQueueResponseDecoder implements ResponseDecoder {
decodeFrom(receivedMessage: Message, sentMessageId: string): Result<QueueInfo, Error> {
if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) {
return { status: "error", error: new Error(`Message Error: ${receivedMessage.subject}`) }
return {
status: "error",
error: new Error(`Message Error: ${receivedMessage.subject}; ${receivedMessage.body}`),
}
}

return {
Expand Down
Loading