Skip to content

Commit bf5e04a

Browse files
authored
Add support for additional arguments in AMQP bindings and queue specifications (#53)
1 parent 574f232 commit bf5e04a

File tree

4 files changed

+53
-11
lines changed

4 files changed

+53
-11
lines changed

pkg/rabbitmqamqp/amqp_binding.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ type AMQPBindingInfo struct {
1010
}
1111

1212
type AMQPBinding struct {
13-
sourceName string
14-
destinationName string
15-
toQueue bool
16-
bindingKey string
17-
management *AmqpManagement
13+
sourceName string
14+
destinationName string
15+
toQueue bool
16+
bindingKey string
17+
management *AmqpManagement
18+
additionalArguments map[string]any
1819
}
1920

2021
func newAMQPBinding(management *AmqpManagement) *AMQPBinding {
@@ -37,6 +38,10 @@ func (b *AMQPBinding) Destination(name string, isQueue bool) {
3738
b.toQueue = isQueue
3839
}
3940

41+
func (b *AMQPBinding) AdditionalArguments(args map[string]any) {
42+
b.additionalArguments = args
43+
}
44+
4045
// Bind creates a binding between an exchange and a queue or exchange
4146
// with the specified binding key.
4247
// Returns the binding path that can be used to unbind the binding.
@@ -56,7 +61,11 @@ func (b *AMQPBinding) Bind(ctx context.Context) (string, error) {
5661
kv["binding_key"] = b.bindingKey
5762
kv["source"] = b.sourceName
5863
kv[destination] = b.destinationName
59-
kv["arguments"] = make(map[string]any)
64+
if b.additionalArguments != nil {
65+
kv["arguments"] = b.additionalArguments
66+
} else {
67+
kv["arguments"] = make(map[string]any)
68+
}
6069
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
6170
bindingPathWithExchangeQueueAndKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
6271
return bindingPathWithExchangeQueueAndKey, err

pkg/rabbitmqamqp/amqp_management.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification IBinding
217217
bind.SourceExchange(bindingSpecification.sourceExchange())
218218
bind.Destination(bindingSpecification.destination(), bindingSpecification.isDestinationQueue())
219219
bind.BindingKey(bindingSpecification.bindingKey())
220+
bind.AdditionalArguments(bindingSpecification.arguments())
220221
return bind.Bind(ctx)
221222

222223
}

pkg/rabbitmqamqp/amqp_queue_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ var _ = Describe("AMQP Queue test ", func() {
6060
MaxLengthBytes: CapacityGB(1),
6161
MaxPriority: 2,
6262
LeaderLocator: &BalancedLeaderLocator{},
63+
Arguments: map[string]any{
64+
"foo": "bar",
65+
},
6366
})
6467

6568
Expect(err).To(BeNil())
@@ -84,6 +87,7 @@ var _ = Describe("AMQP Queue test ", func() {
8487
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-expires", int64(1000)))
8588
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-max-priority", int64(2)))
8689
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-queue-leader-locator", "random"))
90+
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("foo", "bar"))
8791

8892
// validate GET (query queue info)
8993
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
@@ -174,7 +178,7 @@ var _ = Describe("AMQP Queue test ", func() {
174178
It("AMQP Declare Queue should fail with Precondition fail", func() {
175179
// The first queue is declared as Classic, and it should succeed
176180
// The second queue is declared as Quorum, and it should fail since it is already declared as Classic
177-
//queueName := generateName("AMQP Declare Queue should fail with Precondition fail")
181+
// queueName := generateName("AMQP Declare Queue should fail with Precondition fail")
178182
queueName := "ab"
179183
_, err := management.DeclareQueue(context.TODO(), &ClassicQueueSpecification{
180184
Name: queueName,

pkg/rabbitmqamqp/entities.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ type QuorumQueueSpecification struct {
9292
TargetClusterSize int64
9393
LeaderLocator ILeaderLocator
9494
QuorumInitialGroupSize int
95+
Arguments map[string]any
9596
}
9697

9798
func (q *QuorumQueueSpecification) name() string {
@@ -111,7 +112,11 @@ func (q *QuorumQueueSpecification) queueType() QueueType {
111112
}
112113

113114
func (q *QuorumQueueSpecification) buildArguments() map[string]any {
114-
result := map[string]any{}
115+
result := q.Arguments
116+
if result == nil {
117+
result = map[string]any{}
118+
}
119+
115120
if q.MaxLengthBytes != 0 {
116121
result["x-max-length-bytes"] = q.MaxLengthBytes
117122
}
@@ -181,6 +186,7 @@ type ClassicQueueSpecification struct {
181186
MaxLengthBytes int64
182187
MaxPriority int64
183188
LeaderLocator ILeaderLocator
189+
Arguments map[string]any
184190
}
185191

186192
func (q *ClassicQueueSpecification) name() string {
@@ -200,7 +206,10 @@ func (q *ClassicQueueSpecification) queueType() QueueType {
200206
}
201207

202208
func (q *ClassicQueueSpecification) buildArguments() map[string]any {
203-
result := map[string]any{}
209+
result := q.Arguments
210+
if result == nil {
211+
result = map[string]any{}
212+
}
204213

205214
if q.MaxLengthBytes != 0 {
206215
result["x-max-length-bytes"] = q.MaxLengthBytes
@@ -257,6 +266,7 @@ type AutoGeneratedQueueSpecification struct {
257266
IsExclusive bool
258267
MaxLength int64
259268
MaxLengthBytes int64
269+
Arguments map[string]any
260270
}
261271

262272
func (a *AutoGeneratedQueueSpecification) name() string {
@@ -276,7 +286,10 @@ func (a *AutoGeneratedQueueSpecification) queueType() QueueType {
276286
}
277287

278288
func (a *AutoGeneratedQueueSpecification) buildArguments() map[string]any {
279-
result := map[string]any{}
289+
result := a.Arguments
290+
if result == nil {
291+
result = map[string]any{}
292+
}
280293

281294
if a.MaxLengthBytes != 0 {
282295
result["x-max-length-bytes"] = a.MaxLengthBytes
@@ -295,6 +308,7 @@ type StreamQueueSpecification struct {
295308
Name string
296309
MaxLengthBytes int64
297310
InitialClusterSize int
311+
Arguments map[string]any
298312
}
299313

300314
func (s *StreamQueueSpecification) name() string {
@@ -314,7 +328,10 @@ func (s *StreamQueueSpecification) queueType() QueueType {
314328
}
315329

316330
func (s *StreamQueueSpecification) buildArguments() map[string]any {
317-
result := map[string]any{}
331+
result := s.Arguments
332+
if result == nil {
333+
result = map[string]any{}
334+
}
318335

319336
if s.MaxLengthBytes != 0 {
320337
result["x-max-length-bytes"] = s.MaxLengthBytes
@@ -475,12 +492,14 @@ type IBindingSpecification interface {
475492
destination() string
476493
bindingKey() string
477494
isDestinationQueue() bool
495+
arguments() map[string]any
478496
}
479497

480498
type ExchangeToQueueBindingSpecification struct {
481499
SourceExchange string
482500
DestinationQueue string
483501
BindingKey string
502+
Arguments map[string]any
484503
}
485504

486505
func (e *ExchangeToQueueBindingSpecification) sourceExchange() string {
@@ -499,10 +518,15 @@ func (e *ExchangeToQueueBindingSpecification) bindingKey() string {
499518
return e.BindingKey
500519
}
501520

521+
func (e *ExchangeToQueueBindingSpecification) arguments() map[string]any {
522+
return e.Arguments
523+
}
524+
502525
type ExchangeToExchangeBindingSpecification struct {
503526
SourceExchange string
504527
DestinationExchange string
505528
BindingKey string
529+
Arguments map[string]any
506530
}
507531

508532
func (e *ExchangeToExchangeBindingSpecification) sourceExchange() string {
@@ -520,3 +544,7 @@ func (e *ExchangeToExchangeBindingSpecification) isDestinationQueue() bool {
520544
func (e *ExchangeToExchangeBindingSpecification) bindingKey() string {
521545
return e.BindingKey
522546
}
547+
548+
func (e *ExchangeToExchangeBindingSpecification) arguments() map[string]any {
549+
return e.Arguments
550+
}

0 commit comments

Comments
 (0)