diff --git a/examples/getting_started/main.go b/examples/getting_started/main.go index 5c615c4..e47b9a9 100644 --- a/examples/getting_started/main.go +++ b/examples/getting_started/main.go @@ -1,93 +1,87 @@ package main import ( - "bufio" "context" "fmt" - mq "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp" - "os" + "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp" "time" ) func main() { fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n") - chStatusChanged := make(chan *mq.StatusChanged, 1) + chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1) - go func(ch chan *mq.StatusChanged) { + go func(ch chan *rabbitmq_amqp.StatusChanged) { for statusChanged := range ch { - fmt.Printf("Status changed from %d to %d\n", statusChanged.From, statusChanged.To) + fmt.Printf("%s\n", statusChanged) } }(chStatusChanged) - amqpConnection := mq.NewAmqpConnection() - amqpConnection.NotifyStatusChange(chStatusChanged) - err := amqpConnection.Open(context.Background(), mq.NewConnectionSettings()) + amqpConnection := rabbitmq_amqp.NewAmqpConnectionNotifyStatusChanged(chStatusChanged) + err := amqpConnection.Open(context.Background(), rabbitmq_amqp.NewConnectionSettings()) if err != nil { + fmt.Printf("Error opening connection: %v\n", err) return } - fmt.Printf("AMQP Connection opened.\n") management := amqpConnection.Management() - queueSpec := management.Queue("getting_started_queue"). - QueueType(mq.QueueType{Type: mq.Quorum}). - MaxLengthBytes(mq.CapacityGB(1)) - exchangeSpec := management.Exchange("getting_started_exchange"). - ExchangeType(mq.ExchangeType{Type: mq.Topic}) - - queueInfo, err := queueSpec.Declare(context.Background()) + exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.ExchangeSpecification{ + Name: "getting-started-exchange", + }) if err != nil { - fmt.Printf("Error declaring queue %s\n", err) + fmt.Printf("Error declaring exchange: %v\n", err) return } - fmt.Printf("Queue %s created.\n", queueInfo.GetName()) - exchangeInfo, err := exchangeSpec.Declare(context.Background()) + queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QueueSpecification{ + Name: "getting-started-queue", + QueueType: rabbitmq_amqp.QueueType{Type: rabbitmq_amqp.Quorum}, + }) + if err != nil { - fmt.Printf("Error declaring exchange %s\n", err) + fmt.Printf("Error declaring queue: %v\n", err) return } - fmt.Printf("Exchange %s created.\n", exchangeInfo.GetName()) - bindingSpec := management.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key("routing-key") + bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.BindingSpecification{ + SourceExchange: exchangeInfo.Name(), + DestinationQueue: queueInfo.Name(), + BindingKey: "routing-key", + }) - err = bindingSpec.Bind(context.Background()) if err != nil { - fmt.Printf("Error binding %s\n", err) + fmt.Printf("Error binding: %v\n", err) return } - fmt.Printf("Binding between %s and %s created.\n", exchangeInfo.GetName(), queueInfo.GetName()) - - fmt.Println("Press any key to cleanup and exit") - reader := bufio.NewReader(os.Stdin) - _, _ = reader.ReadString('\n') + err = management.Unbind(context.TODO(), bindingPath) - err = bindingSpec.Unbind(context.Background()) if err != nil { - fmt.Printf("Error unbinding %s\n", err) + fmt.Printf("Error unbinding: %v\n", err) return } - fmt.Printf("Binding between %s and %s deleted.\n", exchangeInfo.GetName(), queueInfo.GetName()) - - err = exchangeSpec.Delete(context.Background()) + err = management.DeleteExchange(context.TODO(), exchangeInfo.Name()) if err != nil { - fmt.Printf("Error deleting exchange %s\n", err) + fmt.Printf("Error deleting exchange: %v\n", err) return } - err = queueSpec.Delete(context.Background()) + err = management.DeleteQueue(context.TODO(), queueInfo.Name()) if err != nil { + fmt.Printf("Error deleting queue: %v\n", err) return } - fmt.Printf("Queue %s deleted.\n", queueInfo.GetName()) err = amqpConnection.Close(context.Background()) if err != nil { + fmt.Printf("Error closing connection: %v\n", err) return } + fmt.Printf("AMQP Connection closed.\n") // Wait for the status change to be printed time.Sleep(500 * time.Millisecond) + close(chStatusChanged) } diff --git a/rabbitmq_amqp/address_builder.go b/rabbitmq_amqp/address_builder.go new file mode 100644 index 0000000..71a0635 --- /dev/null +++ b/rabbitmq_amqp/address_builder.go @@ -0,0 +1,125 @@ +package rabbitmq_amqp + +import ( + "errors" + "fmt" + "net/url" + "strings" +) + +type AddressBuilder struct { + queue *string + exchange *string + key *string + append *string +} + +func NewAddressBuilder() *AddressBuilder { + return &AddressBuilder{} +} + +func (a *AddressBuilder) Queue(queue string) *AddressBuilder { + a.queue = &queue + return a +} + +func (a *AddressBuilder) Exchange(exchange string) *AddressBuilder { + a.exchange = &exchange + return a +} + +func (a *AddressBuilder) Key(key string) *AddressBuilder { + a.key = &key + return a +} + +func (a *AddressBuilder) Append(append string) *AddressBuilder { + a.append = &append + return a +} + +func (a *AddressBuilder) Address() (string, error) { + if a.exchange == nil && a.queue == nil { + return "", errors.New("exchange or queue must be set") + } + + urlAppend := "" + if !isStringNilOrEmpty(a.append) { + urlAppend = *a.append + } + if !isStringNilOrEmpty(a.exchange) && !isStringNilOrEmpty(a.queue) { + return "", errors.New("exchange and queue cannot be set together") + } + + if !isStringNilOrEmpty(a.exchange) { + if !isStringNilOrEmpty(a.key) { + return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + "/" + encodePathSegments(*a.key) + urlAppend, nil + } + return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + urlAppend, nil + } + + if a.queue == nil { + return "", nil + } + + if isStringNilOrEmpty(a.queue) { + return "", errors.New("queue must be set") + } + + return "/" + queues + "/" + encodePathSegments(*a.queue) + urlAppend, nil +} + +// encodePathSegments takes a string and returns its percent-encoded representation. +func encodePathSegments(input string) string { + var encoded strings.Builder + + // Iterate over each character in the input string + for _, char := range input { + // Check if the character is an unreserved character (i.e., it doesn't need encoding) + if isUnreserved(char) { + encoded.WriteRune(char) // Append as is + } else { + // Encode character To %HH format + encoded.WriteString(fmt.Sprintf("%%%02X", char)) + } + } + + return encoded.String() +} + +// Decode takes a percent-encoded string and returns its decoded representation. +func decode(input string) (string, error) { + // Use url.QueryUnescape which properly decodes percent-encoded strings + decoded, err := url.QueryUnescape(input) + if err != nil { + return "", err + } + + return decoded, nil +} + +// isUnreserved checks if a character is an unreserved character in percent encoding +// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~ +func isUnreserved(char rune) bool { + return (char >= 'A' && char <= 'Z') || + (char >= 'a' && char <= 'z') || + (char >= '0' && char <= '9') || + char == '-' || char == '.' || char == '_' || char == '~' +} + +func bindingPath() string { + return "/" + bindings +} + +func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName, key string) string { + sourceNameEncoded := encodePathSegments(sourceName) + destinationNameEncoded := encodePathSegments(destinationName) + keyEncoded := encodePathSegments(key) + destinationType := "dste" + if toQueue { + destinationType = "dstq" + } + format := "/%s/src=%s;%s=%s;key=%s;args=" + return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded) + +} diff --git a/rabbitmq_amqp/address_builder_test.go b/rabbitmq_amqp/address_builder_test.go new file mode 100644 index 0000000..aa480a6 --- /dev/null +++ b/rabbitmq_amqp/address_builder_test.go @@ -0,0 +1,78 @@ +package rabbitmq_amqp + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Address builder test ", func() { + It("With exchange, queue and key should raise and error", func() { + addressBuilder := NewAddressBuilder() + Expect(addressBuilder).NotTo(BeNil()) + Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) + addressBuilder.Queue("queue").Exchange("exchange").Key("key") + _, err := addressBuilder.Address() + Expect(err).NotTo(BeNil()) + Expect(err.Error()).To(Equal("exchange and queue cannot be set together")) + }) + + It("Without exchange and queue should raise and error", func() { + addressBuilder := NewAddressBuilder() + Expect(addressBuilder).NotTo(BeNil()) + Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) + _, err := addressBuilder.Address() + Expect(err).NotTo(BeNil()) + Expect(err.Error()).To(Equal("exchange or queue must be set")) + }) + + It("With exchange and key should return address", func() { + addressBuilder := NewAddressBuilder() + Expect(addressBuilder).NotTo(BeNil()) + Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) + addressBuilder.Exchange("my_exchange").Key("my_key") + address, err := addressBuilder.Address() + Expect(err).To(BeNil()) + Expect(address).To(Equal("/exchanges/my_exchange/my_key")) + }) + + It("With exchange should return address", func() { + addressBuilder := NewAddressBuilder() + Expect(addressBuilder).NotTo(BeNil()) + Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) + addressBuilder.Exchange("my_exchange") + address, err := addressBuilder.Address() + Expect(err).To(BeNil()) + Expect(address).To(Equal("/exchanges/my_exchange")) + }) + + It("With exchange and key with names to encode should return the encoded address", func() { + addressBuilder := NewAddressBuilder() + Expect(addressBuilder).NotTo(BeNil()) + Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) + addressBuilder.Exchange("my_ exchange/()").Key("my_key ") + address, err := addressBuilder.Address() + Expect(err).To(BeNil()) + Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20")) + }) + + It("With queue should return address", func() { + addressBuilder := NewAddressBuilder() + Expect(addressBuilder).NotTo(BeNil()) + Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) + addressBuilder.Queue("my_queue>") + address, err := addressBuilder.Address() + Expect(err).To(BeNil()) + Expect(address).To(Equal("/queues/my_queue%3E")) + }) + + It("With queue and append should return address", func() { + addressBuilder := NewAddressBuilder() + Expect(addressBuilder).NotTo(BeNil()) + Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) + addressBuilder.Queue("my_queue").Append("/messages") + address, err := addressBuilder.Address() + Expect(err).To(BeNil()) + Expect(address).To(Equal("/queues/my_queue/messages")) + }) + +}) diff --git a/rabbitmq_amqp/amqp_binding.go b/rabbitmq_amqp/amqp_binding.go index b61b979..f2b430a 100644 --- a/rabbitmq_amqp/amqp_binding.go +++ b/rabbitmq_amqp/amqp_binding.go @@ -20,48 +20,36 @@ func newAMQPBinding(management *AmqpManagement) *AMQPBinding { return &AMQPBinding{management: management} } -func (b *AMQPBinding) Key(bindingKey string) IBindingSpecification { +func (b *AMQPBinding) BindingKey(bindingKey string) { b.bindingKey = bindingKey - return b } -func (b *AMQPBinding) SourceExchange(exchangeSpec IExchangeSpecification) IBindingSpecification { - b.sourceName = exchangeSpec.GetName() - b.toQueue = false - return b +func (b *AMQPBinding) SourceExchange(sourceName string) { + if len(sourceName) > 0 { + b.sourceName = sourceName + b.toQueue = false + } } -func (b *AMQPBinding) SourceExchangeName(exchangeName string) IBindingSpecification { - b.sourceName = exchangeName - b.toQueue = false - return b +func (b *AMQPBinding) DestinationExchange(destinationName string) { + if len(destinationName) > 0 { + b.destinationName = destinationName + b.toQueue = false + } } -func (b *AMQPBinding) DestinationExchange(exchangeSpec IExchangeInfo) IBindingSpecification { - b.destinationName = exchangeSpec.GetName() - b.toQueue = false - return b +func (b *AMQPBinding) DestinationQueue(queueName string) { + if len(queueName) > 0 { + b.destinationName = queueName + b.toQueue = true + } } -func (b *AMQPBinding) DestinationExchangeName(exchangeName string) IBindingSpecification { - b.destinationName = exchangeName - b.toQueue = false - return b -} - -func (b *AMQPBinding) DestinationQueue(queueSpec IQueueSpecification) IBindingSpecification { - b.destinationName = queueSpec.GetName() - b.toQueue = true - return b -} - -func (b *AMQPBinding) DestinationQueueName(queueName string) IBindingSpecification { - b.destinationName = queueName - b.toQueue = true - return b -} - -func (b *AMQPBinding) Bind(ctx context.Context) error { +// Bind creates a binding between an exchange and a queue or exchange +// with the specified binding key. +// Returns the binding path that can be used to unbind the binding. +// Given a virtual host, the binding path is unique. +func (b *AMQPBinding) Bind(ctx context.Context) (string, error) { path := bindingPath() kv := make(map[string]any) kv["binding_key"] = b.bindingKey @@ -69,11 +57,14 @@ func (b *AMQPBinding) Bind(ctx context.Context) error { kv["destination_queue"] = b.destinationName kv["arguments"] = make(map[string]any) _, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204}) - return err + bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey) + return bindingPathWithExchangeQueueKey, err } -func (b *AMQPBinding) Unbind(ctx context.Context) error { - bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey) - _, err := b.management.Request(ctx, amqp.Null{}, bindingPathWithExchangeQueueKey, commandDelete, []int{responseCode204}) +// Unbind removes a binding between an exchange and a queue or exchange +// with the specified binding key. +// The bindingPath is the unique path that was returned when the binding was created. +func (b *AMQPBinding) Unbind(ctx context.Context, bindingPath string) error { + _, err := b.management.Request(ctx, amqp.Null{}, bindingPath, commandDelete, []int{responseCode204}) return err } diff --git a/rabbitmq_amqp/amqp_binding_test.go b/rabbitmq_amqp/amqp_binding_test.go index 5eeb9ab..82c82ec 100644 --- a/rabbitmq_amqp/amqp_binding_test.go +++ b/rabbitmq_amqp/amqp_binding_test.go @@ -28,26 +28,30 @@ var _ = Describe("AMQP Bindings test ", func() { It("AMQP Bindings between Exchange and Queue Should succeed", func() { const exchangeName = "Exchange_AMQP Bindings between Exchange and Queue should uccess" const queueName = "Queue_AMQP Bindings between Exchange and Queue should succeed" - exchangeSpec := management.Exchange(exchangeName) - exchangeInfo, err := exchangeSpec.Declare(context.TODO()) + exchangeInfo, err := management.DeclareExchange(context.TODO(), &ExchangeSpecification{ + Name: exchangeName, + }) Expect(err).To(BeNil()) Expect(exchangeInfo).NotTo(BeNil()) - Expect(exchangeInfo.GetName()).To(Equal(exchangeName)) + Expect(exchangeInfo.Name()).To(Equal(exchangeName)) - queueSpec := management.Queue(queueName) - queueInfo, err := queueSpec.Declare(context.TODO()) + queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{ + Name: queueName, + }) Expect(err).To(BeNil()) Expect(queueInfo).NotTo(BeNil()) - Expect(queueInfo.GetName()).To(Equal(queueName)) - - bindingSpec := management.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key("routing-key") - err = bindingSpec.Bind(context.TODO()) + Expect(queueInfo.Name()).To(Equal(queueName)) + bindingPath, err := management.Bind(context.TODO(), &BindingSpecification{ + SourceExchange: exchangeName, + DestinationQueue: queueName, + BindingKey: "routing-key", + }) Expect(err).To(BeNil()) - err = bindingSpec.Unbind(context.TODO()) + err = management.Unbind(context.TODO(), bindingPath) Expect(err).To(BeNil()) - err = exchangeSpec.Delete(context.TODO()) + err = management.DeleteExchange(context.TODO(), exchangeName) Expect(err).To(BeNil()) - err = queueSpec.Delete(context.TODO()) + err = management.DeleteQueue(context.TODO(), queueName) Expect(err).To(BeNil()) }) }) diff --git a/rabbitmq_amqp/amqp_connection.go b/rabbitmq_amqp/amqp_connection.go index d950dc0..8b58ba0 100644 --- a/rabbitmq_amqp/amqp_connection.go +++ b/rabbitmq_amqp/amqp_connection.go @@ -2,131 +2,17 @@ package rabbitmq_amqp import ( "context" - "crypto/tls" - "fmt" "github.com/Azure/go-amqp" ) -type ConnectionSettings struct { - host string - port int - user string - password string - virtualHost string - scheme string - containerId string - useSsl bool - tlsConfig *tls.Config - saslMechanism TSaslMechanism -} - -func (c *ConnectionSettings) GetSaslMechanism() TSaslMechanism { - return c.saslMechanism -} - -func (c *ConnectionSettings) SaslMechanism(mechanism SaslMechanism) IConnectionSettings { - c.saslMechanism = mechanism.Type - return c -} - -func (c *ConnectionSettings) TlsConfig(config *tls.Config) IConnectionSettings { - c.tlsConfig = config - return c -} - -func (c *ConnectionSettings) GetTlsConfig() *tls.Config { - return c.tlsConfig -} - -func (c *ConnectionSettings) Port(port int) IConnectionSettings { - c.port = port - return c -} - -func (c *ConnectionSettings) User(userName string) IConnectionSettings { - c.user = userName - return c -} - -func (c *ConnectionSettings) Password(password string) IConnectionSettings { - c.password = password - return c -} - -func (c *ConnectionSettings) VirtualHost(virtualHost string) IConnectionSettings { - c.virtualHost = virtualHost - return c -} - -func (c *ConnectionSettings) ContainerId(containerId string) IConnectionSettings { - c.containerId = containerId - return c -} - -func (c *ConnectionSettings) GetHost() string { - return c.host -} - -func (c *ConnectionSettings) Host(hostName string) IConnectionSettings { - c.host = hostName - return c -} - -func (c *ConnectionSettings) GetPort() int { - return c.port -} - -func (c *ConnectionSettings) GetUser() string { - return c.user -} - -func (c *ConnectionSettings) GetPassword() string { - return c.password -} - -func (c *ConnectionSettings) GetVirtualHost() string { - return c.virtualHost -} - -func (c *ConnectionSettings) GetScheme() string { - return c.scheme -} - -func (c *ConnectionSettings) GetContainerId() string { - return c.containerId -} - -func (c *ConnectionSettings) UseSsl(value bool) IConnectionSettings { - c.useSsl = value - if value { - c.scheme = "amqps" - } else { - c.scheme = "amqp" - } - return c -} - -func (c *ConnectionSettings) IsSsl() bool { - return c.useSsl -} - -func (c *ConnectionSettings) BuildAddress() string { - return c.scheme + "://" + c.host + ":" + fmt.Sprint(c.port) -} - -func NewConnectionSettings() IConnectionSettings { - return &ConnectionSettings{ - host: "localhost", - port: 5672, - user: "guest", - password: "guest", - virtualHost: "/", - scheme: "amqp", - containerId: "amqp-go-client", - useSsl: false, - tlsConfig: nil, - } -} +//func (c *ConnectionSettings) UseSsl(value bool) { +// c.UseSsl = value +// if value { +// c.Scheme = "amqps" +// } else { +// c.Scheme = "amqp" +// } +//} type AmqpConnection struct { Connection *amqp.Conn @@ -134,10 +20,15 @@ type AmqpConnection struct { lifeCycle *LifeCycle } +// Management returns the management interface for the connection. +// See IManagement interface. func (a *AmqpConnection) Management() IManagement { return a.management } +// NewAmqpConnection creates a new AmqpConnection +// with a new AmqpManagement and a new LifeCycle. +// Returns a pointer to the new AmqpConnection func NewAmqpConnection() IConnection { return &AmqpConnection{ management: NewAmqpManagement(), @@ -145,20 +36,36 @@ func NewAmqpConnection() IConnection { } } -func (a *AmqpConnection) Open(ctx context.Context, connectionSettings IConnectionSettings) error { +// NewAmqpConnectionNotifyStatusChanged creates a new AmqpConnection +// with a new AmqpManagement and a new LifeCycle +// and sets the channel for status changes. +// Returns a pointer to the new AmqpConnection +func NewAmqpConnectionNotifyStatusChanged(channel chan *StatusChanged) IConnection { + lifeCycle := NewLifeCycle() + lifeCycle.chStatusChanged = channel + return &AmqpConnection{ + management: NewAmqpManagement(), + lifeCycle: lifeCycle, + } +} + +// Open opens a connection to the AMQP 1.0 server. +// using the provided connectionSettings and the AMQPLite library. +// Setups the connection and the management interface. +func (a *AmqpConnection) Open(ctx context.Context, connectionSettings *ConnectionSettings) error { sASLType := amqp.SASLTypeAnonymous() - switch connectionSettings.GetSaslMechanism() { + switch connectionSettings.SaslMechanism { case Plain: - sASLType = amqp.SASLTypePlain(connectionSettings.GetUser(), connectionSettings.GetPassword()) + sASLType = amqp.SASLTypePlain(connectionSettings.User, connectionSettings.Password) case External: sASLType = amqp.SASLTypeExternal("") } conn, err := amqp.Dial(ctx, connectionSettings.BuildAddress(), &amqp.ConnOptions{ - ContainerID: connectionSettings.GetContainerId(), + ContainerID: connectionSettings.ContainerId, SASLType: sASLType, - HostName: connectionSettings.GetVirtualHost(), - TLSConfig: connectionSettings.GetTlsConfig(), + HostName: connectionSettings.VirtualHost, + TLSConfig: connectionSettings.TlsConfig, }) if err != nil { return err @@ -188,6 +95,6 @@ func (a *AmqpConnection) NotifyStatusChange(channel chan *StatusChanged) { a.lifeCycle.chStatusChanged = channel } -func (a *AmqpConnection) GetStatus() int { +func (a *AmqpConnection) Status() int { return a.lifeCycle.Status() } diff --git a/rabbitmq_amqp/amqp_connection_test.go b/rabbitmq_amqp/amqp_connection_test.go index 78a6297..398f91f 100644 --- a/rabbitmq_amqp/amqp_connection_test.go +++ b/rabbitmq_amqp/amqp_connection_test.go @@ -15,7 +15,7 @@ var _ = Describe("AMQP Connection Test", func() { connectionSettings := NewConnectionSettings() Expect(connectionSettings).NotTo(BeNil()) - connectionSettings.SaslMechanism(SaslMechanism{Type: Anonymous}) + connectionSettings.SaslMechanism = Anonymous Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) err := amqpConnection.Open(context.Background(), connectionSettings) @@ -32,7 +32,7 @@ var _ = Describe("AMQP Connection Test", func() { connectionSettings := NewConnectionSettings() Expect(connectionSettings).NotTo(BeNil()) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - connectionSettings.SaslMechanism(SaslMechanism{Type: Plain}) + connectionSettings.SaslMechanism = Plain err := amqpConnection.Open(context.Background(), connectionSettings) Expect(err).To(BeNil()) @@ -40,28 +40,32 @@ var _ = Describe("AMQP Connection Test", func() { Expect(err).To(BeNil()) }) - It("AMQP Connection should fail due of wrong port", func() { + It("AMQP Connection should fail due of wrong Port", func() { amqpConnection := NewAmqpConnection() Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) - connectionSettings := NewConnectionSettings() + connectionSettings := &ConnectionSettings{ + Host: "localhost", + Port: 1234, + } Expect(connectionSettings).NotTo(BeNil()) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - connectionSettings.Host("localhost").Port(1234) err := amqpConnection.Open(context.Background(), connectionSettings) Expect(err).NotTo(BeNil()) }) - It("AMQP Connection should fail due of wrong host", func() { + It("AMQP Connection should fail due of wrong Host", func() { amqpConnection := NewAmqpConnection() Expect(amqpConnection).NotTo(BeNil()) Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) - connectionSettings := NewConnectionSettings() + connectionSettings := &ConnectionSettings{ + Host: "wronghost", + Port: 5672, + } Expect(connectionSettings).NotTo(BeNil()) Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - connectionSettings.Host("wronghost").Port(5672) err := amqpConnection.Open(context.Background(), connectionSettings) Expect(err).NotTo(BeNil()) diff --git a/rabbitmq_amqp/amqp_exchange.go b/rabbitmq_amqp/amqp_exchange.go index 47cea6e..6597d77 100644 --- a/rabbitmq_amqp/amqp_exchange.go +++ b/rabbitmq_amqp/amqp_exchange.go @@ -13,7 +13,7 @@ func newAmqpExchangeInfo(name string) IExchangeInfo { return &AmqpExchangeInfo{name: name} } -func (a *AmqpExchangeInfo) GetName() string { +func (a *AmqpExchangeInfo) Name() string { return a.name } @@ -34,22 +34,24 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange { } func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) { - path := exchangePath(e.name) + path, err := NewAddressBuilder().Exchange(e.name).Address() + if err != nil { + return nil, err + } kv := make(map[string]any) kv["auto_delete"] = e.isAutoDelete kv["durable"] = true kv["type"] = e.exchangeType.String() kv["arguments"] = e.arguments - _, err := e.management.Request(ctx, kv, path, commandPut, []int{responseCode204, responseCode201, responseCode409}) + _, err = e.management.Request(ctx, kv, path, commandPut, []int{responseCode204, responseCode201, responseCode409}) if err != nil { return nil, err } return newAmqpExchangeInfo(e.name), nil } -func (e *AmqpExchange) AutoDelete(isAutoDelete bool) IExchangeSpecification { +func (e *AmqpExchange) AutoDelete(isAutoDelete bool) { e.isAutoDelete = isAutoDelete - return e } func (e *AmqpExchange) IsAutoDelete() bool { @@ -57,20 +59,24 @@ func (e *AmqpExchange) IsAutoDelete() bool { } func (e *AmqpExchange) Delete(ctx context.Context) error { - path := exchangePath(e.name) - _, err := e.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode204}) + path, err := NewAddressBuilder().Exchange(e.name).Address() + if err != nil { + return err + } + _, err = e.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode204}) return err } -func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType) IExchangeSpecification { - e.exchangeType = exchangeType - return e +func (e *AmqpExchange) ExchangeType(exchangeType ExchangeType) { + if len(exchangeType.Type) > 0 { + e.exchangeType = exchangeType + } } func (e *AmqpExchange) GetExchangeType() TExchangeType { return e.exchangeType.Type } -func (e *AmqpExchange) GetName() string { +func (e *AmqpExchange) Name() string { return e.name } diff --git a/rabbitmq_amqp/amqp_exchange_test.go b/rabbitmq_amqp/amqp_exchange_test.go index 82b326e..8fcac70 100644 --- a/rabbitmq_amqp/amqp_exchange_test.go +++ b/rabbitmq_amqp/amqp_exchange_test.go @@ -28,34 +28,40 @@ var _ = Describe("AMQP Exchange test ", func() { It("AMQP Exchange Declare with Default and Delete should succeed", func() { const exchangeName = "AMQP Exchange Declare and Delete with Default should succeed" - exchangeSpec := management.Exchange(exchangeName) - exchangeInfo, err := exchangeSpec.Declare(context.TODO()) + exchangeInfo, err := management.DeclareExchange(context.TODO(), &ExchangeSpecification{ + Name: exchangeName, + }) Expect(err).To(BeNil()) Expect(exchangeInfo).NotTo(BeNil()) - Expect(exchangeInfo.GetName()).To(Equal(exchangeName)) - err = exchangeSpec.Delete(context.TODO()) + Expect(exchangeInfo.Name()).To(Equal(exchangeName)) + err = management.DeleteExchange(context.TODO(), exchangeName) Expect(err).To(BeNil()) }) It("AMQP Exchange Declare with Topic and Delete should succeed", func() { const exchangeName = "AMQP Exchange Declare with Topic and Delete should succeed" - exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{Topic}) - exchangeInfo, err := exchangeSpec.Declare(context.TODO()) + exchangeInfo, err := management.DeclareExchange(context.TODO(), &ExchangeSpecification{ + Name: exchangeName, + ExchangeType: ExchangeType{Topic}, + }) Expect(err).To(BeNil()) Expect(exchangeInfo).NotTo(BeNil()) - Expect(exchangeInfo.GetName()).To(Equal(exchangeName)) - err = exchangeSpec.Delete(context.TODO()) + Expect(exchangeInfo.Name()).To(Equal(exchangeName)) + err = management.DeleteExchange(context.TODO(), exchangeName) Expect(err).To(BeNil()) }) It("AMQP Exchange Declare with FanOut and Delete should succeed", func() { const exchangeName = "AMQP Exchange Declare with FanOut and Delete should succeed" - exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{FanOut}) - exchangeInfo, err := exchangeSpec.Declare(context.TODO()) + //exchangeSpec := management.Exchange(exchangeName).ExchangeType(ExchangeType{FanOut}) + exchangeInfo, err := management.DeclareExchange(context.TODO(), &ExchangeSpecification{ + Name: exchangeName, + ExchangeType: ExchangeType{FanOut}, + }) Expect(err).To(BeNil()) Expect(exchangeInfo).NotTo(BeNil()) - Expect(exchangeInfo.GetName()).To(Equal(exchangeName)) - err = exchangeSpec.Delete(context.TODO()) + Expect(exchangeInfo.Name()).To(Equal(exchangeName)) + err = management.DeleteExchange(context.TODO(), exchangeName) Expect(err).To(BeNil()) }) }) diff --git a/rabbitmq_amqp/amqp_management.go b/rabbitmq_amqp/amqp_management.go index 85e8c74..55852a6 100644 --- a/rabbitmq_amqp/amqp_management.go +++ b/rabbitmq_amqp/amqp_management.go @@ -22,14 +22,6 @@ type AmqpManagement struct { cancel context.CancelFunc } -func (a *AmqpManagement) Binding() IBindingSpecification { - return newAMQPBinding(a) -} - -func (a *AmqpManagement) Exchange(exchangeName string) IExchangeSpecification { - return newAmqpExchange(a, exchangeName) -} - func NewAmqpManagement() *AmqpManagement { return &AmqpManagement{ lifeCycle: NewLifeCycle(), @@ -201,12 +193,65 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path return make(map[string]any), nil } -func (a *AmqpManagement) Queue(queueName string) IQueueSpecification { - return newAmqpQueue(a, queueName) +func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification *QueueSpecification) (IQueueInfo, error) { + var amqpQueue *AmqpQueue + + if specification == nil || len(specification.Name) <= 0 { + // If the specification is nil or the name is empty, then we create a new queue + // with a random name with generateNameWithDefaultPrefix() + amqpQueue = newAmqpQueue(a, "") + } else { + amqpQueue = newAmqpQueue(a, specification.Name) + amqpQueue.AutoDelete(specification.IsAutoDelete) + amqpQueue.Exclusive(specification.IsExclusive) + amqpQueue.MaxLengthBytes(specification.MaxLengthBytes) + amqpQueue.DeadLetterExchange(specification.DeadLetterExchange) + amqpQueue.DeadLetterRoutingKey(specification.DeadLetterRoutingKey) + amqpQueue.QueueType(specification.QueueType) + } + + return amqpQueue.Declare(ctx) +} + +func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error { + q := newAmqpQueue(a, name) + return q.Delete(ctx) +} + +func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification *ExchangeSpecification) (IExchangeInfo, error) { + if exchangeSpecification == nil { + return nil, fmt.Errorf("exchangeSpecification is nil") + } + + exchange := newAmqpExchange(a, exchangeSpecification.Name) + exchange.AutoDelete(exchangeSpecification.IsAutoDelete) + exchange.ExchangeType(exchangeSpecification.ExchangeType) + return exchange.Declare(ctx) } +func (a *AmqpManagement) DeleteExchange(ctx context.Context, name string) error { + e := newAmqpExchange(a, name) + return e.Delete(ctx) +} + +func (a *AmqpManagement) Bind(ctx context.Context, bindingSpecification *BindingSpecification) (string, error) { + bind := newAMQPBinding(a) + bind.SourceExchange(bindingSpecification.SourceExchange) + bind.DestinationQueue(bindingSpecification.DestinationQueue) + bind.DestinationExchange(bindingSpecification.DestinationExchange) + bind.BindingKey(bindingSpecification.BindingKey) + return bind.Bind(ctx) + +} +func (a *AmqpManagement) Unbind(ctx context.Context, bindingPath string) error { + bind := newAMQPBinding(a) + return bind.Unbind(ctx, bindingPath) +} func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) { - path := queuePath(queueName) + path, err := NewAddressBuilder().Queue(queueName).Address() + if err != nil { + return nil, err + } result, err := a.Request(ctx, amqp.Null{}, path, commandGet, []int{responseCode200, responseCode404}) if err != nil { return nil, err @@ -214,14 +259,15 @@ func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (IQueu return newAmqpQueueInfo(result), nil } -func (a *AmqpManagement) QueueClientName() IQueueSpecification { - return newAmqpQueue(a, "") +func (a *AmqpManagement) PurgeQueue(ctx context.Context, queueName string) (int, error) { + purge := newAmqpQueue(a, queueName) + return purge.Purge(ctx) } func (a *AmqpManagement) NotifyStatusChange(channel chan *StatusChanged) { a.lifeCycle.chStatusChanged = channel } -func (a *AmqpManagement) GetStatus() int { +func (a *AmqpManagement) Status() int { return a.lifeCycle.Status() } diff --git a/rabbitmq_amqp/amqp_management_test.go b/rabbitmq_amqp/amqp_management_test.go index 8b8d840..0f868ee 100644 --- a/rabbitmq_amqp/amqp_management_test.go +++ b/rabbitmq_amqp/amqp_management_test.go @@ -20,14 +20,13 @@ var _ = Describe("Management tests", func() { cancel() err = amqpConnection.Management().Open(ctx, amqpConnection) Expect(err).NotTo(BeNil()) - amqpConnection.Close(context.Background()) + Expect(amqpConnection.Close(context.Background())).To(BeNil()) }) It("AMQP Management should receive events", func() { - amqpConnection := NewAmqpConnection() - Expect(amqpConnection).NotTo(BeNil()) ch := make(chan *StatusChanged, 1) - amqpConnection.Management().NotifyStatusChange(ch) + amqpConnection := NewAmqpConnectionNotifyStatusChanged(ch) + Expect(amqpConnection).NotTo(BeNil()) err := amqpConnection.Open(context.Background(), NewConnectionSettings()) Expect(err).To(BeNil()) recv := <-ch @@ -42,7 +41,7 @@ var _ = Describe("Management tests", func() { Expect(recv.From).To(Equal(Open)) Expect(recv.To).To(Equal(Closed)) - amqpConnection.Close(context.Background()) + Expect(amqpConnection.Close(context.Background())).To(BeNil()) }) It("Request", func() { @@ -68,7 +67,7 @@ var _ = Describe("Management tests", func() { Expect(err).To(BeNil()) Expect(result).NotTo(BeNil()) Expect(management.Close(context.Background())).To(BeNil()) - amqpConnection.Close(context.Background()) + Expect(amqpConnection.Close(context.Background())).To(BeNil()) }) It("GET on non-existing queue returns ErrDoesNotExist", func() { diff --git a/rabbitmq_amqp/amqp_queue.go b/rabbitmq_amqp/amqp_queue.go index d146a90..061eea6 100644 --- a/rabbitmq_amqp/amqp_queue.go +++ b/rabbitmq_amqp/amqp_queue.go @@ -12,17 +12,17 @@ type AmqpQueueInfo struct { isAutoDelete bool isExclusive bool leader string - replicas []string + members []string arguments map[string]any queueType TQueueType } -func (a *AmqpQueueInfo) GetLeader() string { +func (a *AmqpQueueInfo) Leader() string { return a.leader } -func (a *AmqpQueueInfo) GetReplicas() []string { - return a.replicas +func (a *AmqpQueueInfo) Members() []string { + return a.members } func newAmqpQueueInfo(response map[string]any) IQueueInfo { @@ -33,7 +33,7 @@ func newAmqpQueueInfo(response map[string]any) IQueueInfo { isExclusive: response["exclusive"].(bool), queueType: TQueueType(response["type"].(string)), leader: response["leader"].(string), - replicas: response["replicas"].([]string), + members: response["replicas"].([]string), arguments: response["arguments"].(map[string]any), } } @@ -54,11 +54,11 @@ func (a *AmqpQueueInfo) Type() TQueueType { return a.queueType } -func (a *AmqpQueueInfo) GetName() string { +func (a *AmqpQueueInfo) Name() string { return a.name } -func (a *AmqpQueueInfo) GetArguments() map[string]any { +func (a *AmqpQueueInfo) Arguments() map[string]any { return a.arguments } @@ -70,24 +70,28 @@ type AmqpQueue struct { name string } -func (a *AmqpQueue) DeadLetterExchange(dlx string) IQueueSpecification { - a.arguments["x-dead-letter-exchange"] = dlx - return a +func (a *AmqpQueue) DeadLetterExchange(dlx string) { + if len(dlx) != 0 { + a.arguments["x-dead-letter-exchange"] = dlx + } } -func (a *AmqpQueue) DeadLetterRoutingKey(dlrk string) IQueueSpecification { - a.arguments["x-dead-letter-routing-key"] = dlrk - return a +func (a *AmqpQueue) DeadLetterRoutingKey(dlrk string) { + if len(dlrk) != 0 { + a.arguments["x-dead-letter-routing-key"] = dlrk + } } -func (a *AmqpQueue) MaxLengthBytes(length int64) IQueueSpecification { - a.arguments["max-length-bytes"] = length - return a +func (a *AmqpQueue) MaxLengthBytes(length int64) { + if length != 0 { + a.arguments["max-length-bytes"] = length + } } -func (a *AmqpQueue) QueueType(queueType QueueType) IQueueSpecification { - a.arguments["x-queue-type"] = queueType.String() - return a +func (a *AmqpQueue) QueueType(queueType QueueType) { + if len(queueType.String()) != 0 { + a.arguments["x-queue-type"] = queueType.String() + } } func (a *AmqpQueue) GetQueueType() TQueueType { @@ -97,25 +101,23 @@ func (a *AmqpQueue) GetQueueType() TQueueType { return TQueueType(a.arguments["x-queue-type"].(string)) } -func (a *AmqpQueue) Exclusive(isExclusive bool) IQueueSpecification { +func (a *AmqpQueue) Exclusive(isExclusive bool) { a.isExclusive = isExclusive - return a } func (a *AmqpQueue) IsExclusive() bool { return a.isExclusive } -func (a *AmqpQueue) AutoDelete(isAutoDelete bool) IQueueSpecification { +func (a *AmqpQueue) AutoDelete(isAutoDelete bool) { a.isAutoDelete = isAutoDelete - return a } func (a *AmqpQueue) IsAutoDelete() bool { return a.isAutoDelete } -func newAmqpQueue(management *AmqpManagement, queueName string) IQueueSpecification { +func newAmqpQueue(management *AmqpManagement, queueName string) *AmqpQueue { return &AmqpQueue{management: management, name: queueName, arguments: make(map[string]any)} @@ -135,7 +137,8 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) { if Quorum == a.GetQueueType() || Stream == a.GetQueueType() { // mandatory arguments for quorum queues and streams - a.Exclusive(false).AutoDelete(false) + a.Exclusive(false) + a.AutoDelete(false) } if err := a.validate(); err != nil { @@ -146,7 +149,10 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) { a.name = generateNameWithDefaultPrefix() } - path := queuePath(a.name) + path, err := NewAddressBuilder().Queue(a.name).Address() + if err != nil { + return nil, err + } kv := make(map[string]any) kv["durable"] = true kv["auto_delete"] = a.isAutoDelete @@ -160,22 +166,24 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) { } func (a *AmqpQueue) Delete(ctx context.Context) error { - path := queuePath(a.name) - _, err := a.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode200}) + path, err := NewAddressBuilder().Queue(a.name).Address() + if err != nil { + return err + } + _, err = a.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode200}) return err } func (a *AmqpQueue) Purge(ctx context.Context) (int, error) { - path := queuePurgePath(a.name) + path, err := NewAddressBuilder().Queue(a.name).Append("/messages").Address() + if err != nil { + return 0, err + } + response, err := a.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode200}) return int(response["message_count"].(uint64)), err } -func (a *AmqpQueue) Name(queueName string) IQueueSpecification { +func (a *AmqpQueue) Name(queueName string) { a.name = queueName - return a -} - -func (a *AmqpQueue) GetName() string { - return a.name } diff --git a/rabbitmq_amqp/amqp_queue_test.go b/rabbitmq_amqp/amqp_queue_test.go index 1dfe444..8ca1b12 100644 --- a/rabbitmq_amqp/amqp_queue_test.go +++ b/rabbitmq_amqp/amqp_queue_test.go @@ -31,11 +31,12 @@ var _ = Describe("AMQP Queue test ", func() { It("AMQP Queue Declare With Response and Get/Delete should succeed", func() { const queueName = "AMQP Queue Declare With Response and Delete should succeed" - queueSpec := management.Queue(queueName) - queueInfo, err := queueSpec.Declare(context.TODO()) + queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{ + Name: queueName, + }) Expect(err).To(BeNil()) Expect(queueInfo).NotTo(BeNil()) - Expect(queueInfo.GetName()).To(Equal(queueName)) + Expect(queueInfo.Name()).To(Equal(queueName)) Expect(queueInfo.IsDurable()).To(BeTrue()) Expect(queueInfo.IsAutoDelete()).To(BeFalse()) Expect(queueInfo.IsExclusive()).To(BeFalse()) @@ -45,137 +46,160 @@ var _ = Describe("AMQP Queue test ", func() { queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) Expect(queueInfoReceived).To(Equal(queueInfo)) - err = queueSpec.Delete(context.TODO()) + err = management.DeleteQueue(context.TODO(), queueName) Expect(err).To(BeNil()) }) It("AMQP Queue Declare With Parameters and Get/Delete should succeed", func() { const queueName = "AMQP Queue Declare With Parameters and Delete should succeed" - queueSpec := management.Queue(queueName).Exclusive(true). - AutoDelete(true). - QueueType(QueueType{Classic}). - MaxLengthBytes(CapacityGB(1)). - DeadLetterExchange("dead-letter-exchange"). - DeadLetterRoutingKey("dead-letter-routing-key") - queueInfo, err := queueSpec.Declare(context.TODO()) + + queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{ + Name: queueName, + IsAutoDelete: true, + IsExclusive: true, + QueueType: QueueType{Classic}, + MaxLengthBytes: CapacityGB(1), + DeadLetterExchange: "dead-letter-exchange", + DeadLetterRoutingKey: "dead-letter-routing-key", + }) + Expect(err).To(BeNil()) Expect(queueInfo).NotTo(BeNil()) - Expect(queueInfo.GetName()).To(Equal(queueName)) + Expect(queueInfo.Name()).To(Equal(queueName)) Expect(queueInfo.IsDurable()).To(BeTrue()) Expect(queueInfo.IsAutoDelete()).To(BeTrue()) Expect(queueInfo.IsExclusive()).To(BeTrue()) Expect(queueInfo.Type()).To(Equal(Classic)) - Expect(queueInfo.GetLeader()).To(ContainSubstring("rabbit")) - Expect(len(queueInfo.GetReplicas())).To(BeNumerically(">", 0)) + Expect(queueInfo.Leader()).To(ContainSubstring("rabbit")) + Expect(len(queueInfo.Members())).To(BeNumerically(">", 0)) - Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("x-dead-letter-exchange", "dead-letter-exchange")) - Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("x-dead-letter-routing-key", "dead-letter-routing-key")) - Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("max-length-bytes", int64(1000000000))) + Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-dead-letter-exchange", "dead-letter-exchange")) + Expect(queueInfo.Arguments()).To(HaveKeyWithValue("x-dead-letter-routing-key", "dead-letter-routing-key")) + Expect(queueInfo.Arguments()).To(HaveKeyWithValue("max-length-bytes", int64(1000000000))) // validate GET (query queue info) queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) Expect(queueInfoReceived).To(Equal(queueInfo)) - err = queueSpec.Delete(context.TODO()) + err = management.DeleteQueue(context.TODO(), queueName) Expect(err).To(BeNil()) + }) It("AMQP Declare Quorum Queue and Get/Delete should succeed", func() { const queueName = "AMQP Declare Quorum Queue and Delete should succeed" // Quorum queue will ignore Exclusive and AutoDelete settings // since they are not supported by quorum queues - queueSpec := management.Queue(queueName). - Exclusive(true). - AutoDelete(true).QueueType(QueueType{Quorum}) - queueInfo, err := queueSpec.Declare(context.TODO()) + queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{ + Name: queueName, + IsAutoDelete: true, + IsExclusive: true, + QueueType: QueueType{Quorum}, + }) + Expect(err).To(BeNil()) Expect(queueInfo).NotTo(BeNil()) - Expect(queueInfo.GetName()).To(Equal(queueName)) + Expect(queueInfo.Name()).To(Equal(queueName)) Expect(queueInfo.IsDurable()).To(BeTrue()) Expect(queueInfo.IsAutoDelete()).To(BeFalse()) Expect(queueInfo.IsExclusive()).To(BeFalse()) Expect(queueInfo.Type()).To(Equal(Quorum)) - // validate GET (query queue info) queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) Expect(queueInfoReceived).To(Equal(queueInfo)) - err = queueSpec.Delete(context.TODO()) + err = management.DeleteQueue(context.TODO(), queueName) Expect(err).To(BeNil()) + }) It("AMQP Declare Stream Queue and Get/Delete should succeed", func() { const queueName = "AMQP Declare Stream Queue and Delete should succeed" // Stream queue will ignore Exclusive and AutoDelete settings // since they are not supported by quorum queues - queueSpec := management.Queue(queueName). - Exclusive(true). - AutoDelete(true).QueueType(QueueType{Stream}) - queueInfo, err := queueSpec.Declare(context.TODO()) + + queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{ + Name: queueName, + IsAutoDelete: true, + IsExclusive: true, + QueueType: QueueType{Stream}, + }) + Expect(err).To(BeNil()) Expect(queueInfo).NotTo(BeNil()) - Expect(queueInfo.GetName()).To(Equal(queueName)) + Expect(queueInfo.Name()).To(Equal(queueName)) Expect(queueInfo.IsDurable()).To(BeTrue()) Expect(queueInfo.IsAutoDelete()).To(BeFalse()) Expect(queueInfo.IsExclusive()).To(BeFalse()) Expect(queueInfo.Type()).To(Equal(Stream)) - // validate GET (query queue info) queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName) Expect(queueInfoReceived).To(Equal(queueInfo)) - err = queueSpec.Delete(context.TODO()) + err = management.DeleteQueue(context.TODO(), queueName) Expect(err).To(BeNil()) + }) It("AMQP Declare Queue with invalid type should fail", func() { const queueName = "AMQP Declare Queue with invalid type should fail" - queueSpec := management.Queue(queueName). - QueueType(QueueType{Type: "invalid"}) - _, err := queueSpec.Declare(context.TODO()) + _, err := management.DeclareQueue(context.TODO(), &QueueSpecification{ + Name: queueName, + QueueType: QueueType{Type: "invalid"}, + }) Expect(err).NotTo(BeNil()) }) It("AMQP Declare Queue should fail with Precondition fail", func() { - // The first queue is declared as Classic and it should succeed - // The second queue is declared as Quorum and it should fail since it is already declared as Classic + // The first queue is declared as Classic, and it should succeed + // The second queue is declared as Quorum, and it should fail since it is already declared as Classic const queueName = "AMQP Declare Queue should fail with Precondition fail" - queueSpec := management.Queue(queueName).QueueType(QueueType{Classic}) - _, err := queueSpec.Declare(context.TODO()) + + _, err := management.DeclareQueue(context.TODO(), &QueueSpecification{ + Name: queueName, + QueueType: QueueType{Classic}, + }) Expect(err).To(BeNil()) - queueSpecFail := management.Queue(queueName).QueueType(QueueType{Quorum}) - _, err = queueSpecFail.Declare(context.TODO()) + + _, err = management.DeclareQueue(context.TODO(), &QueueSpecification{ + Name: queueName, + QueueType: QueueType{Quorum}, + }) + Expect(err).NotTo(BeNil()) Expect(err).To(Equal(ErrPreconditionFailed)) - err = queueSpec.Delete(context.TODO()) + err = management.DeleteQueue(context.TODO(), queueName) Expect(err).To(BeNil()) }) It("AMQP Declare Queue should fail during validation", func() { const queueName = "AMQP Declare Queue should fail during validation" - queueSpec := management.Queue(queueName).MaxLengthBytes(-1) - _, err := queueSpec.Declare(context.TODO()) + _, err := management.DeclareQueue(context.TODO(), &QueueSpecification{ + Name: queueName, + MaxLengthBytes: -1, + }) + Expect(err).NotTo(BeNil()) Expect(err).To(HaveOccurred()) }) It("AMQP Declare Queue should create client name queue", func() { - queueSpec := management.QueueClientName() - queueInfo, err := queueSpec.Declare(context.TODO()) + queueInfo, err := management.DeclareQueue(context.TODO(), nil) Expect(err).To(BeNil()) Expect(queueInfo).NotTo(BeNil()) - Expect(queueInfo.GetName()).To(ContainSubstring("client.gen-")) - err = queueSpec.Delete(context.TODO()) + Expect(queueInfo.Name()).To(ContainSubstring("client.gen-")) + err = management.DeleteQueue(context.TODO(), queueInfo.Name()) Expect(err).To(BeNil()) }) It("AMQP Purge Queue should succeed and return the number of messages purged", func() { const queueName = "AMQP Purge Queue should succeed and return the number of messages purged" - queueSpec := management.Queue(queueName) - _, err := queueSpec.Declare(context.TODO()) + queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{ + Name: queueName, + }) Expect(err).To(BeNil()) publishMessages(queueName, 10) - purged, err := queueSpec.Purge(context.TODO()) + purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name()) Expect(err).To(BeNil()) Expect(purged).To(Equal(10)) }) @@ -199,7 +223,13 @@ func publishMessages(queueName string, count int) { if err != nil { Fail(err.Error()) } - sender, err := session.NewSender(context.TODO(), queuePath(queueName), nil) + + address, err := NewAddressBuilder().Queue(queueName).Address() + if err != nil { + Fail(err.Error()) + } + + sender, err := session.NewSender(context.TODO(), address, nil) if err != nil { Fail(err.Error()) } diff --git a/rabbitmq_amqp/common.go b/rabbitmq_amqp/common.go index a270b98..b785d21 100644 --- a/rabbitmq_amqp/common.go +++ b/rabbitmq_amqp/common.go @@ -4,10 +4,8 @@ import ( "crypto/md5" "encoding/base64" "fmt" - "net/url" - "strings" - "github.com/google/uuid" + "strings" ) const ( @@ -29,73 +27,6 @@ const ( bindings = "bindings" ) -// encodePathSegments takes a string and returns its percent-encoded representation. -func encodePathSegments(input string) string { - var encoded strings.Builder - - // Iterate over each character in the input string - for _, char := range input { - // Check if the character is an unreserved character (i.e., it doesn't need encoding) - if isUnreserved(char) { - encoded.WriteRune(char) // Append as is - } else { - // Encode character To %HH format - encoded.WriteString(fmt.Sprintf("%%%02X", char)) - } - } - - return encoded.String() -} - -// Decode takes a percent-encoded string and returns its decoded representation. -func decode(input string) (string, error) { - // Use url.QueryUnescape which properly decodes percent-encoded strings - decoded, err := url.QueryUnescape(input) - if err != nil { - return "", err - } - - return decoded, nil -} - -// isUnreserved checks if a character is an unreserved character in percent encoding -// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~ -func isUnreserved(char rune) bool { - return (char >= 'A' && char <= 'Z') || - (char >= 'a' && char <= 'z') || - (char >= '0' && char <= '9') || - char == '-' || char == '.' || char == '_' || char == '~' -} - -func queuePath(queueName string) string { - return "/" + queues + "/" + encodePathSegments(queueName) -} - -func queuePurgePath(queueName string) string { - return "/" + queues + "/" + encodePathSegments(queueName) + "/messages" -} - -func exchangePath(exchangeName string) string { - return "/" + exchanges + "/" + encodePathSegments(exchangeName) -} - -func bindingPath() string { - return "/" + bindings -} - -func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName, key string) string { - sourceNameEncoded := encodePathSegments(sourceName) - destinationNameEncoded := encodePathSegments(destinationName) - keyEncoded := encodePathSegments(key) - destinationType := "dste" - if toQueue { - destinationType = "dstq" - } - format := "/%s/src=%s;%s=%s;key=%s;args=" - return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded) - -} - func validatePositive(label string, value int64) error { if value < 0 { return fmt.Errorf("value for %s must be positive, got %d", label, value) @@ -119,3 +50,8 @@ func generateName(prefix string) string { result = strings.ReplaceAll(result, "=", "") return prefix + result } + +func isStringNilOrEmpty(str *string) bool { + return str == nil || len(*str) == 0 + +} diff --git a/rabbitmq_amqp/connection.go b/rabbitmq_amqp/connection.go index d5513c2..2498d57 100644 --- a/rabbitmq_amqp/connection.go +++ b/rabbitmq_amqp/connection.go @@ -3,6 +3,7 @@ package rabbitmq_amqp import ( "context" "crypto/tls" + "fmt" ) type TSaslMechanism string @@ -17,33 +18,52 @@ type SaslMechanism struct { Type TSaslMechanism } -type IConnectionSettings interface { - GetHost() string - Host(hostName string) IConnectionSettings - GetPort() int - Port(port int) IConnectionSettings - GetUser() string - User(userName string) IConnectionSettings - GetPassword() string - Password(password string) IConnectionSettings - GetVirtualHost() string - VirtualHost(virtualHost string) IConnectionSettings - GetScheme() string - GetContainerId() string - ContainerId(containerId string) IConnectionSettings - UseSsl(value bool) IConnectionSettings - IsSsl() bool - BuildAddress() string - TlsConfig(config *tls.Config) IConnectionSettings - GetTlsConfig() *tls.Config - GetSaslMechanism() TSaslMechanism - SaslMechanism(mechanism SaslMechanism) IConnectionSettings +type ConnectionSettings struct { + Host string + Port int + User string + Password string + VirtualHost string + Scheme string + ContainerId string + UseSsl bool + TlsConfig *tls.Config + SaslMechanism TSaslMechanism +} + +func (c *ConnectionSettings) BuildAddress() string { + return c.Scheme + "://" + c.Host + ":" + fmt.Sprint(c.Port) +} + +// NewConnectionSettings creates a new ConnectionSettings struct with default values. +func NewConnectionSettings() *ConnectionSettings { + return &ConnectionSettings{ + Host: "localhost", + Port: 5672, + User: "guest", + Password: "guest", + VirtualHost: "/", + Scheme: "amqp", + ContainerId: "amqp-go-client", + UseSsl: false, + TlsConfig: nil, + } } type IConnection interface { - Open(ctx context.Context, connectionSettings IConnectionSettings) error + // Open opens a connection to the AMQP 1.0 server. + Open(ctx context.Context, connectionSettings *ConnectionSettings) error + + // Close closes the connection to the AMQP 1.0 server. Close(ctx context.Context) error + + // Management returns the management interface for the connection. Management() IManagement + + // NotifyStatusChange registers a channel to receive status change notifications. + // The channel will receive a StatusChanged struct whenever the status of the connection changes. NotifyStatusChange(channel chan *StatusChanged) - GetStatus() int + // Status returns the current status of the connection. + // See LifeCycle struct for more information. + Status() int } diff --git a/rabbitmq_amqp/converters.go b/rabbitmq_amqp/converters.go index c27911a..f0790d3 100644 --- a/rabbitmq_amqp/converters.go +++ b/rabbitmq_amqp/converters.go @@ -46,7 +46,7 @@ func CapacityFrom(value string) (int64, error) { match, err := regexp.Compile("^((kb|mb|gb|tb))") if err != nil { return 0, - fmt.Errorf("Capacity, invalid unit size format:%s", value) + fmt.Errorf("capacity, invalid unit size format:%s", value) } foundUnitSize := strings.ToLower(value[len(value)-2:]) @@ -55,7 +55,7 @@ func CapacityFrom(value string) (int64, error) { size, err := strconv.Atoi(value[:len(value)-2]) if err != nil { - return 0, fmt.Errorf("Capacity, Invalid number format: %s", value) + return 0, fmt.Errorf("capacity, Invalid number format: %s", value) } switch foundUnitSize { case UnitKb: @@ -72,5 +72,5 @@ func CapacityFrom(value string) (int64, error) { } } - return 0, fmt.Errorf("Capacity, Invalid unit size format: %s", value) + return 0, fmt.Errorf("capacity, Invalid unit size format: %s", value) } diff --git a/rabbitmq_amqp/entities.go b/rabbitmq_amqp/entities.go index 4ea0c4c..af40da3 100644 --- a/rabbitmq_amqp/entities.go +++ b/rabbitmq_amqp/entities.go @@ -1,9 +1,5 @@ package rabbitmq_amqp -import ( - "context" -) - type TQueueType string const ( @@ -20,38 +16,29 @@ func (e QueueType) String() string { return string(e.Type) } -type IEntityInfoSpecification[T any] interface { - Declare(ctx context.Context) (T, error) - Delete(ctx context.Context) error -} - -type IQueueSpecification interface { - GetName() string - Exclusive(isExclusive bool) IQueueSpecification - IsExclusive() bool - AutoDelete(isAutoDelete bool) IQueueSpecification - IsAutoDelete() bool - IEntityInfoSpecification[IQueueInfo] - QueueType(queueType QueueType) IQueueSpecification - GetQueueType() TQueueType - MaxLengthBytes(length int64) IQueueSpecification - DeadLetterExchange(dlx string) IQueueSpecification - DeadLetterRoutingKey(dlrk string) IQueueSpecification - Purge(ctx context.Context) (int, error) +// QueueSpecification represents the specification of a queue +type QueueSpecification struct { + Name string + IsAutoDelete bool + IsExclusive bool + QueueType QueueType + MaxLengthBytes int64 + DeadLetterExchange string + DeadLetterRoutingKey string } // IQueueInfo represents the information of a queue // It is returned by the Declare method of IQueueSpecification // The information come from the server type IQueueInfo interface { - GetName() string + Name() string IsDurable() bool IsAutoDelete() bool IsExclusive() bool Type() TQueueType - GetLeader() string - GetReplicas() []string - GetArguments() map[string]any + Leader() string + Members() []string + Arguments() map[string]any } type TExchangeType string @@ -74,24 +61,18 @@ func (e ExchangeType) String() string { // It is empty at the moment because the server does not return any information // We leave it here for future use. In case the server returns information about an exchange type IExchangeInfo interface { - GetName() string + Name() string } -type IExchangeSpecification interface { - GetName() string - AutoDelete(isAutoDelete bool) IExchangeSpecification - IsAutoDelete() bool - IEntityInfoSpecification[IExchangeInfo] - ExchangeType(exchangeType ExchangeType) IExchangeSpecification - GetExchangeType() TExchangeType +type ExchangeSpecification struct { + Name string + IsAutoDelete bool + ExchangeType ExchangeType } -type IBindingSpecification interface { - SourceExchange(exchangeSpec IExchangeSpecification) IBindingSpecification - SourceExchangeName(exchangeName string) IBindingSpecification - DestinationQueue(queueSpec IQueueSpecification) IBindingSpecification - DestinationQueueName(queueName string) IBindingSpecification - Key(bindingKey string) IBindingSpecification - Bind(ctx context.Context) error - Unbind(ctx context.Context) error +type BindingSpecification struct { + SourceExchange string + DestinationQueue string + DestinationExchange string + BindingKey string } diff --git a/rabbitmq_amqp/life_cycle.go b/rabbitmq_amqp/life_cycle.go index 10b4dd5..e7b5021 100644 --- a/rabbitmq_amqp/life_cycle.go +++ b/rabbitmq_amqp/life_cycle.go @@ -1,6 +1,9 @@ package rabbitmq_amqp -import "sync" +import ( + "fmt" + "sync" +) const ( Open = iota @@ -9,11 +12,30 @@ const ( Closed = iota ) +func statusToString(status int) string { + switch status { + case Open: + return "Open" + case Reconnecting: + return "Reconnecting" + case Closing: + return "Closing" + case Closed: + return "Closed" + } + return "Unknown" + +} + type StatusChanged struct { From int To int } +func (s StatusChanged) String() string { + return fmt.Sprintf("From: %s, To: %s", statusToString(s.From), statusToString(s.To)) +} + type LifeCycle struct { status int chStatusChanged chan *StatusChanged diff --git a/rabbitmq_amqp/management.go b/rabbitmq_amqp/management.go index e28084b..fc8b2be 100644 --- a/rabbitmq_amqp/management.go +++ b/rabbitmq_amqp/management.go @@ -5,15 +5,40 @@ import ( ) type IManagement interface { + // Open setups the sender and receiver links to the management interface. Open(ctx context.Context, connection IConnection) error + // Close closes the sender and receiver links to the management interface. Close(ctx context.Context) error - Queue(queueName string) IQueueSpecification + + // DeclareQueue creates a queue with the specified specification. + DeclareQueue(ctx context.Context, specification *QueueSpecification) (IQueueInfo, error) + // DeleteQueue deletes the queue with the specified name. + DeleteQueue(ctx context.Context, name string) error + // DeclareExchange creates an exchange with the specified specification. + DeclareExchange(ctx context.Context, exchangeSpecification *ExchangeSpecification) (IExchangeInfo, error) + // DeleteExchange deletes the exchange with the specified name. + DeleteExchange(ctx context.Context, name string) error + //Bind creates a binding between an exchange and a queue or exchange + Bind(ctx context.Context, bindingSpecification *BindingSpecification) (string, error) + // Unbind removes a binding between an exchange and a queue or exchange given the binding path. + Unbind(ctx context.Context, bindingPath string) error + // PurgeQueue removes all messages from the queue. Returns the number of messages purged. + PurgeQueue(ctx context.Context, queueName string) (int, error) + // QueueInfo returns information about the queue with the specified name. QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) - Exchange(exchangeName string) IExchangeSpecification - Binding() IBindingSpecification - QueueClientName() IQueueSpecification - GetStatus() int + + // Status returns the current status of the management interface. + // See LifeCycle struct for more information. + Status() int + + // NotifyStatusChange registers a channel to receive status change notifications. + // The channel will receive a StatusChanged struct whenever the status of the management interface changes. NotifyStatusChange(channel chan *StatusChanged) + + //Request sends a request to the management interface with the specified body, path, and method. + //Returns the response body as a map[string]any. + //It usually is not necessary to call this method directly. Leave it public for custom use cases. + // The calls above are the recommended way to interact with the management interface. Request(ctx context.Context, body any, path string, method string, expectedResponseCodes []int) (map[string]any, error) }