From 6a00d124e244ae05ed3a587365aaaac85f6a1211 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 15 Nov 2024 14:20:28 +0100 Subject: [PATCH 1/3] Implement publisher closes: https://github.com/rabbitmq/rabbitmq-amqp-go-client/issues/6 Signed-off-by: Gabriele Santomaggio --- rabbitmq_amqp/amqp_connection.go | 14 +++++++++- rabbitmq_amqp/amqp_management.go | 33 ++++------------------- rabbitmq_amqp/amqp_publisher.go | 35 +++++++++++++++++++++++++ rabbitmq_amqp/amqp_publisher_test.go | 37 ++++++++++++++++++++++++++ rabbitmq_amqp/amqp_utils.go | 39 ++++++++++++++++++++++++++++ rabbitmq_amqp/connection.go | 11 ++++++++ rabbitmq_amqp/test_utils.go | 12 +++++++++ 7 files changed, 152 insertions(+), 29 deletions(-) create mode 100644 rabbitmq_amqp/amqp_publisher.go create mode 100644 rabbitmq_amqp/amqp_publisher_test.go create mode 100644 rabbitmq_amqp/amqp_utils.go create mode 100644 rabbitmq_amqp/test_utils.go diff --git a/rabbitmq_amqp/amqp_connection.go b/rabbitmq_amqp/amqp_connection.go index 8b58ba0..8e81705 100644 --- a/rabbitmq_amqp/amqp_connection.go +++ b/rabbitmq_amqp/amqp_connection.go @@ -18,6 +18,15 @@ type AmqpConnection struct { Connection *amqp.Conn management IManagement lifeCycle *LifeCycle + session *amqp.Session +} + +func (a *AmqpConnection) NewIMPublisher(ctx context.Context) (IMPublisher, error) { + sender, err := a.session.NewSender(ctx, "", createSenderLinkOptions("", "IMPublisher")) + if err != nil { + return nil, err + } + return NewMPublisher(sender), nil } // Management returns the management interface for the connection. @@ -72,7 +81,10 @@ func (a *AmqpConnection) Open(ctx context.Context, connectionSettings *Connectio } a.Connection = conn a.lifeCycle.SetStatus(Open) - + a.session, err = a.Connection.NewSession(ctx, nil) + if err != nil { + return err + } err = a.Management().Open(ctx, a) if err != nil { // TODO close connection? diff --git a/rabbitmq_amqp/amqp_management.go b/rabbitmq_amqp/amqp_management.go index 55852a6..cb2ba82 100644 --- a/rabbitmq_amqp/amqp_management.go +++ b/rabbitmq_amqp/amqp_management.go @@ -4,11 +4,10 @@ import ( "context" "errors" "fmt" - "strconv" - "time" - "github.com/Azure/go-amqp" "github.com/google/uuid" + "strconv" + "time" ) var ErrPreconditionFailed = errors.New("precondition Failed") @@ -30,18 +29,7 @@ func NewAmqpManagement() *AmqpManagement { func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error { if a.receiver == nil { - prop := make(map[string]any) - prop["paired"] = true - opts := &amqp.ReceiverOptions{ - DynamicAddress: false, - Name: linkPairName, - Properties: prop, - RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(), - SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(), - TargetAddress: managementNodeAddress, - ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, - Credit: 100, - } + opts := createReceiverLinkOptions(managementNodeAddress, linkPairName) receiver, err := a.session.NewReceiver(ctx, managementNodeAddress, opts) if err != nil { return err @@ -54,19 +42,8 @@ func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error { func (a *AmqpManagement) ensureSenderLink(ctx context.Context) error { if a.sender == nil { - prop := make(map[string]any) - prop["paired"] = true - opts := &amqp.SenderOptions{ - DynamicAddress: false, - ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, - ExpiryTimeout: 0, - Name: linkPairName, - Properties: prop, - SettlementMode: amqp.SenderSettleModeSettled.Ptr(), - RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(), - SourceAddress: managementNodeAddress, - } - sender, err := a.session.NewSender(ctx, managementNodeAddress, opts) + sender, err := a.session.NewSender(ctx, managementNodeAddress, + createSenderLinkOptions(managementNodeAddress, linkPairName)) if err != nil { return err } diff --git a/rabbitmq_amqp/amqp_publisher.go b/rabbitmq_amqp/amqp_publisher.go new file mode 100644 index 0000000..53d226d --- /dev/null +++ b/rabbitmq_amqp/amqp_publisher.go @@ -0,0 +1,35 @@ +package rabbitmq_amqp + +import ( + "context" + "github.com/Azure/go-amqp" +) + +type MPublisher struct { + sender *amqp.Sender +} + +func NewMPublisher(sender *amqp.Sender) *MPublisher { + return &MPublisher{sender: sender} +} + +func (m *MPublisher) Publish(ctx context.Context, message *amqp.Message, address *AddressBuilder) error { + + messageTo, err := address.Address() + if err != nil { + return err + } + if message.Properties == nil { + message.Properties = &amqp.MessageProperties{} + } + message.Properties.To = &messageTo + err = m.sender.Send(ctx, message, nil) + if err != nil { + return err + } + return nil +} + +func (m *MPublisher) Close(ctx context.Context) error { + return m.sender.Close(ctx) +} diff --git a/rabbitmq_amqp/amqp_publisher_test.go b/rabbitmq_amqp/amqp_publisher_test.go new file mode 100644 index 0000000..d2aed4b --- /dev/null +++ b/rabbitmq_amqp/amqp_publisher_test.go @@ -0,0 +1,37 @@ +package rabbitmq_amqp + +import ( + "context" + "github.com/Azure/go-amqp" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "time" +) + +var _ = Describe("AMQP publisher ", func() { + It("Send a message to a queue with a Message Target Publisher", func() { + qName := generateNameWithDateTime("Send a message to a queue with a Message Target Publisher") + amqpConnection := NewAmqpConnection() + Expect(amqpConnection).NotTo(BeNil()) + err := amqpConnection.Open(context.Background(), NewConnectionSettings()) + Expect(err).To(BeNil()) + publisher, err := amqpConnection.NewIMPublisher(context.Background()) + Expect(err).To(BeNil()) + Expect(publisher).NotTo(BeNil()) + Expect(publisher).To(BeAssignableToTypeOf(&MPublisher{})) + queueInfo, err := amqpConnection.Management().DeclareQueue(context.Background(), &QueueSpecification{ + Name: qName, + }) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello")), NewAddressBuilder().Queue(qName)) + Expect(err).To(BeNil()) + // TODO: Remove this sleep when the confirmation is done + time.Sleep(500 * time.Millisecond) + nMessages, err := amqpConnection.Management().PurgeQueue(context.Background(), qName) + Expect(err).To(BeNil()) + Expect(nMessages).To(Equal(1)) + Expect(amqpConnection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(publisher.Close(context.Background())).To(BeNil()) + }) +}) diff --git a/rabbitmq_amqp/amqp_utils.go b/rabbitmq_amqp/amqp_utils.go new file mode 100644 index 0000000..6fb01a8 --- /dev/null +++ b/rabbitmq_amqp/amqp_utils.go @@ -0,0 +1,39 @@ +package rabbitmq_amqp + +import "github.com/Azure/go-amqp" + +// senderLinkOptions returns the options for a sender link +// with the given address and link name. +// That should be the same for all the links. +func createSenderLinkOptions(address string, linkName string) *amqp.SenderOptions { + prop := make(map[string]any) + prop["paired"] = true + return &amqp.SenderOptions{ + DynamicAddress: false, + ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, + ExpiryTimeout: 0, + Name: linkName, + Properties: prop, + SettlementMode: amqp.SenderSettleModeSettled.Ptr(), + RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(), + SourceAddress: address, + } +} + +// receiverLinkOptions returns the options for a receiver link +// with the given address and link name. +// That should be the same for all the links. +func createReceiverLinkOptions(address string, linkName string) *amqp.ReceiverOptions { + prop := make(map[string]any) + prop["paired"] = true + return &amqp.ReceiverOptions{ + DynamicAddress: false, + Name: linkName, + Properties: prop, + RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(), + SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(), + TargetAddress: address, + ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, + Credit: 100, + } +} diff --git a/rabbitmq_amqp/connection.go b/rabbitmq_amqp/connection.go index 2498d57..f496bc6 100644 --- a/rabbitmq_amqp/connection.go +++ b/rabbitmq_amqp/connection.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "fmt" + "github.com/Azure/go-amqp" ) type TSaslMechanism string @@ -66,4 +67,14 @@ type IConnection interface { // Status returns the current status of the connection. // See LifeCycle struct for more information. Status() int + + NewIMPublisher(ctx context.Context) (IMPublisher, error) +} + +// IMPublisher is an interface for publishers messages based. +// on the AMQP 1.0 protocol. +// No Target address is specified each message is sent to a specific address. +type IMPublisher interface { + Publish(ctx context.Context, message *amqp.Message, address *AddressBuilder) error + Close(ctx context.Context) error } diff --git a/rabbitmq_amqp/test_utils.go b/rabbitmq_amqp/test_utils.go new file mode 100644 index 0000000..35801a4 --- /dev/null +++ b/rabbitmq_amqp/test_utils.go @@ -0,0 +1,12 @@ +package rabbitmq_amqp + +import ( + "fmt" + "strconv" + "time" +) + +func generateNameWithDateTime(name string) string { + return fmt.Sprintf("%s_%s", name, strconv.FormatInt(time.Now().Unix(), 10)) + +} From 36fca22f278ab05b2ca182b859579c69f8b160d9 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 19 Nov 2024 17:51:44 +0100 Subject: [PATCH 2/3] Refactor API Signed-off-by: Gabriele Santomaggio --- examples/getting_started/main.go | 5 +- rabbitmq_amqp/address_builder_test.go | 78 ------ .../{address_builder.go => address_helper.go} | 76 +++-- rabbitmq_amqp/address_helper_test.go | 64 +++++ rabbitmq_amqp/amqp_binding_test.go | 9 +- rabbitmq_amqp/amqp_connection.go | 67 ++--- rabbitmq_amqp/amqp_connection_test.go | 76 ++--- rabbitmq_amqp/amqp_exchange.go | 4 +- rabbitmq_amqp/amqp_exchange_test.go | 9 +- rabbitmq_amqp/amqp_management.go | 2 +- rabbitmq_amqp/amqp_management_test.go | 46 +--- rabbitmq_amqp/amqp_publisher.go | 31 ++- rabbitmq_amqp/amqp_publisher_test.go | 27 +- rabbitmq_amqp/amqp_queue.go | 6 +- rabbitmq_amqp/amqp_queue_test.go | 11 +- rabbitmq_amqp/amqp_utils.go | 4 +- rabbitmq_amqp/connection.go | 58 +--- rabbitmq_amqp/uri.go | 117 ++++++++ rabbitmq_amqp/uri_test.go | 260 ++++++++++++++++++ 19 files changed, 591 insertions(+), 359 deletions(-) delete mode 100644 rabbitmq_amqp/address_builder_test.go rename rabbitmq_amqp/{address_builder.go => address_helper.go} (59%) create mode 100644 rabbitmq_amqp/address_helper_test.go create mode 100644 rabbitmq_amqp/uri.go create mode 100644 rabbitmq_amqp/uri_test.go diff --git a/examples/getting_started/main.go b/examples/getting_started/main.go index e47b9a9..b8bd64e 100644 --- a/examples/getting_started/main.go +++ b/examples/getting_started/main.go @@ -8,6 +8,7 @@ import ( ) func main() { + fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n") chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1) @@ -17,12 +18,12 @@ func main() { } }(chStatusChanged) - amqpConnection := rabbitmq_amqp.NewAmqpConnectionNotifyStatusChanged(chStatusChanged) - err := amqpConnection.Open(context.Background(), rabbitmq_amqp.NewConnectionSettings()) + amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), "amqp://", nil) if err != nil { fmt.Printf("Error opening connection: %v\n", err) return } + amqpConnection.NotifyStatusChange(chStatusChanged) fmt.Printf("AMQP Connection opened.\n") management := amqpConnection.Management() exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.ExchangeSpecification{ diff --git a/rabbitmq_amqp/address_builder_test.go b/rabbitmq_amqp/address_builder_test.go deleted file mode 100644 index aa480a6..0000000 --- a/rabbitmq_amqp/address_builder_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package rabbitmq_amqp - -import ( - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -var _ = Describe("Address builder test ", func() { - It("With exchange, queue and key should raise and error", func() { - addressBuilder := NewAddressBuilder() - Expect(addressBuilder).NotTo(BeNil()) - Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) - addressBuilder.Queue("queue").Exchange("exchange").Key("key") - _, err := addressBuilder.Address() - Expect(err).NotTo(BeNil()) - Expect(err.Error()).To(Equal("exchange and queue cannot be set together")) - }) - - It("Without exchange and queue should raise and error", func() { - addressBuilder := NewAddressBuilder() - Expect(addressBuilder).NotTo(BeNil()) - Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) - _, err := addressBuilder.Address() - Expect(err).NotTo(BeNil()) - Expect(err.Error()).To(Equal("exchange or queue must be set")) - }) - - It("With exchange and key should return address", func() { - addressBuilder := NewAddressBuilder() - Expect(addressBuilder).NotTo(BeNil()) - Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) - addressBuilder.Exchange("my_exchange").Key("my_key") - address, err := addressBuilder.Address() - Expect(err).To(BeNil()) - Expect(address).To(Equal("/exchanges/my_exchange/my_key")) - }) - - It("With exchange should return address", func() { - addressBuilder := NewAddressBuilder() - Expect(addressBuilder).NotTo(BeNil()) - Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) - addressBuilder.Exchange("my_exchange") - address, err := addressBuilder.Address() - Expect(err).To(BeNil()) - Expect(address).To(Equal("/exchanges/my_exchange")) - }) - - It("With exchange and key with names to encode should return the encoded address", func() { - addressBuilder := NewAddressBuilder() - Expect(addressBuilder).NotTo(BeNil()) - Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) - addressBuilder.Exchange("my_ exchange/()").Key("my_key ") - address, err := addressBuilder.Address() - Expect(err).To(BeNil()) - Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20")) - }) - - It("With queue should return address", func() { - addressBuilder := NewAddressBuilder() - Expect(addressBuilder).NotTo(BeNil()) - Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) - addressBuilder.Queue("my_queue>") - address, err := addressBuilder.Address() - Expect(err).To(BeNil()) - Expect(address).To(Equal("/queues/my_queue%3E")) - }) - - It("With queue and append should return address", func() { - addressBuilder := NewAddressBuilder() - Expect(addressBuilder).NotTo(BeNil()) - Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) - addressBuilder.Queue("my_queue").Append("/messages") - address, err := addressBuilder.Address() - Expect(err).To(BeNil()) - Expect(address).To(Equal("/queues/my_queue/messages")) - }) - -}) diff --git a/rabbitmq_amqp/address_builder.go b/rabbitmq_amqp/address_helper.go similarity index 59% rename from rabbitmq_amqp/address_builder.go rename to rabbitmq_amqp/address_helper.go index 71a0635..16c5eda 100644 --- a/rabbitmq_amqp/address_builder.go +++ b/rabbitmq_amqp/address_helper.go @@ -7,66 +7,56 @@ import ( "strings" ) -type AddressBuilder struct { - queue *string - exchange *string - key *string - append *string -} - -func NewAddressBuilder() *AddressBuilder { - return &AddressBuilder{} -} - -func (a *AddressBuilder) Queue(queue string) *AddressBuilder { - a.queue = &queue - return a -} - -func (a *AddressBuilder) Exchange(exchange string) *AddressBuilder { - a.exchange = &exchange - return a -} - -func (a *AddressBuilder) Key(key string) *AddressBuilder { - a.key = &key - return a -} - -func (a *AddressBuilder) Append(append string) *AddressBuilder { - a.append = &append - return a -} - -func (a *AddressBuilder) Address() (string, error) { - if a.exchange == nil && a.queue == nil { +// Address Creates the address for the exchange or queue following the RabbitMQ conventions. +// see: https://www.rabbitmq.com/docs/next/amqp#address-v2 +func Address(exchange, key, queue *string, urlParameters *string) (string, error) { + if exchange == nil && queue == nil { return "", errors.New("exchange or queue must be set") } urlAppend := "" - if !isStringNilOrEmpty(a.append) { - urlAppend = *a.append + if !isStringNilOrEmpty(urlParameters) { + urlAppend = *urlParameters } - if !isStringNilOrEmpty(a.exchange) && !isStringNilOrEmpty(a.queue) { + if !isStringNilOrEmpty(exchange) && !isStringNilOrEmpty(queue) { return "", errors.New("exchange and queue cannot be set together") } - if !isStringNilOrEmpty(a.exchange) { - if !isStringNilOrEmpty(a.key) { - return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + "/" + encodePathSegments(*a.key) + urlAppend, nil + if !isStringNilOrEmpty(exchange) { + if !isStringNilOrEmpty(key) { + return "/" + exchanges + "/" + encodePathSegments(*exchange) + "/" + encodePathSegments(*key) + urlAppend, nil } - return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + urlAppend, nil + return "/" + exchanges + "/" + encodePathSegments(*exchange) + urlAppend, nil } - if a.queue == nil { + if queue == nil { return "", nil } - if isStringNilOrEmpty(a.queue) { + if isStringNilOrEmpty(queue) { return "", errors.New("queue must be set") } - return "/" + queues + "/" + encodePathSegments(*a.queue) + urlAppend, nil + return "/" + queues + "/" + encodePathSegments(*queue) + urlAppend, nil +} + +// ExchangeAddress Creates the address for the exchange +// See Address for more information +func ExchangeAddress(exchange, key *string) (string, error) { + return Address(exchange, key, nil, nil) +} + +// QueueAddress Creates the address for the queue. +// See Address for more information +func QueueAddress(queue *string) (string, error) { + return Address(nil, nil, queue, nil) +} + +// PurgeQueueAddress Creates the address for purging the queue. +// See Address for more information +func PurgeQueueAddress(queue *string) (string, error) { + parameter := "/messages" + return Address(nil, nil, queue, ¶meter) } // encodePathSegments takes a string and returns its percent-encoded representation. diff --git a/rabbitmq_amqp/address_helper_test.go b/rabbitmq_amqp/address_helper_test.go new file mode 100644 index 0000000..5e23528 --- /dev/null +++ b/rabbitmq_amqp/address_helper_test.go @@ -0,0 +1,64 @@ +package rabbitmq_amqp + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Address builder test ", func() { + It("With exchange, queue and key should raise and error", func() { + queue := "my_queue" + exchange := "my_exchange" + + _, err := Address(&exchange, nil, &queue, nil) + Expect(err).NotTo(BeNil()) + Expect(err.Error()).To(Equal("exchange and queue cannot be set together")) + }) + + It("Without exchange and queue should raise and error", func() { + _, err := Address(nil, nil, nil, nil) + Expect(err).NotTo(BeNil()) + Expect(err.Error()).To(Equal("exchange or queue must be set")) + }) + + It("With exchange and key should return address", func() { + exchange := "my_exchange" + key := "my_key" + + address, err := Address(&exchange, &key, nil, nil) + Expect(err).To(BeNil()) + Expect(address).To(Equal("/exchanges/my_exchange/my_key")) + }) + + It("With exchange should return address", func() { + exchange := "my_exchange" + address, err := Address(&exchange, nil, nil, nil) + Expect(err).To(BeNil()) + Expect(address).To(Equal("/exchanges/my_exchange")) + }) + + It("With exchange and key with names to encode should return the encoded address", func() { + + exchange := "my_ exchange/()" + key := "my_key " + + address, err := Address(&exchange, &key, nil, nil) + Expect(err).To(BeNil()) + Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20")) + }) + + It("With queue should return address", func() { + queue := "my_queue>" + address, err := Address(nil, nil, &queue, nil) + Expect(err).To(BeNil()) + Expect(address).To(Equal("/queues/my_queue%3E")) + }) + + It("With queue and urlParameters should return address", func() { + queue := "my_queue" + address, err := PurgeQueueAddress(&queue) + Expect(err).To(BeNil()) + Expect(address).To(Equal("/queues/my_queue/messages")) + }) + +}) diff --git a/rabbitmq_amqp/amqp_binding_test.go b/rabbitmq_amqp/amqp_binding_test.go index 82c82ec..0a1c1ae 100644 --- a/rabbitmq_amqp/amqp_binding_test.go +++ b/rabbitmq_amqp/amqp_binding_test.go @@ -10,14 +10,9 @@ var _ = Describe("AMQP Bindings test ", func() { var connection IConnection var management IManagement BeforeEach(func() { - connection = NewAmqpConnection() - Expect(connection).NotTo(BeNil()) - Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{})) - connectionSettings := NewConnectionSettings() - Expect(connectionSettings).NotTo(BeNil()) - Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - err := connection.Open(context.TODO(), connectionSettings) + conn, err := Dial(context.TODO(), "amqp://", nil) Expect(err).To(BeNil()) + connection = conn management = connection.Management() }) diff --git a/rabbitmq_amqp/amqp_connection.go b/rabbitmq_amqp/amqp_connection.go index 8e81705..23392e9 100644 --- a/rabbitmq_amqp/amqp_connection.go +++ b/rabbitmq_amqp/amqp_connection.go @@ -2,10 +2,11 @@ package rabbitmq_amqp import ( "context" + "fmt" "github.com/Azure/go-amqp" ) -//func (c *ConnectionSettings) UseSsl(value bool) { +//func (c *ConnUrlHelper) UseSsl(value bool) { // c.UseSsl = value // if value { // c.Scheme = "amqps" @@ -21,12 +22,12 @@ type AmqpConnection struct { session *amqp.Session } -func (a *AmqpConnection) NewIMPublisher(ctx context.Context) (IMPublisher, error) { - sender, err := a.session.NewSender(ctx, "", createSenderLinkOptions("", "IMPublisher")) +func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, linkName string) (IPublisher, error) { + sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName)) if err != nil { return nil, err } - return NewMPublisher(sender), nil + return newPublisher(sender), nil } // Management returns the management interface for the connection. @@ -35,52 +36,50 @@ func (a *AmqpConnection) Management() IManagement { return a.management } -// NewAmqpConnection creates a new AmqpConnection +// Dial creates a new AmqpConnection // with a new AmqpManagement and a new LifeCycle. // Returns a pointer to the new AmqpConnection -func NewAmqpConnection() IConnection { - return &AmqpConnection{ +func Dial(ctx context.Context, addr string, connOptions *amqp.ConnOptions) (IConnection, error) { + conn := &AmqpConnection{ management: NewAmqpManagement(), lifeCycle: NewLifeCycle(), } -} - -// NewAmqpConnectionNotifyStatusChanged creates a new AmqpConnection -// with a new AmqpManagement and a new LifeCycle -// and sets the channel for status changes. -// Returns a pointer to the new AmqpConnection -func NewAmqpConnectionNotifyStatusChanged(channel chan *StatusChanged) IConnection { - lifeCycle := NewLifeCycle() - lifeCycle.chStatusChanged = channel - return &AmqpConnection{ - management: NewAmqpManagement(), - lifeCycle: lifeCycle, + err := conn.open(ctx, addr, connOptions) + if err != nil { + return nil, err } + return conn, nil } // Open opens a connection to the AMQP 1.0 server. // using the provided connectionSettings and the AMQPLite library. // Setups the connection and the management interface. -func (a *AmqpConnection) Open(ctx context.Context, connectionSettings *ConnectionSettings) error { - sASLType := amqp.SASLTypeAnonymous() - switch connectionSettings.SaslMechanism { - case Plain: - sASLType = amqp.SASLTypePlain(connectionSettings.User, connectionSettings.Password) - case External: - sASLType = amqp.SASLTypeExternal("") +func (a *AmqpConnection) open(ctx context.Context, addr string, connOptions *amqp.ConnOptions) error { + + if connOptions == nil { + connOptions = &amqp.ConnOptions{ + // RabbitMQ requires SASL security layer + // to be enabled for AMQP 1.0 connections. + // So this is mandatory and default in case not defined. + SASLType: amqp.SASLTypeAnonymous(), + } } - conn, err := amqp.Dial(ctx, connectionSettings.BuildAddress(), &amqp.ConnOptions{ - ContainerID: connectionSettings.ContainerId, - SASLType: sASLType, - HostName: connectionSettings.VirtualHost, - TLSConfig: connectionSettings.TlsConfig, - }) + //connOptions.HostName is the way to set the virtual host + // so we need to pre-parse the URI to get the virtual host + // the PARSE is copied from go-amqp091 library + // the URI will be parsed is parsed again in the amqp lite library + uri, err := ParseURI(addr) + if err != nil { + return err + } + connOptions.HostName = fmt.Sprintf("vhost:%s", uri.Vhost) + + conn, err := amqp.Dial(ctx, addr, connOptions) if err != nil { return err } a.Connection = conn - a.lifeCycle.SetStatus(Open) a.session, err = a.Connection.NewSession(ctx, nil) if err != nil { return err @@ -90,6 +89,8 @@ func (a *AmqpConnection) Open(ctx context.Context, connectionSettings *Connectio // TODO close connection? return err } + + a.lifeCycle.SetStatus(Open) return nil } diff --git a/rabbitmq_amqp/amqp_connection_test.go b/rabbitmq_amqp/amqp_connection_test.go index 398f91f..4eb7196 100644 --- a/rabbitmq_amqp/amqp_connection_test.go +++ b/rabbitmq_amqp/amqp_connection_test.go @@ -2,6 +2,7 @@ package rabbitmq_amqp import ( "context" + "github.com/Azure/go-amqp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "time" @@ -9,94 +10,51 @@ import ( var _ = Describe("AMQP Connection Test", func() { It("AMQP SASLTypeAnonymous Connection should succeed", func() { - amqpConnection := NewAmqpConnection() - Expect(amqpConnection).NotTo(BeNil()) - Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) - connectionSettings := NewConnectionSettings() - Expect(connectionSettings).NotTo(BeNil()) - connectionSettings.SaslMechanism = Anonymous - Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - - err := amqpConnection.Open(context.Background(), connectionSettings) + connection, err := Dial(context.Background(), "amqp://", &amqp.ConnOptions{ + SASLType: amqp.SASLTypeAnonymous()}) Expect(err).To(BeNil()) - err = amqpConnection.Close(context.Background()) + err = connection.Close(context.Background()) Expect(err).To(BeNil()) }) It("AMQP SASLTypePlain Connection should succeed", func() { - amqpConnection := NewAmqpConnection() - Expect(amqpConnection).NotTo(BeNil()) - Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) - connectionSettings := NewConnectionSettings() - Expect(connectionSettings).NotTo(BeNil()) - Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - connectionSettings.SaslMechanism = Plain + connection, err := Dial(context.Background(), "amqp://", &amqp.ConnOptions{ + SASLType: amqp.SASLTypePlain("guest", "guest")}) - err := amqpConnection.Open(context.Background(), connectionSettings) Expect(err).To(BeNil()) - err = amqpConnection.Close(context.Background()) + err = connection.Close(context.Background()) Expect(err).To(BeNil()) }) It("AMQP Connection should fail due of wrong Port", func() { - amqpConnection := NewAmqpConnection() - Expect(amqpConnection).NotTo(BeNil()) - Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) - connectionSettings := &ConnectionSettings{ - Host: "localhost", - Port: 1234, - } - Expect(connectionSettings).NotTo(BeNil()) - Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - - err := amqpConnection.Open(context.Background(), connectionSettings) + _, err := Dial(context.Background(), "amqp://localhost:1234", nil) Expect(err).NotTo(BeNil()) }) It("AMQP Connection should fail due of wrong Host", func() { - amqpConnection := NewAmqpConnection() - Expect(amqpConnection).NotTo(BeNil()) - Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) - - connectionSettings := &ConnectionSettings{ - Host: "wronghost", - Port: 5672, - } - Expect(connectionSettings).NotTo(BeNil()) - Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - - err := amqpConnection.Open(context.Background(), connectionSettings) + _, err := Dial(context.Background(), "amqp://wrong_host:5672", nil) Expect(err).NotTo(BeNil()) }) It("AMQP Connection should fail due to context cancellation", func() { - amqpConnection := NewAmqpConnection() - Expect(amqpConnection).NotTo(BeNil()) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) cancel() - err := amqpConnection.Open(ctx, NewConnectionSettings()) + _, err := Dial(ctx, "amqp://", nil) Expect(err).NotTo(BeNil()) }) It("AMQP Connection should receive events", func() { - amqpConnection := NewAmqpConnection() - Expect(amqpConnection).NotTo(BeNil()) ch := make(chan *StatusChanged, 1) - amqpConnection.NotifyStatusChange(ch) - err := amqpConnection.Open(context.Background(), NewConnectionSettings()) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) - recv := <-ch - Expect(recv).NotTo(BeNil()) - Expect(recv.From).To(Equal(Closed)) - Expect(recv.To).To(Equal(Open)) - - err = amqpConnection.Close(context.Background()) + connection.NotifyStatusChange(ch) + err = connection.Close(context.Background()) Expect(err).To(BeNil()) - recv = <-ch - Expect(recv).NotTo(BeNil()) + recv := <-ch + Expect(recv).NotTo(BeNil()) Expect(recv.From).To(Equal(Open)) Expect(recv.To).To(Equal(Closed)) }) @@ -106,13 +64,13 @@ var _ = Describe("AMQP Connection Test", func() { // Expect(amqpConnection).NotTo(BeNil()) // Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) // - // connectionSettings := NewConnectionSettings(). + // connectionSettings := NewConnUrlHelper(). // UseSsl(true).Port(5671).TlsConfig(&tls.Config{ // //ServerName: "localhost", // InsecureSkipVerify: true, // }) // Expect(connectionSettings).NotTo(BeNil()) - // Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) + // Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnUrlHelper{})) // err := amqpConnection.Open(context.Background(), connectionSettings) // Expect(err).To(BeNil()) //}) diff --git a/rabbitmq_amqp/amqp_exchange.go b/rabbitmq_amqp/amqp_exchange.go index 6597d77..62a54b1 100644 --- a/rabbitmq_amqp/amqp_exchange.go +++ b/rabbitmq_amqp/amqp_exchange.go @@ -34,7 +34,7 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange { } func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) { - path, err := NewAddressBuilder().Exchange(e.name).Address() + path, err := ExchangeAddress(&e.name, nil) if err != nil { return nil, err } @@ -59,7 +59,7 @@ func (e *AmqpExchange) IsAutoDelete() bool { } func (e *AmqpExchange) Delete(ctx context.Context) error { - path, err := NewAddressBuilder().Exchange(e.name).Address() + path, err := ExchangeAddress(&e.name, nil) if err != nil { return err } diff --git a/rabbitmq_amqp/amqp_exchange_test.go b/rabbitmq_amqp/amqp_exchange_test.go index 8fcac70..3248995 100644 --- a/rabbitmq_amqp/amqp_exchange_test.go +++ b/rabbitmq_amqp/amqp_exchange_test.go @@ -11,13 +11,8 @@ var _ = Describe("AMQP Exchange test ", func() { var connection IConnection var management IManagement BeforeEach(func() { - connection = NewAmqpConnection() - Expect(connection).NotTo(BeNil()) - Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{})) - connectionSettings := NewConnectionSettings() - Expect(connectionSettings).NotTo(BeNil()) - Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - err := connection.Open(context.TODO(), connectionSettings) + conn, err := Dial(context.TODO(), "amqp://", nil) + connection = conn Expect(err).To(BeNil()) management = connection.Management() }) diff --git a/rabbitmq_amqp/amqp_management.go b/rabbitmq_amqp/amqp_management.go index cb2ba82..c9d71c2 100644 --- a/rabbitmq_amqp/amqp_management.go +++ b/rabbitmq_amqp/amqp_management.go @@ -225,7 +225,7 @@ func (a *AmqpManagement) Unbind(ctx context.Context, bindingPath string) error { return bind.Unbind(ctx, bindingPath) } func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) { - path, err := NewAddressBuilder().Queue(queueName).Address() + path, err := QueueAddress(&queueName) if err != nil { return nil, err } diff --git a/rabbitmq_amqp/amqp_management_test.go b/rabbitmq_amqp/amqp_management_test.go index 0f868ee..3df3f9e 100644 --- a/rabbitmq_amqp/amqp_management_test.go +++ b/rabbitmq_amqp/amqp_management_test.go @@ -11,51 +11,37 @@ import ( var _ = Describe("Management tests", func() { It("AMQP Management should fail due to context cancellation", func() { - amqpConnection := NewAmqpConnection() - Expect(amqpConnection).NotTo(BeNil()) - err := amqpConnection.Open(context.Background(), NewConnectionSettings()) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) cancel() - err = amqpConnection.Management().Open(ctx, amqpConnection) + err = connection.Management().Open(ctx, connection) Expect(err).NotTo(BeNil()) - Expect(amqpConnection.Close(context.Background())).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) }) It("AMQP Management should receive events", func() { ch := make(chan *StatusChanged, 1) - amqpConnection := NewAmqpConnectionNotifyStatusChanged(ch) - Expect(amqpConnection).NotTo(BeNil()) - err := amqpConnection.Open(context.Background(), NewConnectionSettings()) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) - recv := <-ch - Expect(recv).NotTo(BeNil()) - Expect(recv.From).To(Equal(Closed)) - Expect(recv.To).To(Equal(Open)) - - err = amqpConnection.Close(context.Background()) + connection.NotifyStatusChange(ch) + err = connection.Close(context.Background()) Expect(err).To(BeNil()) - recv = <-ch + recv := <-ch Expect(recv).NotTo(BeNil()) Expect(recv.From).To(Equal(Open)) Expect(recv.To).To(Equal(Closed)) - Expect(amqpConnection.Close(context.Background())).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) }) It("Request", func() { - amqpConnection := NewAmqpConnection() - Expect(amqpConnection).NotTo(BeNil()) - Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) - connectionSettings := NewConnectionSettings() - Expect(connectionSettings).NotTo(BeNil()) - Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - err := amqpConnection.Open(context.Background(), connectionSettings) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) - management := amqpConnection.Management() + management := connection.Management() kv := make(map[string]any) kv["durable"] = true kv["auto_delete"] = false @@ -67,21 +53,15 @@ var _ = Describe("Management tests", func() { Expect(err).To(BeNil()) Expect(result).NotTo(BeNil()) Expect(management.Close(context.Background())).To(BeNil()) - Expect(amqpConnection.Close(context.Background())).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) }) It("GET on non-existing queue returns ErrDoesNotExist", func() { - amqpConnection := NewAmqpConnection() - Expect(amqpConnection).NotTo(BeNil()) - Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{})) - connectionSettings := NewConnectionSettings() - Expect(connectionSettings).NotTo(BeNil()) - Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - err := amqpConnection.Open(context.Background(), connectionSettings) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) - management := amqpConnection.Management() + management := connection.Management() path := "/queues/i-do-not-exist" result, err := management.Request(context.Background(), amqp.Null{}, path, commandGet, []int{responseCode200, responseCode404}) Expect(err).To(Equal(ErrDoesNotExist)) diff --git a/rabbitmq_amqp/amqp_publisher.go b/rabbitmq_amqp/amqp_publisher.go index 53d226d..c610131 100644 --- a/rabbitmq_amqp/amqp_publisher.go +++ b/rabbitmq_amqp/amqp_publisher.go @@ -5,31 +5,34 @@ import ( "github.com/Azure/go-amqp" ) -type MPublisher struct { +type Publisher struct { sender *amqp.Sender } -func NewMPublisher(sender *amqp.Sender) *MPublisher { - return &MPublisher{sender: sender} +func newPublisher(sender *amqp.Sender) *Publisher { + return &Publisher{sender: sender} } -func (m *MPublisher) Publish(ctx context.Context, message *amqp.Message, address *AddressBuilder) error { +func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) error { + + /// for the outcome of the message delivery, see https://github.com/Azure/go-amqp/issues/347 + //RELEASED + ///** + // * The broker could not route the message to any queue. + // * + // *

This is likely to be due to a topology misconfiguration. + // */ + // so at the moment we don't have access on this information + // TODO: remove this comment when the issue is resolved + + err := m.sender.Send(ctx, message, nil) - messageTo, err := address.Address() - if err != nil { - return err - } - if message.Properties == nil { - message.Properties = &amqp.MessageProperties{} - } - message.Properties.To = &messageTo - err = m.sender.Send(ctx, message, nil) if err != nil { return err } return nil } -func (m *MPublisher) Close(ctx context.Context) error { +func (m *Publisher) Close(ctx context.Context) error { return m.sender.Close(ctx) } diff --git a/rabbitmq_amqp/amqp_publisher_test.go b/rabbitmq_amqp/amqp_publisher_test.go index d2aed4b..173cebd 100644 --- a/rabbitmq_amqp/amqp_publisher_test.go +++ b/rabbitmq_amqp/amqp_publisher_test.go @@ -5,33 +5,32 @@ import ( "github.com/Azure/go-amqp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "time" ) var _ = Describe("AMQP publisher ", func() { It("Send a message to a queue with a Message Target Publisher", func() { qName := generateNameWithDateTime("Send a message to a queue with a Message Target Publisher") - amqpConnection := NewAmqpConnection() - Expect(amqpConnection).NotTo(BeNil()) - err := amqpConnection.Open(context.Background(), NewConnectionSettings()) + connection, err := Dial(context.Background(), "amqp://", nil) Expect(err).To(BeNil()) - publisher, err := amqpConnection.NewIMPublisher(context.Background()) - Expect(err).To(BeNil()) - Expect(publisher).NotTo(BeNil()) - Expect(publisher).To(BeAssignableToTypeOf(&MPublisher{})) - queueInfo, err := amqpConnection.Management().DeclareQueue(context.Background(), &QueueSpecification{ + Expect(connection).NotTo(BeNil()) + queueInfo, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{ Name: qName, }) Expect(err).To(BeNil()) Expect(queueInfo).NotTo(BeNil()) - err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello")), NewAddressBuilder().Queue(qName)) + dest, _ := QueueAddress(&qName) + publisher, err := connection.Publisher(context.Background(), dest, "test") + Expect(err).To(BeNil()) + Expect(publisher).NotTo(BeNil()) + Expect(publisher).To(BeAssignableToTypeOf(&Publisher{})) + + err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello"))) + Expect(err).To(BeNil()) - // TODO: Remove this sleep when the confirmation is done - time.Sleep(500 * time.Millisecond) - nMessages, err := amqpConnection.Management().PurgeQueue(context.Background(), qName) + nMessages, err := connection.Management().PurgeQueue(context.Background(), qName) Expect(err).To(BeNil()) Expect(nMessages).To(Equal(1)) - Expect(amqpConnection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) Expect(publisher.Close(context.Background())).To(BeNil()) }) }) diff --git a/rabbitmq_amqp/amqp_queue.go b/rabbitmq_amqp/amqp_queue.go index 061eea6..525221d 100644 --- a/rabbitmq_amqp/amqp_queue.go +++ b/rabbitmq_amqp/amqp_queue.go @@ -149,7 +149,7 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) { a.name = generateNameWithDefaultPrefix() } - path, err := NewAddressBuilder().Queue(a.name).Address() + path, err := QueueAddress(&a.name) if err != nil { return nil, err } @@ -166,7 +166,7 @@ func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) { } func (a *AmqpQueue) Delete(ctx context.Context) error { - path, err := NewAddressBuilder().Queue(a.name).Address() + path, err := QueueAddress(&a.name) if err != nil { return err } @@ -175,7 +175,7 @@ func (a *AmqpQueue) Delete(ctx context.Context) error { } func (a *AmqpQueue) Purge(ctx context.Context) (int, error) { - path, err := NewAddressBuilder().Queue(a.name).Append("/messages").Address() + path, err := PurgeQueueAddress(&a.name) if err != nil { return 0, err } diff --git a/rabbitmq_amqp/amqp_queue_test.go b/rabbitmq_amqp/amqp_queue_test.go index 8ca1b12..df8ecd6 100644 --- a/rabbitmq_amqp/amqp_queue_test.go +++ b/rabbitmq_amqp/amqp_queue_test.go @@ -13,14 +13,9 @@ var _ = Describe("AMQP Queue test ", func() { var connection IConnection var management IManagement BeforeEach(func() { - connection = NewAmqpConnection() - Expect(connection).NotTo(BeNil()) - Expect(connection).To(BeAssignableToTypeOf(&AmqpConnection{})) - connectionSettings := NewConnectionSettings() - Expect(connectionSettings).NotTo(BeNil()) - Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{})) - err := connection.Open(context.TODO(), connectionSettings) + conn, err := Dial(context.TODO(), "amqp://", nil) Expect(err).To(BeNil()) + connection = conn management = connection.Management() }) @@ -224,7 +219,7 @@ func publishMessages(queueName string, count int) { Fail(err.Error()) } - address, err := NewAddressBuilder().Queue(queueName).Address() + address, err := QueueAddress(&queueName) if err != nil { Fail(err.Error()) } diff --git a/rabbitmq_amqp/amqp_utils.go b/rabbitmq_amqp/amqp_utils.go index 6fb01a8..157e545 100644 --- a/rabbitmq_amqp/amqp_utils.go +++ b/rabbitmq_amqp/amqp_utils.go @@ -9,6 +9,7 @@ func createSenderLinkOptions(address string, linkName string) *amqp.SenderOption prop := make(map[string]any) prop["paired"] = true return &amqp.SenderOptions{ + SourceAddress: address, DynamicAddress: false, ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, ExpiryTimeout: 0, @@ -16,7 +17,6 @@ func createSenderLinkOptions(address string, linkName string) *amqp.SenderOption Properties: prop, SettlementMode: amqp.SenderSettleModeSettled.Ptr(), RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(), - SourceAddress: address, } } @@ -27,12 +27,12 @@ func createReceiverLinkOptions(address string, linkName string) *amqp.ReceiverOp prop := make(map[string]any) prop["paired"] = true return &amqp.ReceiverOptions{ + TargetAddress: address, DynamicAddress: false, Name: linkName, Properties: prop, RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(), SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(), - TargetAddress: address, ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, Credit: 100, } diff --git a/rabbitmq_amqp/connection.go b/rabbitmq_amqp/connection.go index f496bc6..c669b74 100644 --- a/rabbitmq_amqp/connection.go +++ b/rabbitmq_amqp/connection.go @@ -2,58 +2,10 @@ package rabbitmq_amqp import ( "context" - "crypto/tls" - "fmt" "github.com/Azure/go-amqp" ) -type TSaslMechanism string - -const ( - Plain TSaslMechanism = "plain" - External TSaslMechanism = "external" - Anonymous TSaslMechanism = "anonymous" -) - -type SaslMechanism struct { - Type TSaslMechanism -} - -type ConnectionSettings struct { - Host string - Port int - User string - Password string - VirtualHost string - Scheme string - ContainerId string - UseSsl bool - TlsConfig *tls.Config - SaslMechanism TSaslMechanism -} - -func (c *ConnectionSettings) BuildAddress() string { - return c.Scheme + "://" + c.Host + ":" + fmt.Sprint(c.Port) -} - -// NewConnectionSettings creates a new ConnectionSettings struct with default values. -func NewConnectionSettings() *ConnectionSettings { - return &ConnectionSettings{ - Host: "localhost", - Port: 5672, - User: "guest", - Password: "guest", - VirtualHost: "/", - Scheme: "amqp", - ContainerId: "amqp-go-client", - UseSsl: false, - TlsConfig: nil, - } -} - type IConnection interface { - // Open opens a connection to the AMQP 1.0 server. - Open(ctx context.Context, connectionSettings *ConnectionSettings) error // Close closes the connection to the AMQP 1.0 server. Close(ctx context.Context) error @@ -68,13 +20,13 @@ type IConnection interface { // See LifeCycle struct for more information. Status() int - NewIMPublisher(ctx context.Context) (IMPublisher, error) + // Publisher returns a new IPublisher interface for the connection. + Publisher(ctx context.Context, destinationAddr string, linkName string) (IPublisher, error) } -// IMPublisher is an interface for publishers messages based. +// IPublisher is an interface for publishers messages based. // on the AMQP 1.0 protocol. -// No Target address is specified each message is sent to a specific address. -type IMPublisher interface { - Publish(ctx context.Context, message *amqp.Message, address *AddressBuilder) error +type IPublisher interface { + Publish(ctx context.Context, message *amqp.Message) error Close(ctx context.Context) error } diff --git a/rabbitmq_amqp/uri.go b/rabbitmq_amqp/uri.go new file mode 100644 index 0000000..bcd232e --- /dev/null +++ b/rabbitmq_amqp/uri.go @@ -0,0 +1,117 @@ +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package rabbitmq_amqp + +import ( + "errors" + "net/url" + "strconv" + "strings" +) + +var ( + errURIScheme = errors.New("AMQP scheme must be either 'amqp://' or 'amqps://'") + errURIWhitespace = errors.New("URI must not contain whitespace") +) + +var schemePorts = map[string]int{ + "amqp": 5672, + "amqps": 5671, +} + +var defaultURI = URI{ + Scheme: "amqp", + Host: "localhost", + Port: 5672, + Username: "guest", + Password: "guest", + Vhost: "/", +} + +// URI represents a parsed AMQP URI string. +type URI struct { + Scheme string + Host string + Port int + Username string + Password string + Vhost string +} + +// ParseURI attempts to parse the given AMQP URI according to the spec. +// See http://www.rabbitmq.com/uri-spec.html. +// +// Default values for the fields are: +// +// Scheme: amqp +// Host: localhost +// Port: 5672 +// Username: guest +// Password: guest +// Vhost: / +func ParseURI(uri string) (URI, error) { + builder := defaultURI + + if strings.Contains(uri, " ") { + return builder, errURIWhitespace + } + + u, err := url.Parse(uri) + if err != nil { + return builder, err + } + + defaultPort, okScheme := schemePorts[u.Scheme] + + if okScheme { + builder.Scheme = u.Scheme + } else { + return builder, errURIScheme + } + + host := u.Hostname() + port := u.Port() + + if host != "" { + builder.Host = host + } + + if port != "" { + port32, err := strconv.ParseInt(port, 10, 32) + if err != nil { + return builder, err + } + builder.Port = int(port32) + } else { + builder.Port = defaultPort + } + + if u.User != nil { + builder.Username = u.User.Username() + if password, ok := u.User.Password(); ok { + builder.Password = password + } + } + + if u.Path != "" { + if strings.HasPrefix(u.Path, "/") { + if u.Host == "" && strings.HasPrefix(u.Path, "///") { + // net/url doesn't handle local context authorities and leaves that up + // to the scheme handler. In our case, we translate amqp:/// into the + // default host and whatever the vhost should be + if len(u.Path) > 3 { + builder.Vhost = u.Path[3:] + } + } else if len(u.Path) > 1 { + builder.Vhost = u.Path[1:] + } + } else { + builder.Vhost = u.Path + } + } + + return builder, nil +} diff --git a/rabbitmq_amqp/uri_test.go b/rabbitmq_amqp/uri_test.go new file mode 100644 index 0000000..319056f --- /dev/null +++ b/rabbitmq_amqp/uri_test.go @@ -0,0 +1,260 @@ +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package rabbitmq_amqp + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// Test matrix defined on http://www.rabbitmq.com/uri-spec.html +type testURI struct { + url string + username string + password string + host string + port int + vhost string + canon string +} + +var uriTests = []testURI{ + { + url: "amqp://user:pass@host:10000/vhost", + username: "user", + password: "pass", + host: "host", + port: 10000, + vhost: "vhost", + canon: "amqp://user:pass@host:10000/vhost", + }, + + { + url: "amqp://", + username: defaultURI.Username, + password: defaultURI.Password, + host: defaultURI.Host, + port: defaultURI.Port, + vhost: defaultURI.Vhost, + canon: "amqp://localhost/", + }, + + { + url: "amqp://:@/", + username: "", + password: "", + host: defaultURI.Host, + port: defaultURI.Port, + vhost: defaultURI.Vhost, + canon: "amqp://:@localhost/", + }, + + { + url: "amqp://user@", + username: "user", + password: defaultURI.Password, + host: defaultURI.Host, + port: defaultURI.Port, + vhost: defaultURI.Vhost, + canon: "amqp://user@localhost/", + }, + + { + url: "amqp://user:pass@", + username: "user", + password: "pass", + host: defaultURI.Host, + port: defaultURI.Port, + vhost: defaultURI.Vhost, + canon: "amqp://user:pass@localhost/", + }, + + { + url: "amqp://guest:pass@", + username: "guest", + password: "pass", + host: defaultURI.Host, + port: defaultURI.Port, + vhost: defaultURI.Vhost, + canon: "amqp://guest:pass@localhost/", + }, + + { + url: "amqp://host", + username: defaultURI.Username, + password: defaultURI.Password, + host: "host", + port: defaultURI.Port, + vhost: defaultURI.Vhost, + canon: "amqp://host/", + }, + + { + url: "amqp://:10000", + username: defaultURI.Username, + password: defaultURI.Password, + host: defaultURI.Host, + port: 10000, + vhost: defaultURI.Vhost, + canon: "amqp://localhost:10000/", + }, + + { + url: "amqp:///vhost", + username: defaultURI.Username, + password: defaultURI.Password, + host: defaultURI.Host, + port: defaultURI.Port, + vhost: "vhost", + canon: "amqp://localhost/vhost", + }, + + { + url: "amqp://host/", + username: defaultURI.Username, + password: defaultURI.Password, + host: "host", + port: defaultURI.Port, + vhost: defaultURI.Vhost, + canon: "amqp://host/", + }, + + { + url: "amqp://host/%2F", + username: defaultURI.Username, + password: defaultURI.Password, + host: "host", + port: defaultURI.Port, + vhost: "/", + canon: "amqp://host/", + }, + + { + url: "amqp://host/%2F%2F", + username: defaultURI.Username, + password: defaultURI.Password, + host: "host", + port: defaultURI.Port, + vhost: "//", + canon: "amqp://host/%2F%2F", + }, + + { + url: "amqp://host/%2Fslash%2F", + username: defaultURI.Username, + password: defaultURI.Password, + host: "host", + port: defaultURI.Port, + vhost: "/slash/", + canon: "amqp://host/%2Fslash%2F", + }, + + { + url: "amqp://192.168.1.1:1000/", + username: defaultURI.Username, + password: defaultURI.Password, + host: "192.168.1.1", + port: 1000, + vhost: defaultURI.Vhost, + canon: "amqp://192.168.1.1:1000/", + }, + + { + url: "amqp://[::1]", + username: defaultURI.Username, + password: defaultURI.Password, + host: "::1", + port: defaultURI.Port, + vhost: defaultURI.Vhost, + canon: "amqp://[::1]/", + }, + + { + url: "amqp://[::1]:1000", + username: defaultURI.Username, + password: defaultURI.Password, + host: "::1", + port: 1000, + vhost: defaultURI.Vhost, + canon: "amqp://[::1]:1000/", + }, + + { + url: "amqp://[fe80::1]", + username: defaultURI.Username, + password: defaultURI.Password, + host: "fe80::1", + port: defaultURI.Port, + vhost: defaultURI.Vhost, + canon: "amqp://[fe80::1]/", + }, + + { + url: "amqp://[fe80::1]", + username: defaultURI.Username, + password: defaultURI.Password, + host: "fe80::1", + port: defaultURI.Port, + vhost: defaultURI.Vhost, + canon: "amqp://[fe80::1]/", + }, + + { + url: "amqp://[fe80::1%25en0]", + username: defaultURI.Username, + password: defaultURI.Password, + host: "fe80::1%en0", + port: defaultURI.Port, + vhost: defaultURI.Vhost, + canon: "amqp://[fe80::1%25en0]/", + }, + + { + url: "amqp://[fe80::1]:5671", + username: defaultURI.Username, + password: defaultURI.Password, + host: "fe80::1", + port: 5671, + vhost: defaultURI.Vhost, + canon: "amqp://[fe80::1]:5671/", + }, + + { + url: "amqps:///", + username: defaultURI.Username, + password: defaultURI.Password, + host: defaultURI.Host, + port: schemePorts["amqps"], + vhost: defaultURI.Vhost, + canon: "amqps://localhost/", + }, + + { + url: "amqps://host:1000/", + username: defaultURI.Username, + password: defaultURI.Password, + host: "host", + port: 1000, + vhost: defaultURI.Vhost, + canon: "amqps://host:1000/", + }, +} + +var _ = Describe("Parse Test ", func() { + It("ParseURI", func() { + + for _, test := range uriTests { + uri, err := ParseURI(test.url) + Expect(err).To(BeNil()) + Expect(uri.Username).To(Equal(test.username)) + Expect(uri.Password).To(Equal(test.password)) + Expect(uri.Host).To(Equal(test.host)) + Expect(uri.Port).To(Equal(test.port)) + Expect(uri.Vhost).To(Equal(test.vhost)) + } + }) + +}) From b304d88fb6f9cae827f0a9d9b2ab658fc2edfd24 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 19 Nov 2024 18:06:18 +0100 Subject: [PATCH 3/3] Refactor API Signed-off-by: Gabriele Santomaggio --- examples/getting_started/main.go | 60 ++++++++++++++++++++++++++++---- 1 file changed, 54 insertions(+), 6 deletions(-) diff --git a/examples/getting_started/main.go b/examples/getting_started/main.go index b8bd64e..59dc4ef 100644 --- a/examples/getting_started/main.go +++ b/examples/getting_started/main.go @@ -3,39 +3,51 @@ package main import ( "context" "fmt" + "github.com/Azure/go-amqp" "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp" "time" ) func main() { + exchangeName := "getting-started-exchange" + queueName := "getting-started-queue" + routingKey := "routing-key" + fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n") - chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1) + /// Create a channel to receive status change notifications + chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1) go func(ch chan *rabbitmq_amqp.StatusChanged) { for statusChanged := range ch { fmt.Printf("%s\n", statusChanged) } }(chStatusChanged) + // Open a connection to the AMQP 1.0 server amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), "amqp://", nil) if err != nil { fmt.Printf("Error opening connection: %v\n", err) return } + // Register the channel to receive status change notifications amqpConnection.NotifyStatusChange(chStatusChanged) + fmt.Printf("AMQP Connection opened.\n") + // Create the management interface for the connection + // so we can declare exchanges, queues, and bindings management := amqpConnection.Management() exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.ExchangeSpecification{ - Name: "getting-started-exchange", + Name: exchangeName, }) if err != nil { fmt.Printf("Error declaring exchange: %v\n", err) return } + // Declare a Quorum queue queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QueueSpecification{ - Name: "getting-started-queue", + Name: queueName, QueueType: rabbitmq_amqp.QueueType{Type: rabbitmq_amqp.Quorum}, }) @@ -44,10 +56,11 @@ func main() { return } + // Bind the queue to the exchange bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.BindingSpecification{ - SourceExchange: exchangeInfo.Name(), - DestinationQueue: queueInfo.Name(), - BindingKey: "routing-key", + SourceExchange: exchangeName, + DestinationQueue: queueName, + BindingKey: routingKey, }) if err != nil { @@ -55,6 +68,33 @@ func main() { return } + addr, err := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey) + + publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher") + if err != nil { + fmt.Printf("Error creating publisher: %v\n", err) + return + } + + // Publish a message to the exchange + err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"))) + if err != nil { + fmt.Printf("Error publishing message: %v\n", err) + return + } + + println("press any key to close the connection") + + var input string + _, _ = fmt.Scanln(&input) + + // Close the publisher + err = publisher.Close(context.Background()) + if err != nil { + return + } + // Unbind the queue from the exchange + err = management.Unbind(context.TODO(), bindingPath) if err != nil { @@ -68,6 +108,14 @@ func main() { return } + // Purge the queue + purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name()) + if err != nil { + fmt.Printf("Error purging queue: %v\n", err) + return + } + fmt.Printf("Purged %d messages from the queue.\n", purged) + err = management.DeleteQueue(context.TODO(), queueInfo.Name()) if err != nil { fmt.Printf("Error deleting queue: %v\n", err)