|
1 | 1 | import { AmqpExchange, Exchange, ExchangeInfo, ExchangeOptions } from "./exchange.js" |
2 | | -import { AmqpQueue, Queue, QueueOptions, QueueType } from "./queue.js" |
| 2 | +import { AmqpQueue, Queue, QueueType, QueueOptions, QuorumQueueOptions, ClassicQueueOptions } from "./queue.js" |
3 | 3 | import { |
4 | 4 | EventContext, |
5 | 5 | Receiver, |
@@ -111,12 +111,7 @@ export class AmqpManagement implements Management { |
111 | 111 | .sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`) |
112 | 112 | .setReplyTo(ME) |
113 | 113 | .setAmqpMethod(AmqpMethods.PUT) |
114 | | - .setBody({ |
115 | | - exclusive: options.exclusive ?? false, |
116 | | - durable: options.durable ?? false, |
117 | | - auto_delete: options.autoDelete ?? false, |
118 | | - arguments: buildArgumentsFrom(options.type, options.arguments), |
119 | | - }) |
| 114 | + .setBody(buildDeclareQueueBody(options)) |
120 | 115 | .build() |
121 | 116 | this.senderLink.send(message) |
122 | 117 | }) |
@@ -295,7 +290,46 @@ export class AmqpManagement implements Management { |
295 | 290 | } |
296 | 291 | } |
297 | 292 |
|
298 | | -function buildArgumentsFrom(queueType?: QueueType, queueOptions?: Record<string, string>) { |
| 293 | +function buildDeclareQueueBody(options: Partial<QueueOptions>) { |
| 294 | + const body = { |
| 295 | + exclusive: options.exclusive ?? false, |
| 296 | + durable: options.durable ?? true, // needed at least by quorum queue type |
| 297 | + auto_delete: options.autoDelete ?? false, |
| 298 | + arguments: buildArgumentsFrom(options.type, options.arguments), |
| 299 | + } |
| 300 | + switch (options.type) { |
| 301 | + case "quorum": |
| 302 | + body.arguments = addQuorumArgumentsFrom(body.arguments, options) |
| 303 | + return body |
| 304 | + case "classic": |
| 305 | + body.arguments = addClassicArgumentsFrom(body.arguments, options) |
| 306 | + return body |
| 307 | + case "stream": |
| 308 | + default: |
| 309 | + return body |
| 310 | + } |
| 311 | +} |
| 312 | + |
| 313 | +function addQuorumArgumentsFrom(args: Record<string, unknown>, options: Partial<QuorumQueueOptions>) { |
| 314 | + return { |
| 315 | + ...args, |
| 316 | + ...(options.deadLetterStrategy ? { "x-dead-letter-strategy": options.deadLetterStrategy } : {}), |
| 317 | + ...(options.deliveryLimit ? { "x-max-delivery-limit": options.deliveryLimit } : {}), |
| 318 | + ...(options.initialGroupSize ? { "x-quorum-initial-group-size": options.initialGroupSize } : {}), |
| 319 | + ...(options.targetGroupSize ? { "x-quorum-target-group-size": options.targetGroupSize } : {}), |
| 320 | + } |
| 321 | +} |
| 322 | + |
| 323 | +function addClassicArgumentsFrom(args: Record<string, unknown>, options: Partial<ClassicQueueOptions>) { |
| 324 | + return { |
| 325 | + ...args, |
| 326 | + ...(options.maxPriority ? { "x-max-priority": options.maxPriority } : {}), |
| 327 | + ...(options.mode ? { "x-queue-mode": options.mode } : {}), |
| 328 | + ...(options.version ? { "x-queue-version": options.version } : {}), |
| 329 | + } |
| 330 | +} |
| 331 | + |
| 332 | +function buildArgumentsFrom(queueType?: QueueType, queueOptions?: Record<string, string>): Record<string, unknown> { |
299 | 333 | return { ...(queueOptions ?? {}), ...(queueType ? { "x-queue-type": queueType } : {}) } |
300 | 334 | } |
301 | 335 |
|
|
0 commit comments