diff --git a/README.md b/README.md
index 9cea4e6..eb44fe8 100644
--- a/README.md
+++ b/README.md
@@ -10,4 +10,10 @@ This library is in early stages of development. It is meant to be used with Rabb
## Getting Started
-You can find an example in: `examples/getting_started`
+You can find an example in: `docs/examples/getting_started`
+
+## Examples
+
+You can find more examples in: `docs/examples`
+
+
diff --git a/examples/getting_started/main.go b/docs/examples/getting_started/main.go
similarity index 91%
rename from examples/getting_started/main.go
rename to docs/examples/getting_started/main.go
index b030516..a99fa9c 100644
--- a/examples/getting_started/main.go
+++ b/docs/examples/getting_started/main.go
@@ -37,7 +37,7 @@ func main() {
// Create the management interface for the connection
// so we can declare exchanges, queues, and bindings
management := amqpConnection.Management()
- exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.ExchangeSpecification{
+ exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.TopicExchangeSpecification{
Name: exchangeName,
})
if err != nil {
@@ -46,9 +46,8 @@ func main() {
}
// Declare a Quorum queue
- queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QueueSpecification{
- Name: queueName,
- QueueType: rabbitmq_amqp.QueueType{Type: rabbitmq_amqp.Quorum},
+ queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{
+ Name: queueName,
})
if err != nil {
@@ -57,7 +56,7 @@ func main() {
}
// Bind the queue to the exchange
- bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.BindingSpecification{
+ bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.ExchangeToQueueBindingSpecification{
SourceExchange: exchangeName,
DestinationQueue: queueName,
BindingKey: routingKey,
@@ -70,8 +69,10 @@ func main() {
// Create a consumer to receive messages from the queue
// you need to build the address of the queue, but you can use the helper function
- addrQueue, _ := rabbitmq_amqp.QueueAddress(&queueName)
- consumer, err := amqpConnection.Consumer(context.Background(), addrQueue, "getting-started-consumer")
+
+ consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
+ Queue: queueName,
+ }, "getting-started-consumer")
if err != nil {
rabbitmq_amqp.Error("Error creating consumer", err)
return
@@ -105,8 +106,10 @@ func main() {
}
}(consumerContext)
- addr, _ := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)
- publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
+ publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmq_amqp.ExchangeAddress{
+ Exchange: exchangeName,
+ Key: routingKey,
+ }, "getting-started-publisher")
if err != nil {
rabbitmq_amqp.Error("Error creating publisher", err)
return
@@ -151,10 +154,12 @@ func main() {
err = consumer.Close(context.Background())
if err != nil {
rabbitmq_amqp.Error("[Consumer]", err)
+ return
}
// Close the publisher
err = publisher.Close(context.Background())
if err != nil {
+ rabbitmq_amqp.Error("[Publisher]", err)
return
}
diff --git a/go.mod b/go.mod
index 2d3761d..a35e6d7 100644
--- a/go.mod
+++ b/go.mod
@@ -5,18 +5,18 @@ go 1.22.0
require (
github.com/Azure/go-amqp v1.4.0-beta.1
github.com/google/uuid v1.6.0
- github.com/onsi/ginkgo/v2 v2.20.2
- github.com/onsi/gomega v1.34.2
+ github.com/onsi/ginkgo/v2 v2.22.1
+ github.com/onsi/gomega v1.36.2
)
require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/google/go-cmp v0.6.0 // indirect
- github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect
+ github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
- golang.org/x/tools v0.25.0 // indirect
+ golang.org/x/tools v0.28.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
diff --git a/go.sum b/go.sum
index a4701e7..3ff0100 100644
--- a/go.sum
+++ b/go.sum
@@ -10,14 +10,14 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
-github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoPLkQYQXGBS3EklQ4Zfi57uOuqQ=
-github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
+github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg=
+github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
-github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4=
-github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag=
-github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8=
-github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc=
+github.com/onsi/ginkgo/v2 v2.22.1 h1:QW7tbJAUDyVDVOM5dFa7qaybo+CRfR7bemlQUN6Z8aM=
+github.com/onsi/ginkgo/v2 v2.22.1/go.mod h1:S6aTpoRsSq2cZOd+pssHAlKW/Q/jZt6cPrPlnj4a1xM=
+github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
+github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
@@ -28,10 +28,10 @@ golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
-golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE=
-golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg=
-google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
-google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
+golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
+golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
+google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
+google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
diff --git a/rabbitmq_amqp/address_helper.go b/rabbitmq_amqp/address.go
similarity index 52%
rename from rabbitmq_amqp/address_helper.go
rename to rabbitmq_amqp/address.go
index 47382b1..e93ac5f 100644
--- a/rabbitmq_amqp/address_helper.go
+++ b/rabbitmq_amqp/address.go
@@ -3,12 +3,71 @@ package rabbitmq_amqp
import (
"errors"
"fmt"
+ "github.com/Azure/go-amqp"
"strings"
)
-// Address Creates the address for the exchange or queue following the RabbitMQ conventions.
+// TargetAddress is an interface that represents an address that can be used to send messages to.
+// It can be either a Queue or an Exchange with a routing key.
+type TargetAddress interface {
+ toAddress() (string, error)
+}
+
+// QueueAddress represents the address of a queue.
+type QueueAddress struct {
+ Queue string // The name of the queue
+ Parameters string // Additional parameters not related to the queue. Most of the time it is empty
+}
+
+func (qas *QueueAddress) toAddress() (string, error) {
+ q := &qas.Queue
+ if isStringNilOrEmpty(&qas.Queue) {
+ q = nil
+ }
+ return queueAddress(q)
+}
+
+// ExchangeAddress represents the address of an exchange with a routing key.
+type ExchangeAddress struct {
+ Exchange string // The name of the exchange
+ Key string // The routing key. Can be empty
+}
+
+func (eas *ExchangeAddress) toAddress() (string, error) {
+ ex := &eas.Exchange
+ if isStringNilOrEmpty(&eas.Exchange) {
+ ex = nil
+ }
+ k := &eas.Key
+ if isStringNilOrEmpty(&eas.Key) {
+ k = nil
+ }
+ return exchangeAddress(ex, k)
+}
+
+// MessageToAddressHelper sets the To property of the message to the address of the target.
+// The target must be a QueueAddress or an ExchangeAddress.
+// Note: The field To will be overwritten if it is already set.
+func MessageToAddressHelper(msgRef *amqp.Message, target TargetAddress) error {
+ if target == nil {
+ return errors.New("target cannot be nil")
+ }
+
+ address, err := target.toAddress()
+ if err != nil {
+ return err
+ }
+
+ if msgRef.Properties == nil {
+ msgRef.Properties = &amqp.MessageProperties{}
+ }
+ msgRef.Properties.To = &address
+ return nil
+}
+
+// address Creates the address for the exchange or queue following the RabbitMQ conventions.
// see: https://www.rabbitmq.com/docs/next/amqp#address-v2
-func Address(exchange, key, queue *string, urlParameters *string) (string, error) {
+func address(exchange, key, queue *string, urlParameters *string) (string, error) {
if exchange == nil && queue == nil {
return "", errors.New("exchange or queue must be set")
}
@@ -39,23 +98,23 @@ func Address(exchange, key, queue *string, urlParameters *string) (string, error
return "/" + queues + "/" + encodePathSegments(*queue) + urlAppend, nil
}
-// ExchangeAddress Creates the address for the exchange
-// See Address for more information
-func ExchangeAddress(exchange, key *string) (string, error) {
- return Address(exchange, key, nil, nil)
+// exchangeAddress Creates the address for the exchange
+// See address for more information
+func exchangeAddress(exchange, key *string) (string, error) {
+ return address(exchange, key, nil, nil)
}
-// QueueAddress Creates the address for the queue.
-// See Address for more information
-func QueueAddress(queue *string) (string, error) {
- return Address(nil, nil, queue, nil)
+// queueAddress Creates the address for the queue.
+// See address for more information
+func queueAddress(queue *string) (string, error) {
+ return address(nil, nil, queue, nil)
}
// PurgeQueueAddress Creates the address for purging the queue.
-// See Address for more information
-func PurgeQueueAddress(queue *string) (string, error) {
+// See address for more information
+func purgeQueueAddress(queue *string) (string, error) {
parameter := "/messages"
- return Address(nil, nil, queue, ¶meter)
+ return address(nil, nil, queue, ¶meter)
}
// encodePathSegments takes a string and returns its percent-encoded representation.
@@ -112,6 +171,9 @@ func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName,
return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded)
}
-func validateAddress(address string) bool {
- return strings.HasPrefix(address, fmt.Sprintf("/%s/", exchanges)) || strings.HasPrefix(address, fmt.Sprintf("/%s/", queues))
+func validateAddress(address string) error {
+ if strings.HasPrefix(address, fmt.Sprintf("/%s/", exchanges)) || strings.HasPrefix(address, fmt.Sprintf("/%s/", queues)) {
+ return nil
+ }
+ return fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
}
diff --git a/rabbitmq_amqp/address_helper_test.go b/rabbitmq_amqp/address_test.go
similarity index 78%
rename from rabbitmq_amqp/address_helper_test.go
rename to rabbitmq_amqp/address_test.go
index 5e23528..4a0cc9f 100644
--- a/rabbitmq_amqp/address_helper_test.go
+++ b/rabbitmq_amqp/address_test.go
@@ -5,18 +5,18 @@ import (
. "github.com/onsi/gomega"
)
-var _ = Describe("Address builder test ", func() {
+var _ = Describe("address builder test ", func() {
It("With exchange, queue and key should raise and error", func() {
queue := "my_queue"
exchange := "my_exchange"
- _, err := Address(&exchange, nil, &queue, nil)
+ _, err := address(&exchange, nil, &queue, nil)
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange and queue cannot be set together"))
})
It("Without exchange and queue should raise and error", func() {
- _, err := Address(nil, nil, nil, nil)
+ _, err := address(nil, nil, nil, nil)
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("exchange or queue must be set"))
})
@@ -25,14 +25,14 @@ var _ = Describe("Address builder test ", func() {
exchange := "my_exchange"
key := "my_key"
- address, err := Address(&exchange, &key, nil, nil)
+ address, err := address(&exchange, &key, nil, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange/my_key"))
})
It("With exchange should return address", func() {
exchange := "my_exchange"
- address, err := Address(&exchange, nil, nil, nil)
+ address, err := address(&exchange, nil, nil, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_exchange"))
})
@@ -42,21 +42,21 @@ var _ = Describe("Address builder test ", func() {
exchange := "my_ exchange/()"
key := "my_key "
- address, err := Address(&exchange, &key, nil, nil)
+ address, err := address(&exchange, &key, nil, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20"))
})
It("With queue should return address", func() {
queue := "my_queue>"
- address, err := Address(nil, nil, &queue, nil)
+ address, err := address(nil, nil, &queue, nil)
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue%3E"))
})
It("With queue and urlParameters should return address", func() {
queue := "my_queue"
- address, err := PurgeQueueAddress(&queue)
+ address, err := purgeQueueAddress(&queue)
Expect(err).To(BeNil())
Expect(address).To(Equal("/queues/my_queue/messages"))
})
diff --git a/rabbitmq_amqp/amqp_binding.go b/rabbitmq_amqp/amqp_binding.go
index f2b430a..0a42835 100644
--- a/rabbitmq_amqp/amqp_binding.go
+++ b/rabbitmq_amqp/amqp_binding.go
@@ -2,6 +2,7 @@ package rabbitmq_amqp
import (
"context"
+ "errors"
"github.com/Azure/go-amqp"
)
@@ -31,18 +32,9 @@ func (b *AMQPBinding) SourceExchange(sourceName string) {
}
}
-func (b *AMQPBinding) DestinationExchange(destinationName string) {
- if len(destinationName) > 0 {
- b.destinationName = destinationName
- b.toQueue = false
- }
-}
-
-func (b *AMQPBinding) DestinationQueue(queueName string) {
- if len(queueName) > 0 {
- b.destinationName = queueName
- b.toQueue = true
- }
+func (b *AMQPBinding) Destination(name string, isQueue bool) {
+ b.destinationName = name
+ b.toQueue = isQueue
}
// Bind creates a binding between an exchange and a queue or exchange
@@ -50,11 +42,20 @@ func (b *AMQPBinding) DestinationQueue(queueName string) {
// Returns the binding path that can be used to unbind the binding.
// Given a virtual host, the binding path is unique.
func (b *AMQPBinding) Bind(ctx context.Context) (string, error) {
+ destination := "destination_queue"
+ if !b.toQueue {
+ destination = "destination_exchange"
+ }
+
+ if len(b.sourceName) == 0 || len(b.destinationName) == 0 {
+ return "", errors.New("source and destination names are required")
+ }
+
path := bindingPath()
kv := make(map[string]any)
kv["binding_key"] = b.bindingKey
kv["source"] = b.sourceName
- kv["destination_queue"] = b.destinationName
+ kv[destination] = b.destinationName
kv["arguments"] = make(map[string]any)
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
diff --git a/rabbitmq_amqp/amqp_binding_test.go b/rabbitmq_amqp/amqp_binding_test.go
index 5abc234..4930fab 100644
--- a/rabbitmq_amqp/amqp_binding_test.go
+++ b/rabbitmq_amqp/amqp_binding_test.go
@@ -23,20 +23,20 @@ var _ = Describe("AMQP Bindings test ", func() {
It("AMQP Bindings between Exchange and Queue Should succeed", func() {
const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue should uccess"
const queueName = "Queue_AMQP Bindings between Exchange and Queue should succeed"
- exchangeInfo, err := management.DeclareExchange(context.TODO(), &ExchangeSpecification{
+ exchangeInfo, err := management.DeclareExchange(context.TODO(), &TopicExchangeSpecification{
Name: exchangeName,
})
Expect(err).To(BeNil())
Expect(exchangeInfo).NotTo(BeNil())
Expect(exchangeInfo.Name()).To(Equal(exchangeName))
- queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
+ queueInfo, err := management.DeclareQueue(context.TODO(), &QuorumQueueSpecification{
Name: queueName,
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.Name()).To(Equal(queueName))
- bindingPath, err := management.Bind(context.TODO(), &BindingSpecification{
+ bindingPath, err := management.Bind(context.TODO(), &ExchangeToQueueBindingSpecification{
SourceExchange: exchangeName,
DestinationQueue: queueName,
BindingKey: "routing-key",
@@ -49,4 +49,68 @@ var _ = Describe("AMQP Bindings test ", func() {
err = management.DeleteQueue(context.TODO(), queueName)
Expect(err).To(BeNil())
})
+
+ It("AMQP Bindings between Exchange and Exchange Should succeed", func() {
+ var exchangeName = generateName("Exchange_AMQP Bindings between Exchange and Exchange should succeed")
+ var exchangeName2 = generateName("Exchange_AMQP Bindings between Exchange and Exchange should succeed 2")
+ exchangeInfo, err := management.DeclareExchange(context.TODO(), &TopicExchangeSpecification{
+ Name: exchangeName,
+ })
+ Expect(err).To(BeNil())
+ Expect(exchangeInfo).NotTo(BeNil())
+ Expect(exchangeInfo.Name()).To(Equal(exchangeName))
+
+ exchangeInfo2, err := management.DeclareExchange(context.TODO(), &TopicExchangeSpecification{
+ Name: exchangeName2})
+ Expect(err).To(BeNil())
+ Expect(exchangeInfo2).NotTo(BeNil())
+ Expect(exchangeInfo2.Name()).To(Equal(exchangeName2))
+
+ bindingPath, err := management.Bind(context.TODO(), &ExchangeToExchangeBindingSpecification{
+ SourceExchange: exchangeName,
+ DestinationExchange: exchangeName2,
+ })
+
+ Expect(err).To(BeNil())
+ Expect(management.Unbind(context.TODO(), bindingPath)).To(BeNil())
+ Expect(management.DeleteExchange(context.TODO(), exchangeName)).To(BeNil())
+ Expect(management.DeleteExchange(context.TODO(), exchangeName2)).To(BeNil())
+ })
+
+ It("AMQP Bindings should fail if source or destinations are empty", func() {
+
+ _, err := management.Bind(context.TODO(), &ExchangeToExchangeBindingSpecification{
+ SourceExchange: "",
+ DestinationExchange: "destination",
+ })
+ Expect(err).NotTo(BeNil())
+ Expect(err.Error()).To(ContainSubstring("source and destination names are required"))
+
+ _, err = management.Bind(context.TODO(), &ExchangeToExchangeBindingSpecification{
+ SourceExchange: "source",
+ DestinationExchange: "",
+ })
+ Expect(err).NotTo(BeNil())
+ Expect(err.Error()).To(ContainSubstring("source and destination names are required"))
+
+ _, err = management.Bind(context.TODO(), &ExchangeToQueueBindingSpecification{
+ SourceExchange: "",
+ DestinationQueue: "destination",
+ })
+ Expect(err).NotTo(BeNil())
+ Expect(err.Error()).To(ContainSubstring("source and destination names are required"))
+
+ _, err = management.Bind(context.TODO(), &ExchangeToQueueBindingSpecification{
+ SourceExchange: "source",
+ DestinationQueue: "",
+ })
+ Expect(err).NotTo(BeNil())
+ Expect(err.Error()).To(ContainSubstring("source and destination names are required"))
+ })
+
+ It("AMQP Bindings should fail specification is nil", func() {
+ _, err := management.Bind(context.TODO(), nil)
+ Expect(err).NotTo(BeNil())
+ Expect(err.Error()).To(ContainSubstring("binding specification cannot be nil"))
+ })
})
diff --git a/rabbitmq_amqp/amqp_connection.go b/rabbitmq_amqp/amqp_connection.go
index db401f8..a7e0ff3 100644
--- a/rabbitmq_amqp/amqp_connection.go
+++ b/rabbitmq_amqp/amqp_connection.go
@@ -22,20 +22,40 @@ type AmqpConnection struct {
session *amqp.Session
}
-func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, linkName string) (*Publisher, error) {
- if !validateAddress(destinationAdd) {
- return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
+// NewPublisher creates a new Publisher that sends messages to the provided destination.
+// The destination is a TargetAddress that can be a Queue or an Exchange with a routing key.
+// See QueueAddress and ExchangeAddress for more information.
+func (a *AmqpConnection) NewPublisher(ctx context.Context, destination TargetAddress, linkName string) (*Publisher, error) {
+ destinationAdd := ""
+ err := error(nil)
+ if destination != nil {
+ destinationAdd, err = destination.toAddress()
+ if err != nil {
+ return nil, err
+ }
+ err = validateAddress(destinationAdd)
+ if err != nil {
+ return nil, err
+ }
}
+
sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName, AtLeastOnce))
if err != nil {
return nil, err
}
- return newPublisher(sender), nil
+ return newPublisher(sender, destinationAdd != ""), nil
}
-func (a *AmqpConnection) Consumer(ctx context.Context, destinationAdd string, linkName string) (*Consumer, error) {
- if !validateAddress(destinationAdd) {
- return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
+// NewConsumer creates a new Consumer that listens to the provided destination. Destination is a QueueAddress.
+func (a *AmqpConnection) NewConsumer(ctx context.Context, destination *QueueAddress, linkName string) (*Consumer, error) {
+ destinationAdd, err := destination.toAddress()
+ if err != nil {
+ return nil, err
+ }
+ err = validateAddress(destinationAdd)
+
+ if err != nil {
+ return nil, err
}
receiver, err := a.session.NewReceiver(ctx, destinationAdd, createReceiverLinkOptions(destinationAdd, linkName, AtLeastOnce))
if err != nil {
diff --git a/rabbitmq_amqp/amqp_consumer_test.go b/rabbitmq_amqp/amqp_consumer_test.go
index ceabaa0..117ebbd 100644
--- a/rabbitmq_amqp/amqp_consumer_test.go
+++ b/rabbitmq_amqp/amqp_consumer_test.go
@@ -9,46 +9,41 @@ import (
"time"
)
-var _ = Describe("Consumer tests", func() {
+var _ = Describe("NewConsumer tests", func() {
- It("AMQP Consumer should fail due to context cancellation", func() {
- qName := generateNameWithDateTime("AMQP Consumer should fail due to context cancellation")
+ It("AMQP NewConsumer should fail due to context cancellation", func() {
+ qName := generateNameWithDateTime("AMQP NewConsumer should fail due to context cancellation")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
- addr, _ := QueueAddress(&qName)
- queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
- Name: qName,
- IsAutoDelete: false,
- IsExclusive: false,
- QueueType: QueueType{Quorum},
+
+ queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
+ Name: qName,
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond)
cancel()
- _, err = connection.Consumer(ctx, addr, "test")
+ _, err = connection.NewConsumer(ctx, &QueueAddress{
+ Queue: qName,
+ }, "test")
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(ContainSubstring("context canceled"))
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())
})
- It("AMQP Consumer should ack and empty the queue", func() {
- qName := generateNameWithDateTime("AMQP Consumer should ack and empty the queue")
+ It("AMQP NewConsumer should ack and empty the queue", func() {
+ qName := generateNameWithDateTime("AMQP NewConsumer should ack and empty the queue")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
- queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
- Name: qName,
- IsAutoDelete: false,
- IsExclusive: false,
- QueueType: QueueType{Quorum},
+ queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
+ Name: qName,
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 10)
- addr, _ := QueueAddress(&qName)
- consumer, err := connection.Consumer(context.Background(), addr, "test")
+ consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{Queue: qName}, "test")
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
@@ -66,22 +61,18 @@ var _ = Describe("Consumer tests", func() {
Expect(connection.Close(context.Background())).To(BeNil())
})
- It("AMQP Consumer should requeue the message to the queue", func() {
+ It("AMQP NewConsumer should requeue the message to the queue", func() {
- qName := generateNameWithDateTime("AMQP Consumer should requeue the message to the queue")
+ qName := generateNameWithDateTime("AMQP NewConsumer should requeue the message to the queue")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
- queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
- Name: qName,
- IsAutoDelete: false,
- IsExclusive: false,
- QueueType: QueueType{Quorum},
+ queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
+ Name: qName,
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 1)
- addr, _ := QueueAddress(&qName)
- consumer, err := connection.Consumer(context.Background(), addr, "test")
+ consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{Queue: qName}, "test")
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
@@ -97,22 +88,18 @@ var _ = Describe("Consumer tests", func() {
Expect(connection.Close(context.Background())).To(BeNil())
})
- It("AMQP Consumer should requeue the message to the queue with annotations", func() {
+ It("AMQP NewConsumer should requeue the message to the queue with annotations", func() {
- qName := generateNameWithDateTime("AMQP Consumer should requeue the message to the queue with annotations")
+ qName := generateNameWithDateTime("AMQP NewConsumer should requeue the message to the queue with annotations")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
- queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
- Name: qName,
- IsAutoDelete: false,
- IsExclusive: false,
- QueueType: QueueType{Quorum},
+ queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
+ Name: qName,
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 1)
- addr, _ := QueueAddress(&qName)
- consumer, err := connection.Consumer(context.Background(), addr, "test")
+ consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{Queue: qName}, "test")
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
@@ -136,22 +123,18 @@ var _ = Describe("Consumer tests", func() {
Expect(connection.Close(context.Background())).To(BeNil())
})
- It("AMQP Consumer should discard the message to the queue with and without annotations", func() {
+ It("AMQP NewConsumer should discard the message to the queue with and without annotations", func() {
// TODO: Implement this test with a dead letter queue to test the discard feature
- qName := generateNameWithDateTime("AMQP Consumer should discard the message to the queue with and without annotations")
+ qName := generateNameWithDateTime("AMQP NewConsumer should discard the message to the queue with and without annotations")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
- queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
- Name: qName,
- IsAutoDelete: false,
- IsExclusive: false,
- QueueType: QueueType{Quorum},
+ queue, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
+ Name: qName,
})
Expect(err).To(BeNil())
Expect(queue).NotTo(BeNil())
publishMessages(qName, 2)
- addr, _ := QueueAddress(&qName)
- consumer, err := connection.Consumer(context.Background(), addr, "test")
+ consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{Queue: qName}, "test")
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
Expect(consumer).To(BeAssignableToTypeOf(&Consumer{}))
diff --git a/rabbitmq_amqp/amqp_exchange.go b/rabbitmq_amqp/amqp_exchange.go
index b0e0299..bfc42ad 100644
--- a/rabbitmq_amqp/amqp_exchange.go
+++ b/rabbitmq_amqp/amqp_exchange.go
@@ -2,6 +2,7 @@ package rabbitmq_amqp
import (
"context"
+ "errors"
"github.com/Azure/go-amqp"
)
@@ -34,7 +35,11 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
}
func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error) {
- path, err := ExchangeAddress(&e.name, nil)
+ if len(e.name) == 0 {
+ return nil, errors.New("exchange name cannot be empty")
+ }
+
+ path, err := exchangeAddress(&e.name, nil)
if err != nil {
return nil, err
}
@@ -59,7 +64,7 @@ func (e *AmqpExchange) IsAutoDelete() bool {
}
func (e *AmqpExchange) Delete(ctx context.Context) error {
- path, err := ExchangeAddress(&e.name, nil)
+ path, err := exchangeAddress(&e.name, nil)
if err != nil {
return err
}
diff --git a/rabbitmq_amqp/amqp_exchange_test.go b/rabbitmq_amqp/amqp_exchange_test.go
index 6867273..e02b800 100644
--- a/rabbitmq_amqp/amqp_exchange_test.go
+++ b/rabbitmq_amqp/amqp_exchange_test.go
@@ -23,7 +23,7 @@ var _ = Describe("AMQP Exchange test ", func() {
It("AMQP Exchange Declare with Default and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare and Delete with Default should succeed"
- exchangeInfo, err := management.DeclareExchange(context.TODO(), &ExchangeSpecification{
+ exchangeInfo, err := management.DeclareExchange(context.TODO(), &DirectExchangeSpecification{
Name: exchangeName,
})
Expect(err).To(BeNil())
@@ -35,9 +35,8 @@ var _ = Describe("AMQP Exchange test ", func() {
It("AMQP Exchange Declare with Topic and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare with Topic and Delete should succeed"
- exchangeInfo, err := management.DeclareExchange(context.TODO(), &ExchangeSpecification{
- Name: exchangeName,
- ExchangeType: ExchangeType{Topic},
+ exchangeInfo, err := management.DeclareExchange(context.TODO(), &TopicExchangeSpecification{
+ Name: exchangeName,
})
Expect(err).To(BeNil())
Expect(exchangeInfo).NotTo(BeNil())
@@ -48,10 +47,8 @@ var _ = Describe("AMQP Exchange test ", func() {
It("AMQP Exchange Declare with FanOut and Delete should succeed", func() {
const exchangeName = "AMQP Exchange Declare with FanOut and Delete should succeed"
- //exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{FanOut})
- exchangeInfo, err := management.DeclareExchange(context.TODO(), &ExchangeSpecification{
- Name: exchangeName,
- ExchangeType: ExchangeType{FanOut},
+ exchangeInfo, err := management.DeclareExchange(context.TODO(), &FanOutExchangeSpecification{
+ Name: exchangeName,
})
Expect(err).To(BeNil())
Expect(exchangeInfo).NotTo(BeNil())
@@ -59,4 +56,19 @@ var _ = Describe("AMQP Exchange test ", func() {
err = management.DeleteExchange(context.TODO(), exchangeName)
Expect(err).To(BeNil())
})
+
+ It("AMQP Exchange should fail when specification is nil", func() {
+ _, err := management.DeclareExchange(context.TODO(), nil)
+ Expect(err).NotTo(BeNil())
+ Expect(err.Error()).To(ContainSubstring("exchange specification cannot be nil"))
+ })
+
+ It("AMQP Exchange should fail when name is empty", func() {
+ _, err := management.DeclareExchange(context.TODO(), &TopicExchangeSpecification{
+ Name: "",
+ IsAutoDelete: false,
+ })
+ Expect(err).NotTo(BeNil())
+ Expect(err.Error()).To(ContainSubstring("exchange name cannot be empty"))
+ })
})
diff --git a/rabbitmq_amqp/amqp_management.go b/rabbitmq_amqp/amqp_management.go
index 75f8b73..8696fec 100644
--- a/rabbitmq_amqp/amqp_management.go
+++ b/rabbitmq_amqp/amqp_management.go
@@ -170,23 +170,17 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
return make(map[string]any), nil
}
-func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification *QueueSpecification) (*AmqpQueueInfo, error) {
- var amqpQueue *AmqpQueue
-
- if specification == nil || len(specification.Name) <= 0 {
- // If the specification is nil or the name is empty, then we create a new queue
- // with a random name with generateNameWithDefaultPrefix()
- amqpQueue = newAmqpQueue(a, "")
- } else {
- amqpQueue = newAmqpQueue(a, specification.Name)
- amqpQueue.AutoDelete(specification.IsAutoDelete)
- amqpQueue.Exclusive(specification.IsExclusive)
- amqpQueue.MaxLengthBytes(specification.MaxLengthBytes)
- amqpQueue.DeadLetterExchange(specification.DeadLetterExchange)
- amqpQueue.DeadLetterRoutingKey(specification.DeadLetterRoutingKey)
- amqpQueue.QueueType(specification.QueueType)
+func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification QueueSpecification) (*AmqpQueueInfo, error) {
+ if specification == nil {
+ return nil, fmt.Errorf("queue specification cannot be nil. You need to provide a valid QueueSpecification")
}
+ amqpQueue := newAmqpQueue(a, specification.name())
+ amqpQueue.AutoDelete(specification.isAutoDelete())
+ amqpQueue.Exclusive(specification.isExclusive())
+ amqpQueue.QueueType(specification.queueType())
+ amqpQueue.SetArguments(specification.buildArguments())
+
return amqpQueue.Declare(ctx)
}
@@ -195,14 +189,14 @@ func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error {
return q.Delete(ctx)
}
-func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification *ExchangeSpecification) (*AmqpExchangeInfo, error) {
+func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification ExchangeSpecification) (*AmqpExchangeInfo, error) {
if exchangeSpecification == nil {
- return nil, fmt.Errorf("exchangeSpecification is nil")
+ return nil, errors.New("exchange specification cannot be nil. You need to provide a valid ExchangeSpecification")
}
- exchange := newAmqpExchange(a, exchangeSpecification.Name)
- exchange.AutoDelete(exchangeSpecification.IsAutoDelete)
- exchange.ExchangeType(exchangeSpecification.ExchangeType)
+ exchange := newAmqpExchange(a, exchangeSpecification.name())
+ exchange.AutoDelete(exchangeSpecification.isAutoDelete())
+ exchange.ExchangeType(exchangeSpecification.exchangeType())
return exchange.Declare(ctx)
}
@@ -211,12 +205,15 @@ func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error
return e.Delete(ctx)
}
-func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification *BindingSpecification) (string, error) {
+func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification BindingSpecification) (string, error) {
+ if bindingSpecification == nil {
+ return "", fmt.Errorf("binding specification cannot be nil. You need to provide a valid BindingSpecification")
+ }
+
bind := newAMQPBinding(a)
- bind.SourceExchange(bindingSpecification.SourceExchange)
- bind.DestinationQueue(bindingSpecification.DestinationQueue)
- bind.DestinationExchange(bindingSpecification.DestinationExchange)
- bind.BindingKey(bindingSpecification.BindingKey)
+ bind.SourceExchange(bindingSpecification.sourceExchange())
+ bind.Destination(bindingSpecification.destination(), bindingSpecification.isDestinationQueue())
+ bind.BindingKey(bindingSpecification.bindingKey())
return bind.Bind(ctx)
}
@@ -225,7 +222,7 @@ func (a *AmqpManagement) Unbind(ctx context.Context, bindingPath string) error {
return bind.Unbind(ctx, bindingPath)
}
func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error) {
- path, err := QueueAddress(&queueName)
+ path, err := queueAddress(&queueName)
if err != nil {
return nil, err
}
diff --git a/rabbitmq_amqp/amqp_publisher.go b/rabbitmq_amqp/amqp_publisher.go
index 508949d..86c7e96 100644
--- a/rabbitmq_amqp/amqp_publisher.go
+++ b/rabbitmq_amqp/amqp_publisher.go
@@ -2,6 +2,7 @@ package rabbitmq_amqp
import (
"context"
+ "fmt"
"github.com/Azure/go-amqp"
)
@@ -10,23 +11,63 @@ type PublishResult struct {
Message *amqp.Message
}
+// Publisher is a publisher that sends messages to a specific destination address.
type Publisher struct {
- sender *amqp.Sender
+ sender *amqp.Sender
+ staticTargetAddress bool
}
-func newPublisher(sender *amqp.Sender) *Publisher {
- return &Publisher{sender: sender}
+func newPublisher(sender *amqp.Sender, staticTargetAddress bool) *Publisher {
+ return &Publisher{sender: sender, staticTargetAddress: staticTargetAddress}
}
-// Publish sends a message to the destination address.
-// The message is sent to the destination address and the outcome of the operation is returned.
-// The outcome is a DeliveryState that indicates if the message was accepted or rejected.
-// RabbitMQ supports the following DeliveryState types:
-// - StateAccepted
-// - StateReleased
-// - StateRejected
-// See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information.
+/*
+Publish sends a message to the destination address that can be decided during the creation of the publisher or at the time of sending the message.
+
+The message is sent and the outcome of the operation is returned.
+The outcome is a DeliveryState that indicates if the message was accepted or rejected.
+RabbitMQ supports the following DeliveryState types:
+
+ - StateAccepted
+ - StateReleased
+ - StateRejected
+ See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information.
+
+Note: If the destination address is not defined during the creation, the message must have a TO property set.
+You can use the helper "MessageToAddressHelper" to create the destination address.
+See the examples:
+Create a new publisher that sends messages to a specific destination address:
+
+
+ publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmq_amqp.ExchangeAddress{
+ Exchange: "myExchangeName",
+ Key: "myRoutingKey",
+ }
+
+ .. publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
+
+
+Create a new publisher that sends messages based on message destination address:
+
+
+ publisher, err := connection.NewPublisher(context.Background(), nil, "test")
+ msg := amqp.NewMessage([]byte("hello"))
+ ..:= MessageToAddressHelper(msg, &QueueAddress{Queue: "myQueueName"})
+ ..:= publisher.Publish(context.Background(), msg)
+
+
+*/
func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*PublishResult, error) {
+ if !m.staticTargetAddress {
+ if message.Properties == nil || message.Properties.To == nil {
+ return nil, fmt.Errorf("message properties TO is required to send a message to a dynamic target address")
+ }
+
+ err := validateAddress(*message.Properties.To)
+ if err != nil {
+ return nil, err
+ }
+ }
r, err := m.sender.SendWithReceipt(ctx, message, nil)
if err != nil {
return nil, err
diff --git a/rabbitmq_amqp/amqp_publisher_test.go b/rabbitmq_amqp/amqp_publisher_test.go
index bfd2285..fc02455 100644
--- a/rabbitmq_amqp/amqp_publisher_test.go
+++ b/rabbitmq_amqp/amqp_publisher_test.go
@@ -8,18 +8,17 @@ import (
)
var _ = Describe("AMQP publisher ", func() {
- It("Send a message to a queue with a Message Target Publisher", func() {
- qName := generateNameWithDateTime("Send a message to a queue with a Message Target Publisher")
+ It("Send a message to a queue with a Message Target NewPublisher", func() {
+ qName := generateNameWithDateTime("Send a message to a queue with a Message Target NewPublisher")
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
- queueInfo, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
+ queueInfo, err := connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
- dest, _ := QueueAddress(&qName)
- publisher, err := connection.Publisher(context.Background(), dest, "test")
+ publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: qName}, "test")
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
Expect(publisher).To(BeAssignableToTypeOf(&Publisher{}))
@@ -36,29 +35,14 @@ var _ = Describe("AMQP publisher ", func() {
Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil())
})
- It("Publisher should fail to a not existing exchange", func() {
+ It("NewPublisher should fail to a not existing exchange", func() {
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
exchangeName := "Nope"
- addr, err := ExchangeAddress(&exchangeName, nil)
- Expect(err).To(BeNil())
- publisher, err := connection.Publisher(context.Background(), addr, "test")
- Expect(err).NotTo(BeNil())
- Expect(publisher).To(BeNil())
- Expect(connection.Close(context.Background())).To(BeNil())
- })
-
- It("Publisher should fail if the destination address does not start in the correct way", func() {
- connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
- Expect(err).To(BeNil())
- Expect(connection).NotTo(BeNil())
- destinationAddress := "this is not valid since does not start with exchanges or queues"
- Expect(err).To(BeNil())
- publisher, err := connection.Publisher(context.Background(), destinationAddress, "test")
+ publisher, err := connection.NewPublisher(context.Background(), &ExchangeAddress{Exchange: exchangeName}, "test")
Expect(err).NotTo(BeNil())
Expect(publisher).To(BeNil())
- Expect(err.Error()).To(ContainSubstring("invalid destination address"))
Expect(connection.Close(context.Background())).To(BeNil())
})
@@ -67,17 +51,18 @@ var _ = Describe("AMQP publisher ", func() {
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
- exchange, err := connection.Management().DeclareExchange(context.Background(), &ExchangeSpecification{
+ exchange, err := connection.Management().DeclareExchange(context.Background(), &TopicExchangeSpecification{
Name: eName,
IsAutoDelete: false,
- ExchangeType: ExchangeType{Type: Topic},
})
Expect(err).To(BeNil())
Expect(exchange).NotTo(BeNil())
routingKeyNope := "I don't exist"
- addr, err := ExchangeAddress(&eName, &routingKeyNope)
Expect(err).To(BeNil())
- publisher, err := connection.Publisher(context.Background(), addr, "test")
+ publisher, err := connection.NewPublisher(context.Background(), &ExchangeAddress{
+ Exchange: eName,
+ Key: routingKeyNope,
+ }, "test")
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello")))
@@ -93,12 +78,11 @@ var _ = Describe("AMQP publisher ", func() {
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
- _, err = connection.Management().DeclareQueue(context.Background(), &QueueSpecification{
+ _, err = connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
Name: qName,
})
Expect(err).To(BeNil())
- dest, _ := QueueAddress(&qName)
- publisher, err := connection.Publisher(context.Background(), dest, "test")
+ publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: qName}, "test")
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello")))
@@ -110,4 +94,104 @@ var _ = Describe("AMQP publisher ", func() {
Expect(err).NotTo(BeNil())
Expect(connection.Close(context.Background()))
})
+
+ It("Multi Targets Publisher should fail with StateReleased when the destination does not exist", func() {
+ connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
+ Expect(err).To(BeNil())
+ Expect(connection).NotTo(BeNil())
+ publisher, err := connection.NewPublisher(context.Background(), nil, "test")
+ Expect(err).To(BeNil())
+ Expect(publisher).NotTo(BeNil())
+ qName := generateNameWithDateTime("Targets Publisher should fail when the destination does not exist")
+ msg := amqp.NewMessage([]byte("hello"))
+ Expect(MessageToAddressHelper(msg, &QueueAddress{Queue: qName})).To(BeNil())
+
+ publishResult, err := publisher.Publish(context.Background(), msg)
+ Expect(err).To(BeNil())
+ Expect(publishResult).NotTo(BeNil())
+ Expect(publishResult.Outcome).To(Equal(&amqp.StateReleased{}))
+ Expect(connection.Close(context.Background())).To(BeNil())
+ })
+
+ It("Multi Targets Publisher should success with StateReceived when the destination exists", func() {
+ connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
+ Expect(err).To(BeNil())
+ Expect(connection).NotTo(BeNil())
+ Expect(err).To(BeNil())
+ publisher, err := connection.NewPublisher(context.Background(), nil, "test")
+ Expect(err).To(BeNil())
+ Expect(publisher).NotTo(BeNil())
+ name := generateNameWithDateTime("Targets Publisher should success with StateReceived when the destination exists")
+ _, err = connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
+ Name: name,
+ })
+ Expect(err).To(BeNil())
+ // as first message is sent to a queue, the outcome should be StateReceived
+ // since the message was accepted by the existing queue
+ msg := amqp.NewMessage([]byte("hello"))
+ Expect(MessageToAddressHelper(msg, &QueueAddress{Queue: name})).To(BeNil())
+
+ publishResult, err := publisher.Publish(context.Background(), msg)
+ Expect(err).To(BeNil())
+ Expect(publishResult).NotTo(BeNil())
+ Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{}))
+
+ _, err = connection.Management().DeclareExchange(context.Background(), &TopicExchangeSpecification{
+ Name: name,
+ IsAutoDelete: false,
+ })
+ msg = amqp.NewMessage([]byte("hello"))
+ Expect(MessageToAddressHelper(msg, &ExchangeAddress{Exchange: name})).To(BeNil())
+ // the status should be StateReleased since the exchange does not have any binding
+ publishResult, err = publisher.Publish(context.Background(), msg)
+ Expect(err).To(BeNil())
+ Expect(publishResult).NotTo(BeNil())
+ Expect(publishResult.Outcome).To(Equal(&amqp.StateReleased{}))
+
+ // Create the binding between the exchange and the queue
+ _, err = connection.Management().Bind(context.Background(), &ExchangeToQueueBindingSpecification{
+ SourceExchange: name,
+ DestinationQueue: name,
+ BindingKey: "#",
+ })
+ Expect(err).To(BeNil())
+ // the status should be StateAccepted since the exchange has a binding
+ msg = amqp.NewMessage([]byte("hello"))
+ Expect(MessageToAddressHelper(msg, &ExchangeAddress{Exchange: name})).To(BeNil())
+ publishResult, err = publisher.Publish(context.Background(), msg)
+ Expect(err).To(BeNil())
+ Expect(publishResult).NotTo(BeNil())
+ Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{}))
+ Expect(connection.Management().DeleteQueue(context.Background(), name)).To(BeNil())
+ Expect(connection.Management().DeleteExchange(context.Background(), name)).To(BeNil())
+ Expect(connection.Close(context.Background())).To(BeNil())
+ })
+
+ It("Multi Targets Publisher should fail it TO is not set or not valid", func() {
+ connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
+ Expect(err).To(BeNil())
+ Expect(connection).NotTo(BeNil())
+ publisher, err := connection.NewPublisher(context.Background(), nil, "test")
+ Expect(err).To(BeNil())
+ Expect(publisher).NotTo(BeNil())
+ msg := amqp.NewMessage([]byte("hello"))
+ // the message should fail since the TO property is not set
+ publishResult, err := publisher.Publish(context.Background(), msg)
+ Expect(err).NotTo(BeNil())
+ Expect(err.Error()).To(ContainSubstring("message properties TO is required"))
+ Expect(publishResult).To(BeNil())
+
+ invalid := "invalid"
+ // the message should fail since the TO property is not valid
+ msg.Properties = &amqp.MessageProperties{
+ To: &invalid,
+ }
+
+ publishResult, err = publisher.Publish(context.Background(), msg)
+ Expect(err).NotTo(BeNil())
+ Expect(err.Error()).To(ContainSubstring("invalid destination address"))
+ Expect(publishResult).To(BeNil())
+
+ Expect(connection.Close(context.Background())).To(BeNil())
+ })
})
diff --git a/rabbitmq_amqp/amqp_queue.go b/rabbitmq_amqp/amqp_queue.go
index 16441c3..9890713 100644
--- a/rabbitmq_amqp/amqp_queue.go
+++ b/rabbitmq_amqp/amqp_queue.go
@@ -2,6 +2,7 @@ package rabbitmq_amqp
import (
"context"
+ "errors"
"github.com/Azure/go-amqp"
)
@@ -70,22 +71,8 @@ type AmqpQueue struct {
name string
}
-func (a *AmqpQueue) DeadLetterExchange(dlx string) {
- if len(dlx) != 0 {
- a.arguments["x-dead-letter-exchange"] = dlx
- }
-}
-
-func (a *AmqpQueue) DeadLetterRoutingKey(dlrk string) {
- if len(dlrk) != 0 {
- a.arguments["x-dead-letter-routing-key"] = dlrk
- }
-}
-
-func (a *AmqpQueue) MaxLengthBytes(length int64) {
- if length != 0 {
- a.arguments["max-length-bytes"] = length
- }
+func (a *AmqpQueue) SetArguments(arguments map[string]any) {
+ a.arguments = arguments
}
func (a *AmqpQueue) QueueType(queueType QueueType) {
@@ -94,13 +81,6 @@ func (a *AmqpQueue) QueueType(queueType QueueType) {
}
}
-func (a *AmqpQueue) GetQueueType() TQueueType {
- if a.arguments["x-queue-type"] == nil {
- return Classic
- }
- return TQueueType(a.arguments["x-queue-type"].(string))
-}
-
func (a *AmqpQueue) Exclusive(isExclusive bool) {
a.isExclusive = isExclusive
}
@@ -124,32 +104,40 @@ func newAmqpQueue(management *AmqpManagement, queueName string) *AmqpQueue {
}
func (a *AmqpQueue) validate() error {
- if a.arguments["max-length-bytes"] != nil {
- err := validatePositive("max length", a.arguments["max-length-bytes"].(int64))
+ if a.arguments["x-max-length-bytes"] != nil {
+ err := validatePositive("max length", a.arguments["x-max-length-bytes"].(int64))
if err != nil {
return err
}
}
+
+ if a.arguments["x-max-length"] != nil {
+ err := validatePositive("max length", a.arguments["x-max-length"].(int64))
+ if err != nil {
+ return err
+ }
+ }
+ if a.arguments["x-max-priority"] != nil {
+ err := validatePositive("max priority", a.arguments["x-max-priority"].(int64))
+ if err != nil {
+ return err
+ }
+ }
+
return nil
}
func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error) {
- if Quorum == a.GetQueueType() ||
- Stream == a.GetQueueType() {
- // mandatory arguments for quorum queues and streams
- a.Exclusive(false)
- a.AutoDelete(false)
- }
if err := a.validate(); err != nil {
return nil, err
}
if a.name == "" {
- a.name = generateNameWithDefaultPrefix()
+ return nil, errors.New("queue name is required")
}
- path, err := QueueAddress(&a.name)
+ path, err := queueAddress(&a.name)
if err != nil {
return nil, err
}
@@ -166,7 +154,7 @@ func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error) {
}
func (a *AmqpQueue) Delete(ctx context.Context) error {
- path, err := QueueAddress(&a.name)
+ path, err := queueAddress(&a.name)
if err != nil {
return err
}
@@ -175,7 +163,7 @@ func (a *AmqpQueue) Delete(ctx context.Context) error {
}
func (a *AmqpQueue) Purge(ctx context.Context) (int, error) {
- path, err := PurgeQueueAddress(&a.name)
+ path, err := purgeQueueAddress(&a.name)
if err != nil {
return 0, err
}
diff --git a/rabbitmq_amqp/amqp_queue_test.go b/rabbitmq_amqp/amqp_queue_test.go
index 305fc00..944529b 100644
--- a/rabbitmq_amqp/amqp_queue_test.go
+++ b/rabbitmq_amqp/amqp_queue_test.go
@@ -25,8 +25,8 @@ var _ = Describe("AMQP Queue test ", func() {
})
It("AMQP Queue Declare With Response and Get/Delete should succeed", func() {
- const queueName = "AMQP Queue Declare With Response and Delete should succeed"
- queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
+ var queueName = generateName("AMQP Queue Declare With Response and Delete should succeed")
+ queueInfo, err := management.DeclareQueue(context.TODO(), &QuorumQueueSpecification{
Name: queueName,
})
Expect(err).To(BeNil())
@@ -35,7 +35,7 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
- Expect(queueInfo.Type()).To(Equal(Classic))
+ Expect(queueInfo.Type()).To(Equal(Quorum))
// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
@@ -46,16 +46,22 @@ var _ = Describe("AMQP Queue test ", func() {
})
It("AMQP Queue Declare With Parameters and Get/Delete should succeed", func() {
- const queueName = "AMQP Queue Declare With Parameters and Delete should succeed"
+ var queueName = generateName("AMQP Queue Declare With Parameters and Delete should succeed")
- queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
+ queueInfo, err := management.DeclareQueue(context.TODO(), &ClassicQueueSpecification{
Name: queueName,
IsAutoDelete: true,
IsExclusive: true,
- QueueType: QueueType{Classic},
- MaxLengthBytes: CapacityGB(1),
+ AutoExpire: 1000,
+ MessageTTL: 1000,
+ OverflowStrategy: &DropHeadOverflowStrategy{},
+ SingleActiveConsumer: true,
DeadLetterExchange: "dead-letter-exchange",
DeadLetterRoutingKey: "dead-letter-routing-key",
+ MaxLength: 9_000,
+ MaxLengthBytes: CapacityGB(1),
+ MaxPriority: 2,
+ LeaderLocator: &BalancedLeaderLocator{},
})
Expect(err).To(BeNil())
@@ -70,7 +76,14 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-dead-letter-exchange", "dead-letter-exchange"))
Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-dead-letter-routing-key", "dead-letter-routing-key"))
- Expect(queueInfo.Arguments()).To(HaveKeyWithValue("max-length-bytes", int64(1000000000)))
+ Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-max-length-bytes", int64(1000000000)))
+ Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-max-length", int64(9000)))
+ Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-message-ttl", int64(1000)))
+ Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-single-active-consumer", true))
+ Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-overflow", "drop-head"))
+ Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-expires", int64(1000)))
+ Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-max-priority", int64(2)))
+ Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-queue-leader-locator", "random"))
// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
@@ -82,14 +95,11 @@ var _ = Describe("AMQP Queue test ", func() {
})
It("AMQP Declare Quorum Queue and Get/Delete should succeed", func() {
- const queueName = "AMQP Declare Quorum Queue and Delete should succeed"
+ var queueName = generateName("AMQP Declare Quorum Queue and Delete should succeed")
// Quorum queue will ignore Exclusive and AutoDelete settings
// since they are not supported by quorum queues
- queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
- Name: queueName,
- IsAutoDelete: true,
- IsExclusive: true,
- QueueType: QueueType{Quorum},
+ queueInfo, err := management.DeclareQueue(context.TODO(), &QuorumQueueSpecification{
+ Name: queueName,
})
Expect(err).To(BeNil())
@@ -113,12 +123,10 @@ var _ = Describe("AMQP Queue test ", func() {
// Stream queue will ignore Exclusive and AutoDelete settings
// since they are not supported by quorum queues
- queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
+ queueInfo, err := management.DeclareQueue(context.TODO(), &ClassicQueueSpecification{
Name: queueName,
- IsAutoDelete: true,
- IsExclusive: true,
- QueueType: QueueType{Stream},
- })
+ IsAutoDelete: false,
+ IsExclusive: false})
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
@@ -126,7 +134,7 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.IsDurable()).To(BeTrue())
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
- Expect(queueInfo.Type()).To(Equal(Stream))
+ Expect(queueInfo.Type()).To(Equal(Classic))
// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))
@@ -136,29 +144,18 @@ var _ = Describe("AMQP Queue test ", func() {
})
- It("AMQP Declare Queue with invalid type should fail", func() {
- const queueName = "AMQP Declare Queue with invalid type should fail"
- _, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
- Name: queueName,
- QueueType: QueueType{Type: "invalid"},
- })
- Expect(err).NotTo(BeNil())
- })
-
It("AMQP Declare Queue should fail with Precondition fail", func() {
// The first queue is declared as Classic, and it should succeed
// The second queue is declared as Quorum, and it should fail since it is already declared as Classic
- queueName := generateName("AMQP Declare Queue should fail with Precondition fail")
-
- _, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
- Name: queueName,
- QueueType: QueueType{Classic},
+ //queueName := generateName("AMQP Declare Queue should fail with Precondition fail")
+ queueName := "ab"
+ _, err := management.DeclareQueue(context.TODO(), &ClassicQueueSpecification{
+ Name: queueName,
})
Expect(err).To(BeNil())
- _, err = management.DeclareQueue(context.TODO(), &QueueSpecification{
- Name: queueName,
- QueueType: QueueType{Quorum},
+ _, err = management.DeclareQueue(context.TODO(), &QuorumQueueSpecification{
+ Name: queueName,
})
Expect(err).NotTo(BeNil())
@@ -169,7 +166,7 @@ var _ = Describe("AMQP Queue test ", func() {
It("AMQP Declare Queue should fail during validation", func() {
queueName := generateName("AMQP Declare Queue should fail during validation")
- _, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
+ _, err := management.DeclareQueue(context.TODO(), &QuorumQueueSpecification{
Name: queueName,
MaxLengthBytes: -1,
})
@@ -178,8 +175,18 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(err).To(HaveOccurred())
})
+ It("AMQP Declare Queue should fail if queue specification is nil", func() {
+ _, err := management.DeclareQueue(context.TODO(), nil)
+ Expect(err).NotTo(BeNil())
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).Should(ContainSubstring("queue specification cannot be nil"))
+ })
+
It("AMQP Declare Queue should create client name queue", func() {
- queueInfo, err := management.DeclareQueue(context.TODO(), nil)
+ queueInfo, err := management.DeclareQueue(context.TODO(), &AutoGeneratedQueueSpecification{
+ IsAutoDelete: true,
+ IsExclusive: false,
+ })
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())
Expect(queueInfo.Name()).To(ContainSubstring("client.gen-"))
@@ -189,7 +196,7 @@ var _ = Describe("AMQP Queue test ", func() {
It("AMQP Purge Queue should succeed and return the number of messages purged", func() {
queueName := generateName("AMQP Purge Queue should succeed and return the number of messages purged")
- queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{
+ queueInfo, err := management.DeclareQueue(context.TODO(), &QuorumQueueSpecification{
Name: queueName,
})
Expect(err).To(BeNil())
@@ -213,10 +220,7 @@ func publishMessages(queueName string, count int) {
conn, err := Dial(context.TODO(), []string{"amqp://guest:guest@localhost"}, nil)
Expect(err).To(BeNil())
- address, err := QueueAddress(&queueName)
- Expect(err).To(BeNil())
-
- publisher, err := conn.Publisher(context.TODO(), address, "test")
+ publisher, err := conn.NewPublisher(context.TODO(), &QueueAddress{Queue: queueName}, "test")
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())
diff --git a/rabbitmq_amqp/entities.go b/rabbitmq_amqp/entities.go
index 2a9d72d..197d0b2 100644
--- a/rabbitmq_amqp/entities.go
+++ b/rabbitmq_amqp/entities.go
@@ -16,17 +16,264 @@ func (e QueueType) String() string {
return string(e.Type)
}
-// QueueSpecification represents the specification of a queue
-type QueueSpecification struct {
+type QueueSpecification interface {
+ name() string
+ isAutoDelete() bool
+ isExclusive() bool
+ queueType() QueueType
+ buildArguments() map[string]any
+}
+
+// QuorumQueueSpecification represents the specification of the quorum queue
+
+type OverflowStrategy interface {
+ overflowStrategy() string
+}
+
+type DropHeadOverflowStrategy struct {
+}
+
+func (d *DropHeadOverflowStrategy) overflowStrategy() string {
+ return "drop-head"
+}
+
+type RejectPublishOverflowStrategy struct {
+}
+
+func (r *RejectPublishOverflowStrategy) overflowStrategy() string {
+ return "reject-publish"
+}
+
+type RejectPublishDlxOverflowStrategy struct {
+}
+
+func (r *RejectPublishDlxOverflowStrategy) overflowStrategy() string {
+ return "reject-publish-dlx"
+}
+
+type LeaderLocator interface {
+ leaderLocator() string
+}
+
+type BalancedLeaderLocator struct {
+}
+
+func (r *BalancedLeaderLocator) leaderLocator() string {
+ return "random"
+}
+
+type ClientLocalLeaderLocator struct {
+}
+
+func (r *ClientLocalLeaderLocator) leaderLocator() string {
+ return "client-local"
+}
+
+type QuorumQueueSpecification struct {
+ Name string
+ AutoExpire int64
+ MessageTTL int64
+ OverflowStrategy OverflowStrategy
+ SingleActiveConsumer bool
+ DeadLetterExchange string
+ DeadLetterRoutingKey string
+ MaxLength int64
+ MaxLengthBytes int64
+ DeliveryLimit int64
+ TargetClusterSize int64
+ LeaderLocator LeaderLocator
+}
+
+func (q *QuorumQueueSpecification) name() string {
+ return q.Name
+}
+
+func (q *QuorumQueueSpecification) isAutoDelete() bool {
+ return false
+}
+
+func (q *QuorumQueueSpecification) isExclusive() bool {
+ return false
+}
+
+func (q *QuorumQueueSpecification) queueType() QueueType {
+ return QueueType{Type: Quorum}
+}
+
+func (q *QuorumQueueSpecification) buildArguments() map[string]any {
+ result := map[string]any{}
+ if q.MaxLengthBytes != 0 {
+ result["x-max-length-bytes"] = q.MaxLengthBytes
+ }
+
+ if len(q.DeadLetterExchange) != 0 {
+ result["x-dead-letter-exchange"] = q.DeadLetterExchange
+ }
+
+ if len(q.DeadLetterRoutingKey) != 0 {
+ result["x-dead-letter-routing-key"] = q.DeadLetterRoutingKey
+ }
+
+ if q.AutoExpire != 0 {
+ result["x-expires"] = q.AutoExpire
+ }
+
+ if q.MessageTTL != 0 {
+ result["x-message-ttl"] = q.MessageTTL
+ }
+
+ if q.OverflowStrategy != nil {
+ result["x-overflow"] = q.OverflowStrategy.overflowStrategy()
+ }
+
+ if q.SingleActiveConsumer {
+ result["x-single-active-consumer"] = true
+ }
+
+ if q.MaxLength != 0 {
+ result["x-max-length"] = q.MaxLength
+ }
+
+ if q.DeliveryLimit != 0 {
+ result["x-delivery-limit"] = q.DeliveryLimit
+ }
+
+ if q.TargetClusterSize != 0 {
+ result["x-quorum-target-group-size"] = q.TargetClusterSize
+ }
+
+ if q.LeaderLocator != nil {
+ result["x-queue-leader-locator"] = q.LeaderLocator.leaderLocator()
+ }
+
+ result["x-queue-type"] = q.queueType().String()
+ return result
+}
+
+// ClassicQueueSpecification represents the specification of the classic queue
+type ClassicQueueSpecification struct {
Name string
IsAutoDelete bool
IsExclusive bool
- QueueType QueueType
- MaxLengthBytes int64
+ AutoExpire int64
+ MessageTTL int64
+ OverflowStrategy OverflowStrategy
+ SingleActiveConsumer bool
DeadLetterExchange string
DeadLetterRoutingKey string
+ MaxLength int64
+ MaxLengthBytes int64
+ MaxPriority int64
+ LeaderLocator LeaderLocator
+}
+
+func (q *ClassicQueueSpecification) name() string {
+ return q.Name
+}
+
+func (q *ClassicQueueSpecification) isAutoDelete() bool {
+ return q.IsAutoDelete
+}
+
+func (q *ClassicQueueSpecification) isExclusive() bool {
+ return q.IsExclusive
+}
+
+func (q *ClassicQueueSpecification) queueType() QueueType {
+ return QueueType{Type: Classic}
+}
+
+func (q *ClassicQueueSpecification) buildArguments() map[string]any {
+ result := map[string]any{}
+
+ if q.MaxLengthBytes != 0 {
+ result["x-max-length-bytes"] = q.MaxLengthBytes
+ }
+
+ if len(q.DeadLetterExchange) != 0 {
+ result["x-dead-letter-exchange"] = q.DeadLetterExchange
+ }
+
+ if len(q.DeadLetterRoutingKey) != 0 {
+ result["x-dead-letter-routing-key"] = q.DeadLetterRoutingKey
+ }
+
+ if q.AutoExpire != 0 {
+ result["x-expires"] = q.AutoExpire
+ }
+
+ if q.MessageTTL != 0 {
+ result["x-message-ttl"] = q.MessageTTL
+ }
+
+ if q.OverflowStrategy != nil {
+ result["x-overflow"] = q.OverflowStrategy.overflowStrategy()
+ }
+
+ if q.SingleActiveConsumer {
+ result["x-single-active-consumer"] = true
+ }
+
+ if q.MaxLength != 0 {
+ result["x-max-length"] = q.MaxLength
+ }
+
+ if q.MaxPriority != 0 {
+ result["x-max-priority"] = q.MaxPriority
+ }
+
+ if q.LeaderLocator != nil {
+ result["x-queue-leader-locator"] = q.LeaderLocator.leaderLocator()
+ }
+
+ result["x-queue-type"] = q.queueType().String()
+
+ return result
+}
+
+type AutoGeneratedQueueSpecification struct {
+ IsAutoDelete bool
+ IsExclusive bool
+ MaxLength int64
+ MaxLengthBytes int64
+}
+
+func (a *AutoGeneratedQueueSpecification) name() string {
+ return generateNameWithDefaultPrefix()
+}
+
+func (a *AutoGeneratedQueueSpecification) isAutoDelete() bool {
+ return a.IsAutoDelete
+}
+
+func (a *AutoGeneratedQueueSpecification) isExclusive() bool {
+ return a.IsExclusive
+}
+
+func (a *AutoGeneratedQueueSpecification) queueType() QueueType {
+ return QueueType{Classic}
+}
+
+func (a *AutoGeneratedQueueSpecification) buildArguments() map[string]any {
+ result := map[string]any{}
+
+ if a.MaxLengthBytes != 0 {
+ result["x-max-length-bytes"] = a.MaxLengthBytes
+ }
+
+ if a.MaxLength != 0 {
+ result["x-max-length"] = a.MaxLength
+ }
+
+ result["x-queue-type"] = a.queueType().String()
+
+ return result
+
}
+// / **** Exchange ****
+
+// TExchangeType represents the type of exchange
type TExchangeType string
const (
@@ -43,15 +290,124 @@ func (e ExchangeType) String() string {
return string(e.Type)
}
-type ExchangeSpecification struct {
+// ExchangeSpecification represents the specification of an exchange
+type ExchangeSpecification interface {
+ name() string
+ isAutoDelete() bool
+ exchangeType() ExchangeType
+ buildArguments() map[string]any
+}
+
+type DirectExchangeSpecification struct {
Name string
IsAutoDelete bool
- ExchangeType ExchangeType
}
-type BindingSpecification struct {
+func (d *DirectExchangeSpecification) name() string {
+ return d.Name
+}
+
+func (d *DirectExchangeSpecification) isAutoDelete() bool {
+ return d.IsAutoDelete
+}
+
+func (d *DirectExchangeSpecification) exchangeType() ExchangeType {
+ return ExchangeType{Type: Direct}
+}
+
+func (d *DirectExchangeSpecification) buildArguments() map[string]any {
+ return map[string]any{}
+}
+
+type TopicExchangeSpecification struct {
+ Name string
+ IsAutoDelete bool
+}
+
+func (t *TopicExchangeSpecification) name() string {
+ return t.Name
+}
+
+func (t *TopicExchangeSpecification) isAutoDelete() bool {
+ return t.IsAutoDelete
+}
+
+func (t *TopicExchangeSpecification) exchangeType() ExchangeType {
+ return ExchangeType{Type: Topic}
+}
+
+func (t *TopicExchangeSpecification) buildArguments() map[string]any {
+ return map[string]any{}
+}
+
+type FanOutExchangeSpecification struct {
+ Name string
+ IsAutoDelete bool
+}
+
+func (f *FanOutExchangeSpecification) name() string {
+ return f.Name
+}
+
+func (f *FanOutExchangeSpecification) isAutoDelete() bool {
+ return f.IsAutoDelete
+}
+
+func (f *FanOutExchangeSpecification) exchangeType() ExchangeType {
+ return ExchangeType{Type: FanOut}
+}
+
+func (f *FanOutExchangeSpecification) buildArguments() map[string]any {
+ return map[string]any{}
+}
+
+type BindingSpecification interface {
+ sourceExchange() string
+ destination() string
+ bindingKey() string
+ isDestinationQueue() bool
+}
+
+type ExchangeToQueueBindingSpecification struct {
+ SourceExchange string
+ DestinationQueue string
+ BindingKey string
+}
+
+func (e *ExchangeToQueueBindingSpecification) sourceExchange() string {
+ return e.SourceExchange
+}
+
+func (e *ExchangeToQueueBindingSpecification) destination() string {
+ return e.DestinationQueue
+}
+
+func (e *ExchangeToQueueBindingSpecification) isDestinationQueue() bool {
+ return true
+}
+
+func (e *ExchangeToQueueBindingSpecification) bindingKey() string {
+ return e.BindingKey
+}
+
+type ExchangeToExchangeBindingSpecification struct {
SourceExchange string
- DestinationQueue string
DestinationExchange string
BindingKey string
}
+
+func (e *ExchangeToExchangeBindingSpecification) sourceExchange() string {
+ return e.SourceExchange
+}
+
+func (e *ExchangeToExchangeBindingSpecification) destination() string {
+ return e.DestinationExchange
+}
+
+func (e *ExchangeToExchangeBindingSpecification) isDestinationQueue() bool {
+ return false
+}
+
+func (e *ExchangeToExchangeBindingSpecification) bindingKey() string {
+ return e.BindingKey
+}