diff --git a/examples/getting_started/main.go b/examples/getting_started/main.go index 0491115..5c615c4 100644 --- a/examples/getting_started/main.go +++ b/examples/getting_started/main.go @@ -48,7 +48,7 @@ func main() { } fmt.Printf("Exchange %s created.\n", exchangeInfo.GetName()) - bindingSpec := management.Binding().SourceExchange(exchangeInfo.GetName()).DestinationQueue(queueInfo.GetName()).Key("routing-key") + bindingSpec := management.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key("routing-key") err = bindingSpec.Bind(context.Background()) if err != nil { diff --git a/go.mod b/go.mod index 12cb540..7e84846 100644 --- a/go.mod +++ b/go.mod @@ -3,20 +3,20 @@ module github.com/rabbitmq/rabbitmq-amqp-go-client go 1.22.0 require ( + github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 + github.com/google/uuid v1.6.0 github.com/onsi/ginkgo/v2 v2.20.2 github.com/onsi/gomega v1.34.2 ) require ( - github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/go-cmp v0.6.0 // indirect - github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 // indirect - github.com/google/uuid v1.6.0 // indirect - golang.org/x/net v0.28.0 // indirect - golang.org/x/sys v0.24.0 // indirect - golang.org/x/text v0.17.0 // indirect - golang.org/x/tools v0.24.0 // indirect + github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + golang.org/x/tools v0.25.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 260b0ed..70209dc 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 h1:rL7yrEV9yputQV7T+Y9eJVmTVkK4B0aHlBc8TUITC5A= github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= -github.com/Gsantomaggio/go-amqp v0.0.0-20240905094626-af192b497e48 h1:etxEtd7qkhJD34gpQesPbZuMJrqkc+ZOXqR3diVfGWs= -github.com/Gsantomaggio/go-amqp v0.0.0-20240905094626-af192b497e48/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= @@ -12,8 +10,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 h1:5iH8iuqE5apketRbSFBy+X1V0o+l+8NF1avt4HWl7cA= -github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 h1:c5FlPPgxOn7kJz3VoPLkQYQXGBS3EklQ4Zfi57uOuqQ= +github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4= @@ -24,14 +22,14 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= -golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= -golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= -golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= -golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= -golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE= +golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/rabbitmq_amqp/amqp_binding.go b/rabbitmq_amqp/amqp_binding.go index 7b5d99a..b61b979 100644 --- a/rabbitmq_amqp/amqp_binding.go +++ b/rabbitmq_amqp/amqp_binding.go @@ -9,10 +9,11 @@ type AMQPBindingInfo struct { } type AMQPBinding struct { - sourceExchangeName string - destinationQueue string - bindingKey string - management *AmqpManagement + sourceName string + destinationName string + toQueue bool + bindingKey string + management *AmqpManagement } func newAMQPBinding(management *AmqpManagement) *AMQPBinding { @@ -24,31 +25,55 @@ func (b *AMQPBinding) Key(bindingKey string) IBindingSpecification { return b } -func (b *AMQPBinding) SourceExchange(exchangeName string) IBindingSpecification { - b.sourceExchangeName = exchangeName +func (b *AMQPBinding) SourceExchange(exchangeSpec IExchangeSpecification) IBindingSpecification { + b.sourceName = exchangeSpec.GetName() + b.toQueue = false return b } -func (b *AMQPBinding) DestinationQueue(queueName string) IBindingSpecification { - b.destinationQueue = queueName +func (b *AMQPBinding) SourceExchangeName(exchangeName string) IBindingSpecification { + b.sourceName = exchangeName + b.toQueue = false return b } -func (b *AMQPBinding) Bind(ctx context.Context) error { +func (b *AMQPBinding) DestinationExchange(exchangeSpec IExchangeInfo) IBindingSpecification { + b.destinationName = exchangeSpec.GetName() + b.toQueue = false + return b +} + +func (b *AMQPBinding) DestinationExchangeName(exchangeName string) IBindingSpecification { + b.destinationName = exchangeName + b.toQueue = false + return b +} + +func (b *AMQPBinding) DestinationQueue(queueSpec IQueueSpecification) IBindingSpecification { + b.destinationName = queueSpec.GetName() + b.toQueue = true + return b +} +func (b *AMQPBinding) DestinationQueueName(queueName string) IBindingSpecification { + b.destinationName = queueName + b.toQueue = true + return b +} + +func (b *AMQPBinding) Bind(ctx context.Context) error { path := bindingPath() kv := make(map[string]any) kv["binding_key"] = b.bindingKey - kv["source"] = b.sourceExchangeName - kv["destination_queue"] = b.destinationQueue + kv["source"] = b.sourceName + kv["destination_queue"] = b.destinationName kv["arguments"] = make(map[string]any) _, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204}) return err - } func (b *AMQPBinding) Unbind(ctx context.Context) error { - bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.sourceExchangeName, b.destinationQueue, b.bindingKey) + bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey) _, err := b.management.Request(ctx, amqp.Null{}, bindingPathWithExchangeQueueKey, commandDelete, []int{responseCode204}) return err } diff --git a/rabbitmq_amqp/amqp_binding_test.go b/rabbitmq_amqp/amqp_binding_test.go index f84dcfa..5eeb9ab 100644 --- a/rabbitmq_amqp/amqp_binding_test.go +++ b/rabbitmq_amqp/amqp_binding_test.go @@ -7,7 +7,6 @@ import ( ) var _ = Describe("AMQP Bindings test ", func() { - var connection IConnection var management IManagement BeforeEach(func() { @@ -26,9 +25,9 @@ var _ = Describe("AMQP Bindings test ", func() { Expect(connection.Close(context.Background())).To(BeNil()) }) - It("AMQP Bindings between Exchange and Queue Should success ", func() { - const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue Should success" - const queueName = "Queue_AMQP Bindings between Exchange and Queue Should success" + It("AMQP Bindings between Exchange and Queue Should succeed", func() { + const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue should uccess" + const queueName = "Queue_AMQP Bindings between Exchange and Queue should succeed" exchangeSpec := management.Exchange(exchangeName) exchangeInfo, err := exchangeSpec.Declare(context.TODO()) Expect(err).To(BeNil()) @@ -41,9 +40,7 @@ var _ = Describe("AMQP Bindings test ", func() { Expect(queueInfo).NotTo(BeNil()) Expect(queueInfo.GetName()).To(Equal(queueName)) - bindingSpec := management.Binding().SourceExchange(exchangeName). - DestinationQueue(queueName). - Key("routing-key") + bindingSpec := management.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key("routing-key") err = bindingSpec.Bind(context.TODO()) Expect(err).To(BeNil()) err = bindingSpec.Unbind(context.TODO()) @@ -52,7 +49,5 @@ var _ = Describe("AMQP Bindings test ", func() { Expect(err).To(BeNil()) err = queueSpec.Delete(context.TODO()) Expect(err).To(BeNil()) - }) - }) diff --git a/rabbitmq_amqp/amqp_connection.go b/rabbitmq_amqp/amqp_connection.go index e251710..d950dc0 100644 --- a/rabbitmq_amqp/amqp_connection.go +++ b/rabbitmq_amqp/amqp_connection.go @@ -44,7 +44,6 @@ func (c *ConnectionSettings) Port(port int) IConnectionSettings { } func (c *ConnectionSettings) User(userName string) IConnectionSettings { - c.user = userName return c } @@ -71,7 +70,6 @@ func (c *ConnectionSettings) GetHost() string { func (c *ConnectionSettings) Host(hostName string) IConnectionSettings { c.host = hostName return c - } func (c *ConnectionSettings) GetPort() int { @@ -170,6 +168,7 @@ func (a *AmqpConnection) Open(ctx context.Context, connectionSettings IConnectio err = a.Management().Open(ctx, a) if err != nil { + // TODO close connection? return err } return nil diff --git a/rabbitmq_amqp/amqp_connection_test.go b/rabbitmq_amqp/amqp_connection_test.go index a3f987a..78a6297 100644 --- a/rabbitmq_amqp/amqp_connection_test.go +++ b/rabbitmq_amqp/amqp_connection_test.go @@ -8,7 +8,7 @@ import ( ) var _ = Describe("AMQP Connection Test", func() { - It("AMQP SASLTypeAnonymous Connection should success", func() { + It("AMQP SASLTypeAnonymous Connection should succeed", func() { amqpConnection := NewAmqpConnection() Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) @@ -17,11 +17,14 @@ var _ = Describe("AMQP Connection Test", func() { Expect(connectionSettings).NotTo(BeNil()) connectionSettings.SaslMechanism(SaslMechanism{Type: Anonymous}) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - err := amqpConnection.Open(context.TODO(), connectionSettings) + + err := amqpConnection.Open(context.Background(), connectionSettings) + Expect(err).To(BeNil()) + err = amqpConnection.Close(context.Background()) Expect(err).To(BeNil()) }) - It("AMQP SASLTypePlain Connection should success", func() { + It("AMQP SASLTypePlain Connection should succeed", func() { amqpConnection := NewAmqpConnection() Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) @@ -30,7 +33,10 @@ var _ = Describe("AMQP Connection Test", func() { Expect(connectionSettings).NotTo(BeNil()) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) connectionSettings.SaslMechanism(SaslMechanism{Type: Plain}) - err := amqpConnection.Open(context.TODO(), connectionSettings) + + err := amqpConnection.Open(context.Background(), connectionSettings) + Expect(err).To(BeNil()) + err = amqpConnection.Close(context.Background()) Expect(err).To(BeNil()) }) @@ -42,12 +48,12 @@ var _ = Describe("AMQP Connection Test", func() { Expect(connectionSettings).NotTo(BeNil()) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) connectionSettings.Host("localhost").Port(1234) - err := amqpConnection.Open(context.TODO(), connectionSettings) + + err := amqpConnection.Open(context.Background(), connectionSettings) Expect(err).NotTo(BeNil()) }) It("AMQP Connection should fail due of wrong host", func() { - amqpConnection := NewAmqpConnection() Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) @@ -56,11 +62,12 @@ var _ = Describe("AMQP Connection Test", func() { Expect(connectionSettings).NotTo(BeNil()) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) connectionSettings.Host("wronghost").Port(5672) - err := amqpConnection.Open(context.TODO(), connectionSettings) + + err := amqpConnection.Open(context.Background(), connectionSettings) Expect(err).NotTo(BeNil()) }) - It("AMQP Connection should fail due of context cancelled", func() { + It("AMQP Connection should fail due to context cancellation", func() { amqpConnection := NewAmqpConnection() Expect(amqpConnection).NotTo(BeNil()) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) @@ -69,12 +76,12 @@ var _ = Describe("AMQP Connection Test", func() { Expect(err).NotTo(BeNil()) }) - It("AMQP Connection should receive events ", func() { + It("AMQP Connection should receive events", func() { amqpConnection := NewAmqpConnection() Expect(amqpConnection).NotTo(BeNil()) ch := make(chan *StatusChanged, 1) amqpConnection.NotifyStatusChange(ch) - err := amqpConnection.Open(context.TODO(), NewConnectionSettings()) + err := amqpConnection.Open(context.Background(), NewConnectionSettings()) Expect(err).To(BeNil()) recv := <-ch Expect(recv).NotTo(BeNil()) @@ -88,7 +95,6 @@ var _ = Describe("AMQP Connection Test", func() { Expect(recv.From).To(Equal(Open)) Expect(recv.To).To(Equal(Closed)) - }) //It("AMQP TLS Connection should success with SASLTypeAnonymous ", func() { @@ -103,8 +109,7 @@ var _ = Describe("AMQP Connection Test", func() { // }) // Expect(connectionSettings).NotTo(BeNil()) // Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - // err := amqpConnection.Open(context.TODO(), connectionSettings) + // err := amqpConnection.Open(context.Background(), connectionSettings) // Expect(err).To(BeNil()) //}) - }) diff --git a/rabbitmq_amqp/amqp_exchange.go b/rabbitmq_amqp/amqp_exchange.go index 73d6041..47cea6e 100644 --- a/rabbitmq_amqp/amqp_exchange.go +++ b/rabbitmq_amqp/amqp_exchange.go @@ -34,7 +34,6 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange { } func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) { - path := exchangePath(e.name) kv := make(map[string]any) kv["auto_delete"] = e.isAutoDelete diff --git a/rabbitmq_amqp/amqp_exchange_test.go b/rabbitmq_amqp/amqp_exchange_test.go index 3d4fc35..8dd997c 100644 --- a/rabbitmq_amqp/amqp_exchange_test.go +++ b/rabbitmq_amqp/amqp_exchange_test.go @@ -7,7 +7,6 @@ import ( ) var _ = Describe("AMQP Exchange test ", func() { - var connection IConnection var management IManagement BeforeEach(func() { @@ -26,8 +25,8 @@ var _ = Describe("AMQP Exchange test ", func() { Expect(connection.Close(context.Background())).To(BeNil()) }) - It("AMQP Exchange Declare with Default and Delete should success ", func() { - const exchangeName = "AMQP Exchange Declare and Delete with Default should success" + It("AMQP Exchange Declare with Default and Delete should succeed", func() { + const exchangeName = "AMQP Exchange Declare and Delete with Default should succeed" exchangeSpec := management.Exchange(exchangeName) exchangeInfo, err := exchangeSpec.Declare(context.TODO()) Expect(err).To(BeNil()) @@ -37,8 +36,8 @@ var _ = Describe("AMQP Exchange test ", func() { Expect(err).To(BeNil()) }) - It("AMQP Exchange Declare with Topic and Delete should success ", func() { - const exchangeName = "AMQP Exchange Declare with Topic and Delete should success" + It("AMQP Exchange Declare with Topic and Delete should succeed", func() { + const exchangeName = "AMQP Exchange Declare with Topic and Delete should succeed" exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{Topic}) exchangeInfo, err := exchangeSpec.Declare(context.TODO()) Expect(err).To(BeNil()) @@ -48,8 +47,8 @@ var _ = Describe("AMQP Exchange test ", func() { Expect(err).To(BeNil()) }) - It("AMQP Exchange Declare with FanOut and Delete should success ", func() { - const exchangeName = "AMQP Exchange Declare with FanOut and Delete should success" + It("AMQP Exchange Declare with FanOut and Delete should succeed", func() { + const exchangeName = "AMQP Exchange Declare with FanOut and Delete should succeed" exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{FanOut}) exchangeInfo, err := exchangeSpec.Declare(context.TODO()) Expect(err).To(BeNil()) @@ -58,5 +57,4 @@ var _ = Describe("AMQP Exchange test ", func() { err = exchangeSpec.Delete(context.TODO()) Expect(err).To(BeNil()) }) - }) diff --git a/rabbitmq_amqp/amqp_managent.go b/rabbitmq_amqp/amqp_management.go similarity index 87% rename from rabbitmq_amqp/amqp_managent.go rename to rabbitmq_amqp/amqp_management.go index 30c1fc6..2e79b9a 100644 --- a/rabbitmq_amqp/amqp_managent.go +++ b/rabbitmq_amqp/amqp_management.go @@ -10,7 +10,7 @@ import ( "time" ) -var PreconditionFailed = errors.New("precondition Failed") +var ErrPreconditionFailed = errors.New("precondition Failed") type AmqpManagement struct { session *amqp.Session @@ -58,28 +58,6 @@ func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error { return nil } -//func (a *AmqpManagement) processMessages(ctx context.Context) error { -// -// go func() { -// -// for a.GetStatus() == Open { -// msg, err := a.receiver.Receive(ctx, nil) // blocking call -// if err != nil { -// fmt.Printf("Exiting processMessages %s\n", err) -// return -// } -// -// if msg != nil { -// a.receiver.AcceptMessage(ctx, msg) -// } -// } -// -// fmt.Printf("Exiting processMessages\n") -// }() - -//return nil -//} - func (a *AmqpManagement) ensureSenderLink(ctx context.Context) error { if a.sender == nil { prop := make(map[string]any) @@ -110,19 +88,24 @@ func (a *AmqpManagement) Open(ctx context.Context, connection IConnection) error if err != nil { return err } + a.session = session err = a.ensureSenderLink(ctx) - if err != nil { return err } - time.Sleep(500 * time.Millisecond) err = a.ensureReceiverLink(ctx) - time.Sleep(500 * time.Millisecond) if err != nil { return err } + + // TODO + // Even 10ms is enough to allow the links to establish, + // which tells me it allows the golang runtime to process + // some channels or I/O or something elsewhere + time.Sleep(time.Millisecond * 10) + a.lifeCycle.SetStatus(Open) return ctx.Err() } @@ -137,15 +120,12 @@ func (a *AmqpManagement) Close(ctx context.Context) error { func (a *AmqpManagement) Request(ctx context.Context, body any, path string, method string, expectedResponseCodes []int) (map[string]any, error) { - return a.request(ctx, uuid.New().String(), body, path, method, expectedResponseCodes) - } func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponseCodes []int) error { - if responseCode == responseCode409 { - return PreconditionFailed + return ErrPreconditionFailed } for _, code := range expectedResponseCodes { @@ -154,7 +134,7 @@ func (a *AmqpManagement) validateResponseCode(responseCode int, expectedResponse } } - return errors.New(fmt.Sprintf("expected response code %d got %d", expectedResponseCodes, responseCode)) + return fmt.Errorf("expected response code %d got %d", expectedResponseCodes, responseCode) } func (a *AmqpManagement) request(ctx context.Context, id string, body any, path string, method string, @@ -162,6 +142,7 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path amqpMessage := &amqp.Message{ Value: body, } + s := commandReplyTo amqpMessage.Properties = &amqp.MessageProperties{ ReplyTo: &s, @@ -169,19 +150,24 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path Subject: &method, MessageID: &id, } + opts := &amqp.SendOptions{Settled: true} + err := a.sender.Send(ctx, amqpMessage, opts) if err != nil { return make(map[string]any), err } + msg, err := a.receiver.Receive(ctx, nil) if err != nil { return make(map[string]any), err } + err = a.receiver.AcceptMessage(ctx, msg) if err != nil { return nil, err } + if msg.Properties == nil { return make(map[string]any), fmt.Errorf("expected properties in the message") } @@ -193,6 +179,7 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path if msg.Properties.CorrelationID != id { return make(map[string]any), fmt.Errorf("expected correlation id %s got %s", id, msg.Properties.CorrelationID) } + switch msg.Value.(type) { case map[string]interface{}: return msg.Value.(map[string]any), nil diff --git a/rabbitmq_amqp/amqp_managent_test.go b/rabbitmq_amqp/amqp_management_test.go similarity index 75% rename from rabbitmq_amqp/amqp_managent_test.go rename to rabbitmq_amqp/amqp_management_test.go index d2b7521..9bd51d2 100644 --- a/rabbitmq_amqp/amqp_managent_test.go +++ b/rabbitmq_amqp/amqp_management_test.go @@ -8,8 +8,7 @@ import ( ) var _ = Describe("Management tests", func() { - - It("AMQP Management should fail due of context cancelled", func() { + It("AMQP Management should fail due to context cancellation", func() { amqpConnection := NewAmqpConnection() Expect(amqpConnection).NotTo(BeNil()) err := amqpConnection.Open(context.Background(), NewConnectionSettings()) @@ -19,14 +18,15 @@ var _ = Describe("Management tests", func() { cancel() err = amqpConnection.Management().Open(ctx, amqpConnection) Expect(err).NotTo(BeNil()) + amqpConnection.Close(context.Background()) }) - It("AMQP Management should receive events ", func() { + It("AMQP Management should receive events", func() { amqpConnection := NewAmqpConnection() Expect(amqpConnection).NotTo(BeNil()) ch := make(chan *StatusChanged, 1) amqpConnection.Management().NotifyStatusChange(ch) - err := amqpConnection.Open(context.TODO(), NewConnectionSettings()) + err := amqpConnection.Open(context.Background(), NewConnectionSettings()) Expect(err).To(BeNil()) recv := <-ch Expect(recv).NotTo(BeNil()) @@ -40,7 +40,7 @@ var _ = Describe("Management tests", func() { Expect(recv.From).To(Equal(Open)) Expect(recv.To).To(Equal(Closed)) - + amqpConnection.Close(context.Background()) }) It("Request", func() { @@ -51,7 +51,7 @@ var _ = Describe("Management tests", func() { connectionSettings := NewConnectionSettings() Expect(connectionSettings).NotTo(BeNil()) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - err := amqpConnection.Open(context.TODO(), connectionSettings) + err := amqpConnection.Open(context.Background(), connectionSettings) Expect(err).To(BeNil()) management := amqpConnection.Management() @@ -62,9 +62,10 @@ var _ = Describe("Management tests", func() { _queueArguments["x-queue-type"] = "quorum" kv["arguments"] = _queueArguments path := "/queues/test" - result, err := management.Request(context.TODO(), kv, path, "PUT", []int{200}) + result, err := management.Request(context.Background(), kv, path, "PUT", []int{200}) Expect(err).To(BeNil()) Expect(result).NotTo(BeNil()) - Expect(management.Close(context.TODO())).To(BeNil()) + Expect(management.Close(context.Background())).To(BeNil()) + amqpConnection.Close(context.Background()) }) }) diff --git a/rabbitmq_amqp/amqp_queue.go b/rabbitmq_amqp/amqp_queue.go index ada9775..81b5462 100644 --- a/rabbitmq_amqp/amqp_queue.go +++ b/rabbitmq_amqp/amqp_queue.go @@ -45,7 +45,7 @@ func (a *AmqpQueueInfo) IsAutoDelete() bool { return a.isAutoDelete } -func (a *AmqpQueueInfo) Exclusive() bool { +func (a *AmqpQueueInfo) IsExclusive() bool { return a.isExclusive } @@ -121,9 +121,7 @@ func newAmqpQueue(management *AmqpManagement, queueName string) IQueueSpecificat } func (a *AmqpQueue) validate() error { - if a.arguments["max-length-bytes"] != nil { - err := validatePositive("max length", a.arguments["max-length-bytes"].(int64)) if err != nil { return err @@ -133,18 +131,18 @@ func (a *AmqpQueue) validate() error { } func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) { - if Quorum == a.GetQueueType() || Stream == a.GetQueueType() { // mandatory arguments for quorum queues and streams a.Exclusive(false).AutoDelete(false) } + if err := a.validate(); err != nil { return nil, err } if a.name == "" { - a.name = GenerateNameWithDefaultPrefix() + a.name = generateNameWithDefaultPrefix() } path := queuePath(a.name) diff --git a/rabbitmq_amqp/amqp_queue_test.go b/rabbitmq_amqp/amqp_queue_test.go index 5e2d79c..2423ae7 100644 --- a/rabbitmq_amqp/amqp_queue_test.go +++ b/rabbitmq_amqp/amqp_queue_test.go @@ -7,7 +7,6 @@ import ( ) var _ = Describe("AMQP Queue test ", func() { - var connection IConnection var management IManagement BeforeEach(func() { @@ -27,8 +26,8 @@ var _ = Describe("AMQP Queue test ", func() { Expect(connection.Close(context.Background())).To(BeNil()) }) - It("AMQP Queue Declare With Response and Delete should success ", func() { - const queueName = "AMQP Queue Declare With Response and Delete should success" + It("AMQP Queue Declare With Response and Delete should succeed", func() { + const queueName = "AMQP Queue Declare With Response and Delete should succeed" queueSpec := management.Queue(queueName) queueInfo, err := queueSpec.Declare(context.TODO()) Expect(err).To(BeNil()) @@ -36,14 +35,14 @@ var _ = Describe("AMQP Queue test ", func() { Expect(queueInfo.GetName()).To(Equal(queueName)) Expect(queueInfo.IsDurable()).To(BeTrue()) Expect(queueInfo.IsAutoDelete()).To(BeFalse()) - Expect(queueInfo.Exclusive()).To(BeFalse()) + Expect(queueInfo.IsExclusive()).To(BeFalse()) Expect(queueInfo.Type()).To(Equal(Classic)) err = queueSpec.Delete(context.TODO()) Expect(err).To(BeNil()) }) - It("AMQP Queue Declare With Parameters and Delete should success ", func() { - const queueName = "AMQP Queue Declare With Parameters and Delete should success" + It("AMQP Queue Declare With Parameters and Delete should succeed", func() { + const queueName = "AMQP Queue Declare With Parameters and Delete should succeed" queueSpec := management.Queue(queueName).Exclusive(true). AutoDelete(true). QueueType(QueueType{Classic}). @@ -56,7 +55,7 @@ var _ = Describe("AMQP Queue test ", func() { Expect(queueInfo.GetName()).To(Equal(queueName)) Expect(queueInfo.IsDurable()).To(BeTrue()) Expect(queueInfo.IsAutoDelete()).To(BeTrue()) - Expect(queueInfo.Exclusive()).To(BeTrue()) + Expect(queueInfo.IsExclusive()).To(BeTrue()) Expect(queueInfo.Type()).To(Equal(Classic)) Expect(queueInfo.GetLeader()).To(ContainSubstring("rabbit")) Expect(len(queueInfo.GetReplicas())).To(BeNumerically(">", 0)) @@ -69,8 +68,8 @@ var _ = Describe("AMQP Queue test ", func() { Expect(err).To(BeNil()) }) - It("AMQP Declare Quorum Queue and Delete should success ", func() { - const queueName = "AMQP Declare Quorum Queue and Delete should success" + It("AMQP Declare Quorum Queue and Delete should succeed", func() { + const queueName = "AMQP Declare Quorum Queue and Delete should succeed" // Quorum queue will ignore Exclusive and AutoDelete settings // since they are not supported by quorum queues queueSpec := management.Queue(queueName). @@ -82,14 +81,14 @@ var _ = Describe("AMQP Queue test ", func() { Expect(queueInfo.GetName()).To(Equal(queueName)) Expect(queueInfo.IsDurable()).To(BeTrue()) Expect(queueInfo.IsAutoDelete()).To(BeFalse()) - Expect(queueInfo.Exclusive()).To(BeFalse()) + Expect(queueInfo.IsExclusive()).To(BeFalse()) Expect(queueInfo.Type()).To(Equal(Quorum)) err = queueSpec.Delete(context.TODO()) Expect(err).To(BeNil()) }) - It("AMQP Declare Stream Queue and Delete should success ", func() { - const queueName = "AMQP Declare Stream Queue and Delete should success" + It("AMQP Declare Stream Queue and Delete should succeed", func() { + const queueName = "AMQP Declare Stream Queue and Delete should succeed" // Stream queue will ignore Exclusive and AutoDelete settings // since they are not supported by quorum queues queueSpec := management.Queue(queueName). @@ -101,13 +100,13 @@ var _ = Describe("AMQP Queue test ", func() { Expect(queueInfo.GetName()).To(Equal(queueName)) Expect(queueInfo.IsDurable()).To(BeTrue()) Expect(queueInfo.IsAutoDelete()).To(BeFalse()) - Expect(queueInfo.Exclusive()).To(BeFalse()) + Expect(queueInfo.IsExclusive()).To(BeFalse()) Expect(queueInfo.Type()).To(Equal(Stream)) err = queueSpec.Delete(context.TODO()) Expect(err).To(BeNil()) }) - It("AMQP Declare Queue with invalid type should fail ", func() { + It("AMQP Declare Queue with invalid type should fail", func() { const queueName = "AMQP Declare Queue with invalid type should fail" queueSpec := management.Queue(queueName). QueueType(QueueType{Type: "invalid"}) @@ -115,8 +114,7 @@ var _ = Describe("AMQP Queue test ", func() { Expect(err).NotTo(BeNil()) }) - It("AMQP Declare Queue should fail with Precondition fail ", func() { - + It("AMQP Declare Queue should fail with Precondition fail", func() { // The first queue is declared as Classic and it should succeed // The second queue is declared as Quorum and it should fail since it is already declared as Classic const queueName = "AMQP Declare Queue should fail with Precondition fail" @@ -126,13 +124,12 @@ var _ = Describe("AMQP Queue test ", func() { queueSpecFail := management.Queue(queueName).QueueType(QueueType{Quorum}) _, err = queueSpecFail.Declare(context.TODO()) Expect(err).NotTo(BeNil()) - Expect(err).To(Equal(PreconditionFailed)) + Expect(err).To(Equal(ErrPreconditionFailed)) err = queueSpec.Delete(context.TODO()) Expect(err).To(BeNil()) }) It("AMQP Declare Queue should fail during validation", func() { - const queueName = "AMQP Declare Queue should fail during validation" queueSpec := management.Queue(queueName).MaxLengthBytes(-1) _, err := queueSpec.Declare(context.TODO()) @@ -149,5 +146,4 @@ var _ = Describe("AMQP Queue test ", func() { err = queueSpec.Delete(context.TODO()) Expect(err).To(BeNil()) }) - }) diff --git a/rabbitmq_amqp/common.go b/rabbitmq_amqp/common.go index a85b8d8..a19b9ac 100644 --- a/rabbitmq_amqp/common.go +++ b/rabbitmq_amqp/common.go @@ -9,10 +9,26 @@ import ( "strings" ) -type PercentCodec struct{} +const ( + responseCode200 = 200 + responseCode201 = 201 + responseCode204 = 204 + responseCode409 = 409 + commandPut = "PUT" + commandGet = "GET" + commandPost = "POST" + commandDelete = "DELETE" + commandReplyTo = "$me" + managementNodeAddress = "/management" + linkPairName = "management-link-pair" + exchanges = "exchanges" + key = "key" + queues = "queues" + bindings = "bindings" +) -// Encode takes a string and returns its percent-encoded representation. -func (pc *PercentCodec) Encode(input string) string { +// encodePathSegments takes a string and returns its percent-encoded representation. +func encodePathSegments(input string) string { var encoded strings.Builder // Iterate over each character in the input string @@ -30,7 +46,7 @@ func (pc *PercentCodec) Encode(input string) string { } // Decode takes a percent-encoded string and returns its decoded representation. -func (pc *PercentCodec) Decode(input string) (string, error) { +func decode(input string) (string, error) { // Use url.QueryUnescape which properly decodes percent-encoded strings decoded, err := url.QueryUnescape(input) if err != nil { @@ -40,27 +56,6 @@ func (pc *PercentCodec) Decode(input string) (string, error) { return decoded, nil } -const ( - responseCode200 = 200 - responseCode201 = 201 - responseCode204 = 204 - responseCode409 = 409 - commandPut = "PUT" - commandGet = "GET" - commandPost = "POST" - commandDelete = "DELETE" - commandReplyTo = "$me" - managementNodeAddress = "/management" - linkPairName = "management-link-pair" -) - -const ( - Exchanges = "exchanges" - Key = "key" - Queues = "queues" - Bindings = "bindings" -) - // isUnreserved checks if a character is an unreserved character in percent encoding // Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~ func isUnreserved(char rune) bool { @@ -70,27 +65,28 @@ func isUnreserved(char rune) bool { char == '-' || char == '.' || char == '_' || char == '~' } -func encodePathSegments(pathSegments string) string { - return (&PercentCodec{}).Encode(pathSegments) -} - func queuePath(queueName string) string { - return "/" + Queues + "/" + encodePathSegments(queueName) + return "/" + queues + "/" + encodePathSegments(queueName) } func exchangePath(exchangeName string) string { - return "/" + Exchanges + "/" + encodePathSegments(exchangeName) + return "/" + exchanges + "/" + encodePathSegments(exchangeName) } func bindingPath() string { - return "/" + Bindings + return "/" + bindings } -func bindingPathWithExchangeQueueKey(exchangeName, queueName, key string) string { - //string path = - //$"/{Consts.Bindings}/src={Utils.EncodePathSegment(_sourceName)};{($"{destinationCharacter}={Utils.EncodePathSegment(_destinationName)};key={Utils.EncodePathSegment(_routingKey)};args=")}"; - - return fmt.Sprintf("/%s/src=%s;dstq=%s;key=%s;args=", Bindings, encodePathSegments(exchangeName), encodePathSegments(queueName), encodePathSegments(key)) +func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName, key string) string { + sourceNameEncoded := encodePathSegments(sourceName) + destinationNameEncoded := encodePathSegments(destinationName) + keyEncoded := encodePathSegments(key) + destinationType := "dste" + if toQueue { + destinationType = "dstq" + } + format := "/%s/src=%s;%s=%s;key=%s;args=" + return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded) } @@ -101,21 +97,19 @@ func validatePositive(label string, value int64) error { return nil } -func GenerateNameWithDefaultPrefix() string { - return GenerateName("client.gen-") +func generateNameWithDefaultPrefix() string { + return generateName("client.gen-") } -// GenerateName generates a unique name with the given prefix -func GenerateName(prefix string) string { - - var uid = uuid.New() - var uuidBytes = []byte(uid.String()) - var _md5 = md5.New() - var digest = _md5.Sum(uuidBytes) +// generateName generates a unique name with the given prefix +func generateName(prefix string) string { + uid := uuid.New() + uuidBytes := []byte(uid.String()) + md5obj := md5.New() + digest := md5obj.Sum(uuidBytes) result := base64.StdEncoding.EncodeToString(digest) result = strings.ReplaceAll(result, "+", "-") result = strings.ReplaceAll(result, "/", "_") result = strings.ReplaceAll(result, "=", "") return prefix + result - } diff --git a/rabbitmq_amqp/converters.go b/rabbitmq_amqp/converters.go index e7b2c0a..c27911a 100644 --- a/rabbitmq_amqp/converters.go +++ b/rabbitmq_amqp/converters.go @@ -1,7 +1,6 @@ package rabbitmq_amqp import ( - "errors" "fmt" "regexp" "strconv" @@ -47,7 +46,7 @@ func CapacityFrom(value string) (int64, error) { match, err := regexp.Compile("^((kb|mb|gb|tb))") if err != nil { return 0, - errors.New(fmt.Sprintf("Capacity, invalid unit size format:%s", value)) + fmt.Errorf("Capacity, invalid unit size format:%s", value) } foundUnitSize := strings.ToLower(value[len(value)-2:]) @@ -56,7 +55,7 @@ func CapacityFrom(value string) (int64, error) { size, err := strconv.Atoi(value[:len(value)-2]) if err != nil { - return 0, errors.New(fmt.Sprintf("Capacity, Invalid number format: %s", value)) + return 0, fmt.Errorf("Capacity, Invalid number format: %s", value) } switch foundUnitSize { case UnitKb: @@ -71,9 +70,7 @@ func CapacityFrom(value string) (int64, error) { case UnitTb: return CapacityTB(int64(size)), nil } - } - return 0, - errors.New(fmt.Sprintf("Capacity, Invalid unit size format: %s", value)) + return 0, fmt.Errorf("Capacity, Invalid unit size format: %s", value) } diff --git a/rabbitmq_amqp/converters_test.go b/rabbitmq_amqp/converters_test.go index 504c835..39180ab 100644 --- a/rabbitmq_amqp/converters_test.go +++ b/rabbitmq_amqp/converters_test.go @@ -7,8 +7,7 @@ import ( ) var _ = Describe("Converters", func() { - - It("Converter from number", func() { + It("Converts from number", func() { Expect(CapacityBytes(100)).To(Equal(int64(100))) Expect(CapacityKB(1)).To(Equal(int64(1000))) Expect(CapacityMB(1)).To(Equal(int64(1000 * 1000))) @@ -16,7 +15,7 @@ var _ = Describe("Converters", func() { Expect(CapacityTB(1)).To(Equal(int64(1000 * 1000 * 1000 * 1000))) }) - It("Converter from string", func() { + It("Converts from string", func() { v, err := CapacityFrom("1KB") Expect(err).NotTo(HaveOccurred()) Expect(v).To(Equal(int64(1000))) @@ -34,7 +33,7 @@ var _ = Describe("Converters", func() { Expect(v).To(Equal(int64(1000 * 1000 * 1000 * 1000))) }) - It("Converter from string logError", func() { + It("Converts from string logError", func() { v, err := CapacityFrom("10LL") Expect(fmt.Sprintf("%s", err)). To(ContainSubstring("Invalid unit size format")) @@ -51,5 +50,4 @@ var _ = Describe("Converters", func() { Expect(v).To(Equal(int64(0))) Expect(err).To(BeNil()) }) - }) diff --git a/rabbitmq_amqp/entities.go b/rabbitmq_amqp/entities.go index 6d24211..13d3d9f 100644 --- a/rabbitmq_amqp/entities.go +++ b/rabbitmq_amqp/entities.go @@ -46,7 +46,7 @@ type IQueueInfo interface { GetName() string IsDurable() bool IsAutoDelete() bool - Exclusive() bool + IsExclusive() bool Type() TQueueType GetLeader() string GetReplicas() []string @@ -86,8 +86,10 @@ type IExchangeSpecification interface { } type IBindingSpecification interface { - SourceExchange(exchangeName string) IBindingSpecification - DestinationQueue(queueName string) IBindingSpecification + SourceExchange(exchangeSpec IExchangeSpecification) IBindingSpecification + SourceExchangeName(exchangeName string) IBindingSpecification + DestinationQueue(queueSpec IQueueSpecification) IBindingSpecification + DestinationQueueName(queueName string) IBindingSpecification Key(bindingKey string) IBindingSpecification Bind(ctx context.Context) error Unbind(ctx context.Context) error