Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions pkg/rabbitmqamqp/amqp_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ type AMQPBindingInfo struct {
}

type AMQPBinding struct {
sourceName string
destinationName string
toQueue bool
bindingKey string
management *AmqpManagement
sourceName string
destinationName string
toQueue bool
bindingKey string
management *AmqpManagement
additionalArguments map[string]any
}

func newAMQPBinding(management *AmqpManagement) *AMQPBinding {
Expand All @@ -37,6 +38,10 @@ func (b *AMQPBinding) Destination(name string, isQueue bool) {
b.toQueue = isQueue
}

func (b *AMQPBinding) AdditionalArguments(args map[string]any) {
b.additionalArguments = args
}

// Bind creates a binding between an exchange and a queue or exchange
// with the specified binding key.
// Returns the binding path that can be used to unbind the binding.
Expand All @@ -56,7 +61,11 @@ func (b *AMQPBinding) Bind(ctx context.Context) (string, error) {
kv["binding_key"] = b.bindingKey
kv["source"] = b.sourceName
kv[destination] = b.destinationName
kv["arguments"] = make(map[string]any)
if b.additionalArguments != nil {
kv["arguments"] = b.additionalArguments
} else {
kv["arguments"] = make(map[string]any)
}
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
bindingPathWithExchangeQueueAndKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
return bindingPathWithExchangeQueueAndKey, err
Expand Down
1 change: 1 addition & 0 deletions pkg/rabbitmqamqp/amqp_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification IBinding
bind.SourceExchange(bindingSpecification.sourceExchange())
bind.Destination(bindingSpecification.destination(), bindingSpecification.isDestinationQueue())
bind.BindingKey(bindingSpecification.bindingKey())
bind.AdditionalArguments(bindingSpecification.arguments())
return bind.Bind(ctx)

}
Expand Down
6 changes: 5 additions & 1 deletion pkg/rabbitmqamqp/amqp_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ var _ = Describe("AMQP Queue test ", func() {
MaxLengthBytes: CapacityGB(1),
MaxPriority: 2,
LeaderLocator: &BalancedLeaderLocator{},
Arguments: map[string]any{
"foo": "bar",
},
})

Expect(err).To(BeNil())
Expand All @@ -84,6 +87,7 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-expires", int64(1000)))
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-max-priority", int64(2)))
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-queue-leader-locator", "random"))
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("foo", "bar"))

// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expand Down Expand Up @@ -174,7 +178,7 @@ var _ = Describe("AMQP Queue test ", func() {
It("AMQP Declare Queue should fail with Precondition fail", func() {
// The first queue is declared as Classic, and it should succeed
// The second queue is declared as Quorum, and it should fail since it is already declared as Classic
//queueName := generateName("AMQP Declare Queue should fail with Precondition fail")
// queueName := generateName("AMQP Declare Queue should fail with Precondition fail")
queueName := "ab"
_, err := management.DeclareQueue(context.TODO(), &ClassicQueueSpecification{
Name: queueName,
Expand Down
36 changes: 32 additions & 4 deletions pkg/rabbitmqamqp/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type QuorumQueueSpecification struct {
TargetClusterSize int64
LeaderLocator ILeaderLocator
QuorumInitialGroupSize int
Arguments map[string]any
}

func (q *QuorumQueueSpecification) name() string {
Expand All @@ -111,7 +112,11 @@ func (q *QuorumQueueSpecification) queueType() QueueType {
}

func (q *QuorumQueueSpecification) buildArguments() map[string]any {
result := map[string]any{}
result := q.Arguments
if result == nil {
result = map[string]any{}
}

if q.MaxLengthBytes != 0 {
result["x-max-length-bytes"] = q.MaxLengthBytes
}
Expand Down Expand Up @@ -181,6 +186,7 @@ type ClassicQueueSpecification struct {
MaxLengthBytes int64
MaxPriority int64
LeaderLocator ILeaderLocator
Arguments map[string]any
}

func (q *ClassicQueueSpecification) name() string {
Expand All @@ -200,7 +206,10 @@ func (q *ClassicQueueSpecification) queueType() QueueType {
}

func (q *ClassicQueueSpecification) buildArguments() map[string]any {
result := map[string]any{}
result := q.Arguments
if result == nil {
result = map[string]any{}
}

if q.MaxLengthBytes != 0 {
result["x-max-length-bytes"] = q.MaxLengthBytes
Expand Down Expand Up @@ -257,6 +266,7 @@ type AutoGeneratedQueueSpecification struct {
IsExclusive bool
MaxLength int64
MaxLengthBytes int64
Arguments map[string]any
}

func (a *AutoGeneratedQueueSpecification) name() string {
Expand All @@ -276,7 +286,10 @@ func (a *AutoGeneratedQueueSpecification) queueType() QueueType {
}

func (a *AutoGeneratedQueueSpecification) buildArguments() map[string]any {
result := map[string]any{}
result := a.Arguments
if result == nil {
result = map[string]any{}
}

if a.MaxLengthBytes != 0 {
result["x-max-length-bytes"] = a.MaxLengthBytes
Expand All @@ -295,6 +308,7 @@ type StreamQueueSpecification struct {
Name string
MaxLengthBytes int64
InitialClusterSize int
Arguments map[string]any
}

func (s *StreamQueueSpecification) name() string {
Expand All @@ -314,7 +328,10 @@ func (s *StreamQueueSpecification) queueType() QueueType {
}

func (s *StreamQueueSpecification) buildArguments() map[string]any {
result := map[string]any{}
result := s.Arguments
if result == nil {
result = map[string]any{}
}

if s.MaxLengthBytes != 0 {
result["x-max-length-bytes"] = s.MaxLengthBytes
Expand Down Expand Up @@ -475,12 +492,14 @@ type IBindingSpecification interface {
destination() string
bindingKey() string
isDestinationQueue() bool
arguments() map[string]any
}

type ExchangeToQueueBindingSpecification struct {
SourceExchange string
DestinationQueue string
BindingKey string
Arguments map[string]any
}

func (e *ExchangeToQueueBindingSpecification) sourceExchange() string {
Expand All @@ -499,10 +518,15 @@ func (e *ExchangeToQueueBindingSpecification) bindingKey() string {
return e.BindingKey
}

func (e *ExchangeToQueueBindingSpecification) arguments() map[string]any {
return e.Arguments
}

type ExchangeToExchangeBindingSpecification struct {
SourceExchange string
DestinationExchange string
BindingKey string
Arguments map[string]any
}

func (e *ExchangeToExchangeBindingSpecification) sourceExchange() string {
Expand All @@ -520,3 +544,7 @@ func (e *ExchangeToExchangeBindingSpecification) isDestinationQueue() bool {
func (e *ExchangeToExchangeBindingSpecification) bindingKey() string {
return e.BindingKey
}

func (e *ExchangeToExchangeBindingSpecification) arguments() map[string]any {
return e.Arguments
}
Loading