diff --git a/pkg/rabbitmqamqp/amqp_binding.go b/pkg/rabbitmqamqp/amqp_binding.go index cbae973..e09ed87 100644 --- a/pkg/rabbitmqamqp/amqp_binding.go +++ b/pkg/rabbitmqamqp/amqp_binding.go @@ -10,11 +10,12 @@ type AMQPBindingInfo struct { } type AMQPBinding struct { - sourceName string - destinationName string - toQueue bool - bindingKey string - management *AmqpManagement + sourceName string + destinationName string + toQueue bool + bindingKey string + management *AmqpManagement + additionalArguments map[string]any } func newAMQPBinding(management *AmqpManagement) *AMQPBinding { @@ -37,6 +38,10 @@ func (b *AMQPBinding) Destination(name string, isQueue bool) { b.toQueue = isQueue } +func (b *AMQPBinding) AdditionalArguments(args map[string]any) { + b.additionalArguments = args +} + // Bind creates a binding between an exchange and a queue or exchange // with the specified binding key. // Returns the binding path that can be used to unbind the binding. @@ -56,7 +61,11 @@ func (b *AMQPBinding) Bind(ctx context.Context) (string, error) { kv["binding_key"] = b.bindingKey kv["source"] = b.sourceName kv[destination] = b.destinationName - kv["arguments"] = make(map[string]any) + if b.additionalArguments != nil { + kv["arguments"] = b.additionalArguments + } else { + kv["arguments"] = make(map[string]any) + } _, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204}) bindingPathWithExchangeQueueAndKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey) return bindingPathWithExchangeQueueAndKey, err diff --git a/pkg/rabbitmqamqp/amqp_management.go b/pkg/rabbitmqamqp/amqp_management.go index f20b6a8..62aa3ab 100644 --- a/pkg/rabbitmqamqp/amqp_management.go +++ b/pkg/rabbitmqamqp/amqp_management.go @@ -217,6 +217,7 @@ func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification IBinding bind.SourceExchange(bindingSpecification.sourceExchange()) bind.Destination(bindingSpecification.destination(), bindingSpecification.isDestinationQueue()) bind.BindingKey(bindingSpecification.bindingKey()) + bind.AdditionalArguments(bindingSpecification.arguments()) return bind.Bind(ctx) } diff --git a/pkg/rabbitmqamqp/amqp_queue_test.go b/pkg/rabbitmqamqp/amqp_queue_test.go index 53ead23..b817b89 100644 --- a/pkg/rabbitmqamqp/amqp_queue_test.go +++ b/pkg/rabbitmqamqp/amqp_queue_test.go @@ -60,6 +60,9 @@ var _ = Describe("AMQP Queue test ", func() { MaxLengthBytes: CapacityGB(1), MaxPriority: 2, LeaderLocator: &BalancedLeaderLocator{}, + Arguments: map[string]any{ + "foo": "bar", + }, }) Expect(err).To(BeNil()) @@ -84,6 +87,7 @@ var _ = Describe("AMQP Queue test ", func() { Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-expires", int64(1000))) Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-max-priority", int64(2))) Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-queue-leader-locator", "random")) + Expect(queueInfo.Arguments()).To(HaveKeyWithValue("foo", "bar")) // validate GET (query queue info) queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) @@ -174,7 +178,7 @@ var _ = Describe("AMQP Queue test ", func() { It("AMQP Declare Queue should fail with Precondition fail", func() { // The first queue is declared as Classic, and it should succeed // The second queue is declared as Quorum, and it should fail since it is already declared as Classic - //queueName := generateName("AMQP Declare Queue should fail with Precondition fail") + // queueName := generateName("AMQP Declare Queue should fail with Precondition fail") queueName := "ab" _, err := management.DeclareQueue(context.TODO(), &ClassicQueueSpecification{ Name: queueName, diff --git a/pkg/rabbitmqamqp/entities.go b/pkg/rabbitmqamqp/entities.go index 31ad6b2..a3c06ab 100644 --- a/pkg/rabbitmqamqp/entities.go +++ b/pkg/rabbitmqamqp/entities.go @@ -92,6 +92,7 @@ type QuorumQueueSpecification struct { TargetClusterSize int64 LeaderLocator ILeaderLocator QuorumInitialGroupSize int + Arguments map[string]any } func (q *QuorumQueueSpecification) name() string { @@ -111,7 +112,11 @@ func (q *QuorumQueueSpecification) queueType() QueueType { } func (q *QuorumQueueSpecification) buildArguments() map[string]any { - result := map[string]any{} + result := q.Arguments + if result == nil { + result = map[string]any{} + } + if q.MaxLengthBytes != 0 { result["x-max-length-bytes"] = q.MaxLengthBytes } @@ -181,6 +186,7 @@ type ClassicQueueSpecification struct { MaxLengthBytes int64 MaxPriority int64 LeaderLocator ILeaderLocator + Arguments map[string]any } func (q *ClassicQueueSpecification) name() string { @@ -200,7 +206,10 @@ func (q *ClassicQueueSpecification) queueType() QueueType { } func (q *ClassicQueueSpecification) buildArguments() map[string]any { - result := map[string]any{} + result := q.Arguments + if result == nil { + result = map[string]any{} + } if q.MaxLengthBytes != 0 { result["x-max-length-bytes"] = q.MaxLengthBytes @@ -257,6 +266,7 @@ type AutoGeneratedQueueSpecification struct { IsExclusive bool MaxLength int64 MaxLengthBytes int64 + Arguments map[string]any } func (a *AutoGeneratedQueueSpecification) name() string { @@ -276,7 +286,10 @@ func (a *AutoGeneratedQueueSpecification) queueType() QueueType { } func (a *AutoGeneratedQueueSpecification) buildArguments() map[string]any { - result := map[string]any{} + result := a.Arguments + if result == nil { + result = map[string]any{} + } if a.MaxLengthBytes != 0 { result["x-max-length-bytes"] = a.MaxLengthBytes @@ -295,6 +308,7 @@ type StreamQueueSpecification struct { Name string MaxLengthBytes int64 InitialClusterSize int + Arguments map[string]any } func (s *StreamQueueSpecification) name() string { @@ -314,7 +328,10 @@ func (s *StreamQueueSpecification) queueType() QueueType { } func (s *StreamQueueSpecification) buildArguments() map[string]any { - result := map[string]any{} + result := s.Arguments + if result == nil { + result = map[string]any{} + } if s.MaxLengthBytes != 0 { result["x-max-length-bytes"] = s.MaxLengthBytes @@ -475,12 +492,14 @@ type IBindingSpecification interface { destination() string bindingKey() string isDestinationQueue() bool + arguments() map[string]any } type ExchangeToQueueBindingSpecification struct { SourceExchange string DestinationQueue string BindingKey string + Arguments map[string]any } func (e *ExchangeToQueueBindingSpecification) sourceExchange() string { @@ -499,10 +518,15 @@ func (e *ExchangeToQueueBindingSpecification) bindingKey() string { return e.BindingKey } +func (e *ExchangeToQueueBindingSpecification) arguments() map[string]any { + return e.Arguments +} + type ExchangeToExchangeBindingSpecification struct { SourceExchange string DestinationExchange string BindingKey string + Arguments map[string]any } func (e *ExchangeToExchangeBindingSpecification) sourceExchange() string { @@ -520,3 +544,7 @@ func (e *ExchangeToExchangeBindingSpecification) isDestinationQueue() bool { func (e *ExchangeToExchangeBindingSpecification) bindingKey() string { return e.BindingKey } + +func (e *ExchangeToExchangeBindingSpecification) arguments() map[string]any { + return e.Arguments +}