Skip to content

Commit df3fb05

Browse files
committed
Add support for exclusive queues (amqp091 only for now)
1 parent 5b2fc27 commit df3fb05

File tree

5 files changed

+47
-1
lines changed

5 files changed

+47
-1
lines changed

cmd/root.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,11 @@ func start(cfg config.Config) {
493493
log.Debug("will stop all consumers and publishers at " + time.Now().Add(cfg.Duration).String())
494494
}
495495

496+
if cfg.Queues == config.Exclusive && cfg.ConsumerProto != config.AMQP091 {
497+
log.Error("exclusive queues are only supported for AMQP 0.9.1 consumers")
498+
os.Exit(1)
499+
}
500+
496501
// every second, print the current values of the metrics
497502
wg.Wait()
498503
}

main_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,28 @@ var _ = Describe("OMQ CLI", func() {
649649
Expect(outputStr).To(ContainSubstring("enabled:true"))
650650
})
651651
})
652+
653+
Describe("supports exclusive queues", func() {
654+
It("declares exclusive queues for AMQP 0.9.1 consumers", func() {
655+
args := []string{
656+
"amqp091",
657+
"-y", "1",
658+
"-C", "0", // run indefinitely
659+
"-T", "/queues/exclusive-amqp091",
660+
"--queues", "exclusive",
661+
}
662+
663+
rmqc, err := rabbithole.NewClient("http://127.0.0.1:15672", "guest", "guest")
664+
Expect(err).ShouldNot(HaveOccurred())
665+
session := omq(args)
666+
defer session.Kill()
667+
668+
Eventually(func() bool {
669+
q, err := rmqc.GetQueue("/", "exclusive-amqp091")
670+
return err == nil && q.Name == "exclusive-amqp091" && q.OwnerPidDetails.Name != ""
671+
}).WithTimeout(5 * time.Second).Should(BeTrue())
672+
})
673+
})
652674
})
653675

654676
func omq(args []string) *gexec.Session {

pkg/amqp091/consumer.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,23 @@ func (c *Amqp091Consumer) Connect() {
101101
func (c *Amqp091Consumer) Subscribe() {
102102
if c.Connection != nil {
103103
_ = c.Channel.Qos(c.Config.ConsumerCredits, 0, false)
104+
105+
if c.Config.Queues == config.Exclusive {
106+
queueName := strings.TrimPrefix(c.Terminus, "/queues/")
107+
_, err := c.Channel.QueueDeclare(
108+
queueName, // name
109+
false, // durable
110+
false, // autoDelete
111+
true, // exclusive
112+
false, // noWait
113+
nil, // args
114+
)
115+
if err != nil {
116+
log.Error("failed to declare exclusive queue", "id", c.Id, "queue", queueName, "error", err.Error())
117+
return
118+
}
119+
}
120+
104121
// TODO add auto-ack and exclusive options
105122
consumeArgs := amqp091.Table{}
106123
if c.Config.StreamOffset != "" {

pkg/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const (
3333
Classic
3434
Quorum
3535
Stream
36+
Exclusive
3637
)
3738

3839
var AmqpDurabilityModes = map[AmqpDurabilityMode][]string{
@@ -46,6 +47,7 @@ var QueueTypes = map[QueueType][]string{
4647
Classic: {"classic"},
4748
Quorum: {"quorum"},
4849
Stream: {"stream"},
50+
Exclusive: {"exclusive"},
4951
}
5052

5153
type AmqpOptions struct {

pkg/mgmt/mgmt.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (m *Mgmt) DeclareQueues(cfg config.Config) {
113113
}
114114

115115
func (m *Mgmt) DeclareAndBind(cfg config.Config, queueName string, id int) *rmq.AmqpQueueInfo {
116-
if cfg.Queues == config.Predeclared || m.declaredQueues[queueName] {
116+
if cfg.Queues == config.Predeclared || cfg.Queues == config.Exclusive || m.declaredQueues[queueName] {
117117
return nil
118118
}
119119

0 commit comments

Comments
 (0)