diff --git a/examples/getting_started/main.go b/examples/getting_started/main.go index 59dc4ef..c4fd6fc 100644 --- a/examples/getting_started/main.go +++ b/examples/getting_started/main.go @@ -9,29 +9,28 @@ import ( ) func main() { - exchangeName := "getting-started-exchange" queueName := "getting-started-queue" routingKey := "routing-key" - fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n") + rabbitmq_amqp.Info("Getting started with AMQP Go AMQP 1.0 Client") - /// Create a channel to receive status change notifications - chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1) - go func(ch chan *rabbitmq_amqp.StatusChanged) { + /// Create a channel to receive state change notifications + stateChanged := make(chan *rabbitmq_amqp.StateChanged, 1) + go func(ch chan *rabbitmq_amqp.StateChanged) { for statusChanged := range ch { - fmt.Printf("%s\n", statusChanged) + rabbitmq_amqp.Info("[Connection]", "Status changed", statusChanged) } - }(chStatusChanged) + }(stateChanged) // Open a connection to the AMQP 1.0 server - amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), "amqp://", nil) + amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), []string{"amqp://"}, nil) if err != nil { - fmt.Printf("Error opening connection: %v\n", err) + rabbitmq_amqp.Error("Error opening connection", err) return } // Register the channel to receive status change notifications - amqpConnection.NotifyStatusChange(chStatusChanged) + amqpConnection.NotifyStatusChange(stateChanged) fmt.Printf("AMQP Connection opened.\n") // Create the management interface for the connection @@ -41,7 +40,7 @@ func main() { Name: exchangeName, }) if err != nil { - fmt.Printf("Error declaring exchange: %v\n", err) + rabbitmq_amqp.Error("Error declaring exchange", err) return } @@ -52,7 +51,7 @@ func main() { }) if err != nil { - fmt.Printf("Error declaring queue: %v\n", err) + rabbitmq_amqp.Error("Error declaring queue", err) return } @@ -64,7 +63,7 @@ func main() { }) if err != nil { - fmt.Printf("Error binding: %v\n", err) + rabbitmq_amqp.Error("Error binding", err) return } @@ -72,16 +71,31 @@ func main() { publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher") if err != nil { - fmt.Printf("Error creating publisher: %v\n", err) + rabbitmq_amqp.Error("Error creating publisher", err) return } // Publish a message to the exchange - err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"))) + publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"))) if err != nil { - fmt.Printf("Error publishing message: %v\n", err) + rabbitmq_amqp.Error("Error publishing message", err) return } + switch publishResult.Outcome { + case &amqp.StateAccepted{}: + rabbitmq_amqp.Info("Message accepted") + case &amqp.StateReleased{}: + rabbitmq_amqp.Warn("Message was not routed") + case &amqp.StateRejected{}: + rabbitmq_amqp.Warn("Message rejected") + stateType := publishResult.Outcome.(*amqp.StateRejected) + if stateType.Error != nil { + rabbitmq_amqp.Warn("Message rejected with error: %v", stateType.Error) + } + default: + // these status are not supported + rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome) + } println("press any key to close the connection") @@ -132,5 +146,5 @@ func main() { // Wait for the status change to be printed time.Sleep(500 * time.Millisecond) - close(chStatusChanged) + close(stateChangeds) } diff --git a/go.mod b/go.mod index 3e1d92d..5b13eb5 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/rabbitmq/rabbitmq-amqp-go-client go 1.22.0 require ( - github.com/Azure/go-amqp v1.2.0 + github.com/Azure/go-amqp v1.4.0-beta.1 github.com/google/uuid v1.6.0 github.com/onsi/ginkgo/v2 v2.20.2 github.com/onsi/gomega v1.34.2 diff --git a/go.sum b/go.sum index 83532f3..f5e896e 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ -github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 h1:rL7yrEV9yputQV7T+Y9eJVmTVkK4B0aHlBc8TUITC5A= -github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= -github.com/Azure/go-amqp v1.2.0 h1:NNyfN3/cRszWzMvjmm64yaPZDHX/2DJkowv8Ub9y01I= -github.com/Azure/go-amqp v1.2.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= +github.com/Azure/go-amqp v1.4.0-beta.1 h1:BjZM/308FpfsQjX0gXtYK8Vx+WgQ1eng3oVQDEeXMmA= +github.com/Azure/go-amqp v1.4.0-beta.1/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= diff --git a/rabbitmq_amqp/address_helper.go b/rabbitmq_amqp/address_helper.go index 16c5eda..47382b1 100644 --- a/rabbitmq_amqp/address_helper.go +++ b/rabbitmq_amqp/address_helper.go @@ -3,7 +3,6 @@ package rabbitmq_amqp import ( "errors" "fmt" - "net/url" "strings" ) @@ -77,16 +76,16 @@ func encodePathSegments(input string) string { return encoded.String() } -// Decode takes a percent-encoded string and returns its decoded representation. -func decode(input string) (string, error) { - // Use url.QueryUnescape which properly decodes percent-encoded strings - decoded, err := url.QueryUnescape(input) - if err != nil { - return "", err - } - - return decoded, nil -} +//// Decode takes a percent-encoded string and returns its decoded representation. +//func decode(input string) (string, error) { +// // Use url.QueryUnescape which properly decodes percent-encoded strings +// decoded, err := url.QueryUnescape(input) +// if err != nil { +// return "", err +// } +// +// return decoded, nil +//} // isUnreserved checks if a character is an unreserved character in percent encoding // Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~ @@ -111,5 +110,8 @@ func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName, } format := "/%s/src=%s;%s=%s;key=%s;args=" return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded) +} +func validateAddress(address string) bool { + return strings.HasPrefix(address, fmt.Sprintf("/%s/", exchanges)) || strings.HasPrefix(address, fmt.Sprintf("/%s/", queues)) } diff --git a/rabbitmq_amqp/amqp_binding_test.go b/rabbitmq_amqp/amqp_binding_test.go index 0a1c1ae..5abc234 100644 --- a/rabbitmq_amqp/amqp_binding_test.go +++ b/rabbitmq_amqp/amqp_binding_test.go @@ -7,10 +7,10 @@ import ( ) var _ = Describe("AMQP Bindings test ", func() { - var connection IConnection - var management IManagement + var connection *AmqpConnection + var management *AmqpManagement BeforeEach(func() { - conn, err := Dial(context.TODO(), "amqp://", nil) + conn, err := Dial(context.TODO(), []string{"amqp://"}, nil) Expect(err).To(BeNil()) connection = conn management = connection.Management() diff --git a/rabbitmq_amqp/amqp_connection.go b/rabbitmq_amqp/amqp_connection.go index 23392e9..bc294d9 100644 --- a/rabbitmq_amqp/amqp_connection.go +++ b/rabbitmq_amqp/amqp_connection.go @@ -17,38 +17,50 @@ import ( type AmqpConnection struct { Connection *amqp.Conn - management IManagement + management *AmqpManagement lifeCycle *LifeCycle session *amqp.Session } -func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, linkName string) (IPublisher, error) { - sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName)) +func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, linkName string) (*Publisher, error) { + if !validateAddress(destinationAdd) { + return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues) + } + + sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName, AtLeastOnce)) if err != nil { return nil, err } return newPublisher(sender), nil } -// Management returns the management interface for the connection. -// See IManagement interface. -func (a *AmqpConnection) Management() IManagement { - return a.management -} - -// Dial creates a new AmqpConnection -// with a new AmqpManagement and a new LifeCycle. -// Returns a pointer to the new AmqpConnection -func Dial(ctx context.Context, addr string, connOptions *amqp.ConnOptions) (IConnection, error) { +// Dial connect to the AMQP 1.0 server using the provided connectionSettings +// Returns a pointer to the new AmqpConnection if successful else an error. +// addresses is a list of addresses to connect to. It picks one randomly. +// It is enough that one of the addresses is reachable. +func Dial(ctx context.Context, addresses []string, connOptions *amqp.ConnOptions) (*AmqpConnection, error) { conn := &AmqpConnection{ management: NewAmqpManagement(), lifeCycle: NewLifeCycle(), } - err := conn.open(ctx, addr, connOptions) - if err != nil { - return nil, err + tmp := make([]string, len(addresses)) + copy(tmp, addresses) + + // random pick and extract one address to use for connection + for len(tmp) > 0 { + idx := random(len(tmp)) + addr := tmp[idx] + // remove the index from the tmp list + tmp = append(tmp[:idx], tmp[idx+1:]...) + err := conn.open(ctx, addr, connOptions) + if err != nil { + Error("Failed to open connection", ExtractWithoutPassword(addr), err) + continue + } + Debug("Connected to", ExtractWithoutPassword(addr)) + return conn, nil } - return conn, nil + return nil, fmt.Errorf("no address to connect to") } // Open opens a connection to the AMQP 1.0 server. @@ -84,30 +96,42 @@ func (a *AmqpConnection) open(ctx context.Context, addr string, connOptions *amq if err != nil { return err } - err = a.Management().Open(ctx, a) + err = a.management.Open(ctx, a) if err != nil { // TODO close connection? return err } - a.lifeCycle.SetStatus(Open) + a.lifeCycle.SetState(&StateOpen{}) return nil } func (a *AmqpConnection) Close(ctx context.Context) error { - err := a.Management().Close(ctx) + err := a.management.Close(ctx) if err != nil { return err } err = a.Connection.Close() - a.lifeCycle.SetStatus(Closed) + a.lifeCycle.SetState(&StateClosed{}) return err } -func (a *AmqpConnection) NotifyStatusChange(channel chan *StatusChanged) { +// NotifyStatusChange registers a channel to receive getState change notifications +// from the connection. +func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged) { a.lifeCycle.chStatusChanged = channel } -func (a *AmqpConnection) Status() int { - return a.lifeCycle.Status() +func (a *AmqpConnection) State() LifeCycleState { + return a.lifeCycle.State() +} + +// *** management section *** + +// Management returns the management interface for the connection. +// The management interface is used to declare and delete exchanges, queues, and bindings. +func (a *AmqpConnection) Management() *AmqpManagement { + return a.management } + +//*** end management section *** diff --git a/rabbitmq_amqp/amqp_connection_test.go b/rabbitmq_amqp/amqp_connection_test.go index 4eb7196..fc24327 100644 --- a/rabbitmq_amqp/amqp_connection_test.go +++ b/rabbitmq_amqp/amqp_connection_test.go @@ -11,7 +11,7 @@ import ( var _ = Describe("AMQP Connection Test", func() { It("AMQP SASLTypeAnonymous Connection should succeed", func() { - connection, err := Dial(context.Background(), "amqp://", &amqp.ConnOptions{ + connection, err := Dial(context.Background(), []string{"amqp://"}, &amqp.ConnOptions{ SASLType: amqp.SASLTypeAnonymous()}) Expect(err).To(BeNil()) err = connection.Close(context.Background()) @@ -20,7 +20,7 @@ var _ = Describe("AMQP Connection Test", func() { It("AMQP SASLTypePlain Connection should succeed", func() { - connection, err := Dial(context.Background(), "amqp://", &amqp.ConnOptions{ + connection, err := Dial(context.Background(), []string{"amqp://"}, &amqp.ConnOptions{ SASLType: amqp.SASLTypePlain("guest", "guest")}) Expect(err).To(BeNil()) @@ -28,26 +28,37 @@ var _ = Describe("AMQP Connection Test", func() { Expect(err).To(BeNil()) }) + It("AMQP Connection connect to the one correct uri and fails the others", func() { + conn, err := Dial(context.Background(), []string{"amqp://localhost:1234", "amqp://nohost:555", "amqp://"}, nil) + Expect(err).To(BeNil()) + Expect(conn.Close(context.Background())) + }) + It("AMQP Connection should fail due of wrong Port", func() { - _, err := Dial(context.Background(), "amqp://localhost:1234", nil) + _, err := Dial(context.Background(), []string{"amqp://localhost:1234"}, nil) Expect(err).NotTo(BeNil()) }) It("AMQP Connection should fail due of wrong Host", func() { - _, err := Dial(context.Background(), "amqp://wrong_host:5672", nil) + _, err := Dial(context.Background(), []string{"amqp://wrong_host:5672"}, nil) + Expect(err).NotTo(BeNil()) + }) + + It("AMQP Connection should fails with all the wrong uris", func() { + _, err := Dial(context.Background(), []string{"amqp://localhost:1234", "amqp://nohost:555", "amqp://nono"}, nil) Expect(err).NotTo(BeNil()) }) It("AMQP Connection should fail due to context cancellation", func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) cancel() - _, err := Dial(ctx, "amqp://", nil) + _, err := Dial(ctx, []string{"amqp://"}, nil) Expect(err).NotTo(BeNil()) }) It("AMQP Connection should receive events", func() { - ch := make(chan *StatusChanged, 1) - connection, err := Dial(context.Background(), "amqp://", nil) + ch := make(chan *StateChanged, 1) + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) Expect(err).To(BeNil()) connection.NotifyStatusChange(ch) err = connection.Close(context.Background()) @@ -55,8 +66,8 @@ var _ = Describe("AMQP Connection Test", func() { recv := <-ch Expect(recv).NotTo(BeNil()) - Expect(recv.From).To(Equal(Open)) - Expect(recv.To).To(Equal(Closed)) + Expect(recv.From).To(Equal(&StateOpen{})) + Expect(recv.To).To(Equal(&StateClosed{})) }) //It("AMQP TLS Connection should success with SASLTypeAnonymous ", func() { diff --git a/rabbitmq_amqp/amqp_exchange.go b/rabbitmq_amqp/amqp_exchange.go index 62a54b1..b0e0299 100644 --- a/rabbitmq_amqp/amqp_exchange.go +++ b/rabbitmq_amqp/amqp_exchange.go @@ -9,7 +9,7 @@ type AmqpExchangeInfo struct { name string } -func newAmqpExchangeInfo(name string) IExchangeInfo { +func newAmqpExchangeInfo(name string) *AmqpExchangeInfo { return &AmqpExchangeInfo{name: name} } @@ -33,7 +33,7 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange { } } -func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) { +func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error) { path, err := ExchangeAddress(&e.name, nil) if err != nil { return nil, err diff --git a/rabbitmq_amqp/amqp_exchange_test.go b/rabbitmq_amqp/amqp_exchange_test.go index 3248995..6867273 100644 --- a/rabbitmq_amqp/amqp_exchange_test.go +++ b/rabbitmq_amqp/amqp_exchange_test.go @@ -8,10 +8,10 @@ import ( ) var _ = Describe("AMQP Exchange test ", func() { - var connection IConnection - var management IManagement + var connection *AmqpConnection + var management *AmqpManagement BeforeEach(func() { - conn, err := Dial(context.TODO(), "amqp://", nil) + conn, err := Dial(context.TODO(), []string{"amqp://"}, nil) connection = conn Expect(err).To(BeNil()) management = connection.Management() diff --git a/rabbitmq_amqp/amqp_management.go b/rabbitmq_amqp/amqp_management.go index c9d71c2..c806488 100644 --- a/rabbitmq_amqp/amqp_management.go +++ b/rabbitmq_amqp/amqp_management.go @@ -43,7 +43,7 @@ func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error { func (a *AmqpManagement) ensureSenderLink(ctx context.Context) error { if a.sender == nil { sender, err := a.session.NewSender(ctx, managementNodeAddress, - createSenderLinkOptions(managementNodeAddress, linkPairName)) + createSenderLinkOptions(managementNodeAddress, linkPairName, AtMostOnce)) if err != nil { return err } @@ -54,8 +54,8 @@ func (a *AmqpManagement) ensureSenderLink(ctx context.Context) error { return nil } -func (a *AmqpManagement) Open(ctx context.Context, connection IConnection) error { - session, err := connection.(*AmqpConnection).Connection.NewSession(ctx, nil) +func (a *AmqpManagement) Open(ctx context.Context, connection *AmqpConnection) error { + session, err := connection.Connection.NewSession(ctx, nil) if err != nil { return err } @@ -77,7 +77,7 @@ func (a *AmqpManagement) Open(ctx context.Context, connection IConnection) error // some channels or I/O or something elsewhere time.Sleep(time.Millisecond * 10) - a.lifeCycle.SetStatus(Open) + a.lifeCycle.SetState(&StateOpen{}) return ctx.Err() } @@ -85,7 +85,7 @@ func (a *AmqpManagement) Close(ctx context.Context) error { _ = a.sender.Close(ctx) _ = a.receiver.Close(ctx) err := a.session.Close(ctx) - a.lifeCycle.SetStatus(Closed) + a.lifeCycle.SetState(&StateClosed{}) return err } @@ -170,7 +170,7 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path return make(map[string]any), nil } -func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification *QueueSpecification) (IQueueInfo, error) { +func (a *AmqpManagement) DeclareQueue(ctx context.Context, specification *QueueSpecification) (*AmqpQueueInfo, error) { var amqpQueue *AmqpQueue if specification == nil || len(specification.Name) <= 0 { @@ -195,7 +195,7 @@ func (a *AmqpManagement) DeleteQueue(ctx context.Context, name string) error { return q.Delete(ctx) } -func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification *ExchangeSpecification) (IExchangeInfo, error) { +func (a *AmqpManagement) DeclareExchange(ctx context.Context, exchangeSpecification *ExchangeSpecification) (*AmqpExchangeInfo, error) { if exchangeSpecification == nil { return nil, fmt.Errorf("exchangeSpecification is nil") } @@ -224,7 +224,7 @@ func (a *AmqpManagement) Unbind(ctx context.Context, bindingPath string) error { bind := newAMQPBinding(a) return bind.Unbind(ctx, bindingPath) } -func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) { +func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (*AmqpQueueInfo, error) { path, err := QueueAddress(&queueName) if err != nil { return nil, err @@ -241,10 +241,10 @@ func (a *AmqpManagement) PurgeQueue(ctx context.Context, queueName string) (int, return purge.Purge(ctx) } -func (a *AmqpManagement) NotifyStatusChange(channel chan *StatusChanged) { +func (a *AmqpManagement) NotifyStatusChange(channel chan *StateChanged) { a.lifeCycle.chStatusChanged = channel } -func (a *AmqpManagement) Status() int { - return a.lifeCycle.Status() +func (a *AmqpManagement) State() LifeCycleState { + return a.lifeCycle.State() } diff --git a/rabbitmq_amqp/amqp_management_test.go b/rabbitmq_amqp/amqp_management_test.go index 3df3f9e..a00c430 100644 --- a/rabbitmq_amqp/amqp_management_test.go +++ b/rabbitmq_amqp/amqp_management_test.go @@ -11,7 +11,7 @@ import ( var _ = Describe("Management tests", func() { It("AMQP Management should fail due to context cancellation", func() { - connection, err := Dial(context.Background(), "amqp://", nil) + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) Expect(err).To(BeNil()) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) @@ -22,8 +22,8 @@ var _ = Describe("Management tests", func() { }) It("AMQP Management should receive events", func() { - ch := make(chan *StatusChanged, 1) - connection, err := Dial(context.Background(), "amqp://", nil) + ch := make(chan *StateChanged, 1) + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) Expect(err).To(BeNil()) connection.NotifyStatusChange(ch) err = connection.Close(context.Background()) @@ -31,14 +31,14 @@ var _ = Describe("Management tests", func() { recv := <-ch Expect(recv).NotTo(BeNil()) - Expect(recv.From).To(Equal(Open)) - Expect(recv.To).To(Equal(Closed)) + Expect(recv.From).To(Equal(&StateOpen{})) + Expect(recv.To).To(Equal(&StateClosed{})) Expect(connection.Close(context.Background())).To(BeNil()) }) It("Request", func() { - connection, err := Dial(context.Background(), "amqp://", nil) + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) Expect(err).To(BeNil()) management := connection.Management() @@ -58,7 +58,7 @@ var _ = Describe("Management tests", func() { It("GET on non-existing queue returns ErrDoesNotExist", func() { - connection, err := Dial(context.Background(), "amqp://", nil) + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) Expect(err).To(BeNil()) management := connection.Management() diff --git a/rabbitmq_amqp/amqp_publisher.go b/rabbitmq_amqp/amqp_publisher.go index c610131..fde42fe 100644 --- a/rabbitmq_amqp/amqp_publisher.go +++ b/rabbitmq_amqp/amqp_publisher.go @@ -5,6 +5,11 @@ import ( "github.com/Azure/go-amqp" ) +type PublishResult struct { + Outcome amqp.DeliveryState + Message *amqp.Message +} + type Publisher struct { sender *amqp.Sender } @@ -13,26 +18,33 @@ func newPublisher(sender *amqp.Sender) *Publisher { return &Publisher{sender: sender} } -func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) error { - - /// for the outcome of the message delivery, see https://github.com/Azure/go-amqp/issues/347 - //RELEASED - ///** - // * The broker could not route the message to any queue. - // * - // *

This is likely to be due to a topology misconfiguration. - // */ - // so at the moment we don't have access on this information - // TODO: remove this comment when the issue is resolved - - err := m.sender.Send(ctx, message, nil) - +// Publish sends a message to the destination address. +// The message is sent to the destination address and the outcome of the operation is returned. +// The outcome is a DeliveryState that indicates if the message was accepted or rejected. +// RabbitMQ supports the following DeliveryState types: +// - StateAccepted +// - StateReleased +// - StateRejected +// See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information. +func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*PublishResult, error) { + + r, err := m.sender.SendWithReceipt(ctx, message, nil) if err != nil { - return err + return nil, err + } + state, err := r.Wait(ctx) + if err != nil { + return nil, err + } + + publishResult := &PublishResult{ + Message: message, + Outcome: state, } - return nil + return publishResult, err } +// Close closes the publisher. func (m *Publisher) Close(ctx context.Context) error { return m.sender.Close(ctx) } diff --git a/rabbitmq_amqp/amqp_publisher_test.go b/rabbitmq_amqp/amqp_publisher_test.go index 173cebd..bfd2285 100644 --- a/rabbitmq_amqp/amqp_publisher_test.go +++ b/rabbitmq_amqp/amqp_publisher_test.go @@ -10,7 +10,7 @@ import ( var _ = Describe("AMQP publisher ", func() { It("Send a message to a queue with a Message Target Publisher", func() { qName := generateNameWithDateTime("Send a message to a queue with a Message Target Publisher") - connection, err := Dial(context.Background(), "amqp://", nil) + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) Expect(err).To(BeNil()) Expect(connection).NotTo(BeNil()) queueInfo, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{ @@ -24,13 +24,90 @@ var _ = Describe("AMQP publisher ", func() { Expect(publisher).NotTo(BeNil()) Expect(publisher).To(BeAssignableToTypeOf(&Publisher{})) - err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello"))) - + publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello"))) Expect(err).To(BeNil()) + Expect(publishResult).NotTo(BeNil()) + Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{})) + nMessages, err := connection.Management().PurgeQueue(context.Background(), qName) Expect(err).To(BeNil()) Expect(nMessages).To(Equal(1)) - Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) Expect(publisher.Close(context.Background())).To(BeNil()) + Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + }) + + It("Publisher should fail to a not existing exchange", func() { + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + Expect(err).To(BeNil()) + Expect(connection).NotTo(BeNil()) + exchangeName := "Nope" + addr, err := ExchangeAddress(&exchangeName, nil) + Expect(err).To(BeNil()) + publisher, err := connection.Publisher(context.Background(), addr, "test") + Expect(err).NotTo(BeNil()) + Expect(publisher).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("Publisher should fail if the destination address does not start in the correct way", func() { + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + Expect(err).To(BeNil()) + Expect(connection).NotTo(BeNil()) + destinationAddress := "this is not valid since does not start with exchanges or queues" + Expect(err).To(BeNil()) + publisher, err := connection.Publisher(context.Background(), destinationAddress, "test") + Expect(err).NotTo(BeNil()) + Expect(publisher).To(BeNil()) + Expect(err.Error()).To(ContainSubstring("invalid destination address")) + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("publishResult should released to a not existing routing key", func() { + eName := generateNameWithDateTime("publishResult should released to a not existing routing key") + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + Expect(err).To(BeNil()) + Expect(connection).NotTo(BeNil()) + exchange, err := connection.Management().DeclareExchange(context.Background(), &ExchangeSpecification{ + Name: eName, + IsAutoDelete: false, + ExchangeType: ExchangeType{Type: Topic}, + }) + Expect(err).To(BeNil()) + Expect(exchange).NotTo(BeNil()) + routingKeyNope := "I don't exist" + addr, err := ExchangeAddress(&eName, &routingKeyNope) + Expect(err).To(BeNil()) + publisher, err := connection.Publisher(context.Background(), addr, "test") + Expect(err).To(BeNil()) + Expect(publisher).NotTo(BeNil()) + publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello"))) + Expect(err).To(BeNil()) + Expect(publishResult).NotTo(BeNil()) + Expect(publishResult.Outcome).To(Equal(&amqp.StateReleased{})) + Expect(connection.Management().DeleteExchange(context.Background(), eName)).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("Send a message to a deleted queue should fail", func() { + qName := generateNameWithDateTime("Send a message to a deleted queue should fail") + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + Expect(err).To(BeNil()) + Expect(connection).NotTo(BeNil()) + _, err = connection.Management().DeclareQueue(context.Background(), &QueueSpecification{ + Name: qName, + }) + Expect(err).To(BeNil()) + dest, _ := QueueAddress(&qName) + publisher, err := connection.Publisher(context.Background(), dest, "test") + Expect(err).To(BeNil()) + Expect(publisher).NotTo(BeNil()) + publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello"))) + Expect(err).To(BeNil()) + Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{})) + err = connection.management.DeleteQueue(context.Background(), qName) + Expect(err).To(BeNil()) + publishResult, err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello"))) + Expect(err).NotTo(BeNil()) + Expect(connection.Close(context.Background())) }) }) diff --git a/rabbitmq_amqp/amqp_queue.go b/rabbitmq_amqp/amqp_queue.go index 525221d..16441c3 100644 --- a/rabbitmq_amqp/amqp_queue.go +++ b/rabbitmq_amqp/amqp_queue.go @@ -25,7 +25,7 @@ func (a *AmqpQueueInfo) Members() []string { return a.members } -func newAmqpQueueInfo(response map[string]any) IQueueInfo { +func newAmqpQueueInfo(response map[string]any) *AmqpQueueInfo { return &AmqpQueueInfo{ name: response["name"].(string), isDurable: response["durable"].(bool), @@ -133,7 +133,7 @@ func (a *AmqpQueue) validate() error { return nil } -func (a *AmqpQueue) Declare(ctx context.Context) (IQueueInfo, error) { +func (a *AmqpQueue) Declare(ctx context.Context) (*AmqpQueueInfo, error) { if Quorum == a.GetQueueType() || Stream == a.GetQueueType() { // mandatory arguments for quorum queues and streams diff --git a/rabbitmq_amqp/amqp_queue_test.go b/rabbitmq_amqp/amqp_queue_test.go index df8ecd6..305fc00 100644 --- a/rabbitmq_amqp/amqp_queue_test.go +++ b/rabbitmq_amqp/amqp_queue_test.go @@ -10,10 +10,10 @@ import ( ) var _ = Describe("AMQP Queue test ", func() { - var connection IConnection - var management IManagement + var connection *AmqpConnection + var management *AmqpManagement BeforeEach(func() { - conn, err := Dial(context.TODO(), "amqp://", nil) + conn, err := Dial(context.TODO(), []string{"amqp://"}, nil) Expect(err).To(BeNil()) connection = conn management = connection.Management() @@ -148,7 +148,7 @@ var _ = Describe("AMQP Queue test ", func() { It("AMQP Declare Queue should fail with Precondition fail", func() { // The first queue is declared as Classic, and it should succeed // The second queue is declared as Quorum, and it should fail since it is already declared as Classic - const queueName = "AMQP Declare Queue should fail with Precondition fail" + queueName := generateName("AMQP Declare Queue should fail with Precondition fail") _, err := management.DeclareQueue(context.TODO(), &QueueSpecification{ Name: queueName, @@ -168,7 +168,7 @@ var _ = Describe("AMQP Queue test ", func() { }) It("AMQP Declare Queue should fail during validation", func() { - const queueName = "AMQP Declare Queue should fail during validation" + queueName := generateName("AMQP Declare Queue should fail during validation") _, err := management.DeclareQueue(context.TODO(), &QueueSpecification{ Name: queueName, MaxLengthBytes: -1, @@ -188,7 +188,7 @@ var _ = Describe("AMQP Queue test ", func() { }) It("AMQP Purge Queue should succeed and return the number of messages purged", func() { - const queueName = "AMQP Purge Queue should succeed and return the number of messages purged" + queueName := generateName("AMQP Purge Queue should succeed and return the number of messages purged") queueInfo, err := management.DeclareQueue(context.TODO(), &QueueSpecification{ Name: queueName, }) @@ -197,6 +197,8 @@ var _ = Describe("AMQP Queue test ", func() { purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name()) Expect(err).To(BeNil()) Expect(purged).To(Equal(10)) + err = management.DeleteQueue(context.TODO(), queueName) + Expect(err).To(BeNil()) }) It("AMQP GET on non-existing queue should return ErrDoesNotExist", func() { @@ -207,33 +209,24 @@ var _ = Describe("AMQP Queue test ", func() { }) }) -// TODO: This should be replaced with this library's publish function -// but for the time being, we need a way to publish messages or test purposes func publishMessages(queueName string, count int) { - conn, err := amqp.Dial(context.TODO(), "amqp://guest:guest@localhost", nil) - if err != nil { - Fail(err.Error()) - } - session, err := conn.NewSession(context.TODO(), nil) - if err != nil { - Fail(err.Error()) - } + conn, err := Dial(context.TODO(), []string{"amqp://guest:guest@localhost"}, nil) + Expect(err).To(BeNil()) address, err := QueueAddress(&queueName) - if err != nil { - Fail(err.Error()) - } + Expect(err).To(BeNil()) - sender, err := session.NewSender(context.TODO(), address, nil) - if err != nil { - Fail(err.Error()) - } + publisher, err := conn.Publisher(context.TODO(), address, "test") + Expect(err).To(BeNil()) + Expect(publisher).NotTo(BeNil()) for i := 0; i < count; i++ { - err = sender.Send(context.TODO(), amqp.NewMessage([]byte("Message #"+strconv.Itoa(i))), nil) - if err != nil { - Fail(err.Error()) - } + publishResult, err := publisher.Publish(context.TODO(), amqp.NewMessage([]byte("Message #"+strconv.Itoa(i)))) + Expect(err).To(BeNil()) + Expect(publishResult).NotTo(BeNil()) + Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{})) } + err = conn.Close(context.TODO()) + Expect(err).To(BeNil()) } diff --git a/rabbitmq_amqp/amqp_utils.go b/rabbitmq_amqp/amqp_utils.go index 157e545..5427be3 100644 --- a/rabbitmq_amqp/amqp_utils.go +++ b/rabbitmq_amqp/amqp_utils.go @@ -1,13 +1,29 @@ package rabbitmq_amqp -import "github.com/Azure/go-amqp" +import ( + "github.com/Azure/go-amqp" + "math/rand" + "time" +) + +const AtMostOnce = 0 +const AtLeastOnce = 1 // senderLinkOptions returns the options for a sender link // with the given address and link name. // That should be the same for all the links. -func createSenderLinkOptions(address string, linkName string) *amqp.SenderOptions { +func createSenderLinkOptions(address string, linkName string, deliveryMode int) *amqp.SenderOptions { prop := make(map[string]any) prop["paired"] = true + sndSettleMode := amqp.SenderSettleModeSettled.Ptr() + /// SndSettleMode = deliveryMode == DeliveryMode.AtMostOnce + // ? SenderSettleMode.Settled + // : SenderSettleMode.Unsettled, + + if deliveryMode == AtLeastOnce { + sndSettleMode = amqp.SenderSettleModeUnsettled.Ptr() + } + return &amqp.SenderOptions{ SourceAddress: address, DynamicAddress: false, @@ -15,7 +31,7 @@ func createSenderLinkOptions(address string, linkName string) *amqp.SenderOption ExpiryTimeout: 0, Name: linkName, Properties: prop, - SettlementMode: amqp.SenderSettleModeSettled.Ptr(), + SettlementMode: sndSettleMode, RequestedReceiverSettleMode: amqp.ReceiverSettleModeFirst.Ptr(), } } @@ -37,3 +53,8 @@ func createReceiverLinkOptions(address string, linkName string) *amqp.ReceiverOp Credit: 100, } } + +func random(max int) int { + r := rand.New(rand.NewSource(time.Now().Unix())) + return r.Intn(max) +} diff --git a/rabbitmq_amqp/connection.go b/rabbitmq_amqp/connection.go deleted file mode 100644 index c669b74..0000000 --- a/rabbitmq_amqp/connection.go +++ /dev/null @@ -1,32 +0,0 @@ -package rabbitmq_amqp - -import ( - "context" - "github.com/Azure/go-amqp" -) - -type IConnection interface { - - // Close closes the connection to the AMQP 1.0 server. - Close(ctx context.Context) error - - // Management returns the management interface for the connection. - Management() IManagement - - // NotifyStatusChange registers a channel to receive status change notifications. - // The channel will receive a StatusChanged struct whenever the status of the connection changes. - NotifyStatusChange(channel chan *StatusChanged) - // Status returns the current status of the connection. - // See LifeCycle struct for more information. - Status() int - - // Publisher returns a new IPublisher interface for the connection. - Publisher(ctx context.Context, destinationAddr string, linkName string) (IPublisher, error) -} - -// IPublisher is an interface for publishers messages based. -// on the AMQP 1.0 protocol. -type IPublisher interface { - Publish(ctx context.Context, message *amqp.Message) error - Close(ctx context.Context) error -} diff --git a/rabbitmq_amqp/entities.go b/rabbitmq_amqp/entities.go index af40da3..2a9d72d 100644 --- a/rabbitmq_amqp/entities.go +++ b/rabbitmq_amqp/entities.go @@ -27,20 +27,6 @@ type QueueSpecification struct { DeadLetterRoutingKey string } -// IQueueInfo represents the information of a queue -// It is returned by the Declare method of IQueueSpecification -// The information come from the server -type IQueueInfo interface { - Name() string - IsDurable() bool - IsAutoDelete() bool - IsExclusive() bool - Type() TQueueType - Leader() string - Members() []string - Arguments() map[string]any -} - type TExchangeType string const ( @@ -57,13 +43,6 @@ func (e ExchangeType) String() string { return string(e.Type) } -// IExchangeInfo represents the information of an exchange -// It is empty at the moment because the server does not return any information -// We leave it here for future use. In case the server returns information about an exchange -type IExchangeInfo interface { - Name() string -} - type ExchangeSpecification struct { Name string IsAutoDelete bool diff --git a/rabbitmq_amqp/life_cycle.go b/rabbitmq_amqp/life_cycle.go index e7b5021..6de8c51 100644 --- a/rabbitmq_amqp/life_cycle.go +++ b/rabbitmq_amqp/life_cycle.go @@ -5,70 +5,102 @@ import ( "sync" ) +type LifeCycleState interface { + getState() int +} + +type StateOpen struct { +} + +func (o *StateOpen) getState() int { + return open +} + +type StateReconnecting struct { +} + +func (r *StateReconnecting) getState() int { + return reconnecting +} + +type StateClosing struct { +} + +func (c *StateClosing) getState() int { + return closing +} + +type StateClosed struct { +} + +func (c *StateClosed) getState() int { + return closed +} + const ( - Open = iota - Reconnecting = iota - Closing = iota - Closed = iota + open = iota + reconnecting = iota + closing = iota + closed = iota ) -func statusToString(status int) string { - switch status { - case Open: - return "Open" - case Reconnecting: - return "Reconnecting" - case Closing: - return "Closing" - case Closed: - return "Closed" +func statusToString(status LifeCycleState) string { + switch status.getState() { + case open: + return "open" + case reconnecting: + return "reconnecting" + case closing: + return "closing" + case closed: + return "closed" } - return "Unknown" + return "unknown" } -type StatusChanged struct { - From int - To int +type StateChanged struct { + From LifeCycleState + To LifeCycleState } -func (s StatusChanged) String() string { +func (s StateChanged) String() string { return fmt.Sprintf("From: %s, To: %s", statusToString(s.From), statusToString(s.To)) } type LifeCycle struct { - status int - chStatusChanged chan *StatusChanged + state LifeCycleState + chStatusChanged chan *StateChanged mutex *sync.Mutex } func NewLifeCycle() *LifeCycle { return &LifeCycle{ - status: Closed, - mutex: &sync.Mutex{}, + state: &StateClosed{}, + mutex: &sync.Mutex{}, } } -func (l *LifeCycle) Status() int { +func (l *LifeCycle) State() LifeCycleState { l.mutex.Lock() defer l.mutex.Unlock() - return l.status + return l.state } -func (l *LifeCycle) SetStatus(value int) { +func (l *LifeCycle) SetState(value LifeCycleState) { l.mutex.Lock() defer l.mutex.Unlock() - if l.status == value { + if l.state == value { return } - oldState := l.status - l.status = value + oldState := l.state + l.state = value if l.chStatusChanged == nil { return } - l.chStatusChanged <- &StatusChanged{ + l.chStatusChanged <- &StateChanged{ From: oldState, To: value, } diff --git a/rabbitmq_amqp/log.go b/rabbitmq_amqp/log.go new file mode 100644 index 0000000..5f992e4 --- /dev/null +++ b/rabbitmq_amqp/log.go @@ -0,0 +1,19 @@ +package rabbitmq_amqp + +import "log/slog" + +func Info(msg string, args ...any) { + slog.Info(msg, args...) +} + +func Debug(msg string, args ...any) { + slog.Debug(msg, args...) +} + +func Error(msg string, args ...any) { + slog.Error(msg, args...) +} + +func Warn(msg string, args ...any) { + slog.Warn(msg, args...) +} diff --git a/rabbitmq_amqp/management.go b/rabbitmq_amqp/management.go deleted file mode 100644 index fc8b2be..0000000 --- a/rabbitmq_amqp/management.go +++ /dev/null @@ -1,44 +0,0 @@ -package rabbitmq_amqp - -import ( - "context" -) - -type IManagement interface { - // Open setups the sender and receiver links to the management interface. - Open(ctx context.Context, connection IConnection) error - // Close closes the sender and receiver links to the management interface. - Close(ctx context.Context) error - - // DeclareQueue creates a queue with the specified specification. - DeclareQueue(ctx context.Context, specification *QueueSpecification) (IQueueInfo, error) - // DeleteQueue deletes the queue with the specified name. - DeleteQueue(ctx context.Context, name string) error - // DeclareExchange creates an exchange with the specified specification. - DeclareExchange(ctx context.Context, exchangeSpecification *ExchangeSpecification) (IExchangeInfo, error) - // DeleteExchange deletes the exchange with the specified name. - DeleteExchange(ctx context.Context, name string) error - //Bind creates a binding between an exchange and a queue or exchange - Bind(ctx context.Context, bindingSpecification *BindingSpecification) (string, error) - // Unbind removes a binding between an exchange and a queue or exchange given the binding path. - Unbind(ctx context.Context, bindingPath string) error - // PurgeQueue removes all messages from the queue. Returns the number of messages purged. - PurgeQueue(ctx context.Context, queueName string) (int, error) - // QueueInfo returns information about the queue with the specified name. - QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) - - // Status returns the current status of the management interface. - // See LifeCycle struct for more information. - Status() int - - // NotifyStatusChange registers a channel to receive status change notifications. - // The channel will receive a StatusChanged struct whenever the status of the management interface changes. - NotifyStatusChange(channel chan *StatusChanged) - - //Request sends a request to the management interface with the specified body, path, and method. - //Returns the response body as a map[string]any. - //It usually is not necessary to call this method directly. Leave it public for custom use cases. - // The calls above are the recommended way to interact with the management interface. - Request(ctx context.Context, body any, path string, method string, - expectedResponseCodes []int) (map[string]any, error) -} diff --git a/rabbitmq_amqp/uri.go b/rabbitmq_amqp/uri.go index bcd232e..911ac3b 100644 --- a/rabbitmq_amqp/uri.go +++ b/rabbitmq_amqp/uri.go @@ -115,3 +115,13 @@ func ParseURI(uri string) (URI, error) { return builder, nil } + +// Extract the Uri by omitting the password + +func ExtractWithoutPassword(addr string) string { + u, err := ParseURI(addr) + if err != nil { + return "" + } + return u.Scheme + "://" + u.Username + "@*****" + u.Host + ":" + strconv.Itoa(u.Port) + u.Vhost +}