diff --git a/Makefile b/Makefile index 130afc8..2fec76d 100644 --- a/Makefile +++ b/Makefile @@ -4,10 +4,10 @@ format: go fmt ./... vet: - go vet ./pkg/rabbitmq_amqp + go vet ./pkg/rabbitmqamqp test: format vet - cd ./pkg/rabbitmq_amqp && go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo \ + cd ./pkg/rabbitmqamqp && go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo \ --randomize-all --randomize-suites \ --cover --coverprofile=coverage.txt --covermode=atomic \ --race diff --git a/README.md b/README.md index 5c512f0..e842951 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# RabbitMQ AMQP 1.0 .Golang Client +# RabbitMQ AMQP 1.0 Golang Client This library is meant to be used with RabbitMQ 4.0. Suitable for testing in pre-production environments. @@ -6,9 +6,19 @@ Suitable for testing in pre-production environments. ## Getting Started -- [Getting_started](docs/examples/getting_started) +- [Getting Started](docs/examples/getting_started) - [Examples](docs/examples) + +# Packages + +The rabbitmq amqp client is a wrapper around the azure amqp client. +You need the following packages to use the rabbitmq amqp client: + +- `rabbitmqamqp` - The main package for the rabbitmq amqp client. +- `amqp` - The azure amqp client (You may not need to use this package directly). + + ## Build from source - Start the broker with `./.ci/ubuntu/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker. diff --git a/docs/examples/getting_started/main.go b/docs/examples/getting_started/main.go index 1daf1ec..4c91d96 100644 --- a/docs/examples/getting_started/main.go +++ b/docs/examples/getting_started/main.go @@ -4,8 +4,7 @@ import ( "context" "errors" "fmt" - "github.com/Azure/go-amqp" - "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" "time" ) @@ -14,62 +13,62 @@ func main() { queueName := "getting-started-go-queue" routingKey := "routing-key" - rabbitmq_amqp.Info("Getting started with AMQP Go AMQP 1.0 Client") + rmq.Info("Getting started with AMQP Go AMQP 1.0 Client") /// Create a channel to receive state change notifications - stateChanged := make(chan *rabbitmq_amqp.StateChanged, 1) - go func(ch chan *rabbitmq_amqp.StateChanged) { + stateChanged := make(chan *rmq.StateChanged, 1) + go func(ch chan *rmq.StateChanged) { for statusChanged := range ch { - rabbitmq_amqp.Info("[connection]", "Status changed", statusChanged) + rmq.Info("[connection]", "Status changed", statusChanged) } }(stateChanged) - // rabbitmq_amqp.NewEnvironment setups the environment. + // rmq.NewEnvironment setups the environment. // The environment is used to create connections // given the same parameters - env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil) + env := rmq.NewEnvironment([]string{"amqp://"}, nil) // Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0) amqpConnection, err := env.NewConnection(context.Background()) if err != nil { - rabbitmq_amqp.Error("Error opening connection", err) + rmq.Error("Error opening connection", err) return } // Register the channel to receive status change notifications // this is valid for the connection lifecycle amqpConnection.NotifyStatusChange(stateChanged) - rabbitmq_amqp.Info("AMQP connection opened.\n") + rmq.Info("AMQP connection opened.\n") // Create the management interface for the connection // so we can declare exchanges, queues, and bindings management := amqpConnection.Management() - exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.TopicExchangeSpecification{ + exchangeInfo, err := management.DeclareExchange(context.TODO(), &rmq.TopicExchangeSpecification{ Name: exchangeName, }) if err != nil { - rabbitmq_amqp.Error("Error declaring exchange", err) + rmq.Error("Error declaring exchange", err) return } // Declare a Quorum queue - queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{ + queueInfo, err := management.DeclareQueue(context.TODO(), &rmq.QuorumQueueSpecification{ Name: queueName, }) if err != nil { - rabbitmq_amqp.Error("Error declaring queue", err) + rmq.Error("Error declaring queue", err) return } // Bind the queue to the exchange - bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.ExchangeToQueueBindingSpecification{ + bindingPath, err := management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ SourceExchange: exchangeName, DestinationQueue: queueName, BindingKey: routingKey, }) if err != nil { - rabbitmq_amqp.Error("Error binding", err) + rmq.Error("Error binding", err) return } @@ -78,7 +77,7 @@ func main() { consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil) if err != nil { - rabbitmq_amqp.Error("Error creating consumer", err) + rmq.Error("Error creating consumer", err) return } @@ -90,61 +89,61 @@ func main() { deliveryContext, err := consumer.Receive(ctx) if errors.Is(err, context.Canceled) { // The consumer was closed correctly - rabbitmq_amqp.Info("[NewConsumer]", "consumer closed. Context", err) + rmq.Info("[NewConsumer]", "consumer closed. Context", err) return } if err != nil { // An error occurred receiving the message - rabbitmq_amqp.Error("[NewConsumer]", "Error receiving message", err) + rmq.Error("[NewConsumer]", "Error receiving message", err) return } - rabbitmq_amqp.Info("[NewConsumer]", "Received message", + rmq.Info("[NewConsumer]", "Received message", fmt.Sprintf("%s", deliveryContext.Message().Data)) err = deliveryContext.Accept(context.Background()) if err != nil { - rabbitmq_amqp.Error("Error accepting message", err) + rmq.Error("Error accepting message", err) return } } }(consumerContext) - publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmq_amqp.ExchangeAddress{ + publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{ Exchange: exchangeName, Key: routingKey, }, "getting-started-publisher") if err != nil { - rabbitmq_amqp.Error("Error creating publisher", err) + rmq.Error("Error creating publisher", err) return } for i := 0; i < 100; i++ { // Publish a message to the exchange - publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) + publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) if err != nil { - rabbitmq_amqp.Error("Error publishing message", "error", err) + rmq.Error("Error publishing message", "error", err) time.Sleep(1 * time.Second) continue } switch publishResult.Outcome.(type) { - case *amqp.StateAccepted: - rabbitmq_amqp.Info("[NewPublisher]", "Message accepted", publishResult.Message.Data[0]) + case *rmq.StateAccepted: + rmq.Info("[NewPublisher]", "Message accepted", publishResult.Message.Data[0]) break - case *amqp.StateReleased: - rabbitmq_amqp.Warn("[NewPublisher]", "Message was not routed", publishResult.Message.Data[0]) + case *rmq.StateReleased: + rmq.Warn("[NewPublisher]", "Message was not routed", publishResult.Message.Data[0]) break - case *amqp.StateRejected: - rabbitmq_amqp.Warn("[NewPublisher]", "Message rejected", publishResult.Message.Data[0]) - stateType := publishResult.Outcome.(*amqp.StateRejected) + case *rmq.StateRejected: + rmq.Warn("[NewPublisher]", "Message rejected", publishResult.Message.Data[0]) + stateType := publishResult.Outcome.(*rmq.StateRejected) if stateType.Error != nil { - rabbitmq_amqp.Warn("[NewPublisher]", "Message rejected with error: %v", stateType.Error) + rmq.Warn("[NewPublisher]", "Message rejected with error: %v", stateType.Error) } break default: // these status are not supported. Leave it for AMQP 1.0 compatibility // see: https://www.rabbitmq.com/docs/next/amqp#outcomes - rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome) + rmq.Warn("Message state: %v", publishResult.Outcome) } } @@ -157,13 +156,13 @@ func main() { //Close the consumer err = consumer.Close(context.Background()) if err != nil { - rabbitmq_amqp.Error("[NewConsumer]", err) + rmq.Error("[NewConsumer]", err) return } // Close the publisher err = publisher.Close(context.Background()) if err != nil { - rabbitmq_amqp.Error("[NewPublisher]", err) + rmq.Error("[NewPublisher]", err) return } @@ -171,27 +170,27 @@ func main() { err = management.Unbind(context.TODO(), bindingPath) if err != nil { - rabbitmq_amqp.Error("Error unbinding: %v\n", err) + rmq.Error("Error unbinding: %v\n", err) return } err = management.DeleteExchange(context.TODO(), exchangeInfo.Name()) if err != nil { - rabbitmq_amqp.Error("Error deleting exchange: %v\n", err) + rmq.Error("Error deleting exchange: %v\n", err) return } // Purge the queue purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name()) if err != nil { - rabbitmq_amqp.Error("Error purging queue: %v\n", err) + rmq.Error("Error purging queue: %v\n", err) return } - rabbitmq_amqp.Info("Purged %d messages from the queue.\n", purged) + rmq.Info("Purged %d messages from the queue.\n", purged) err = management.DeleteQueue(context.TODO(), queueInfo.Name()) if err != nil { - rabbitmq_amqp.Error("Error deleting queue: %v\n", err) + rmq.Error("Error deleting queue: %v\n", err) return } @@ -199,11 +198,11 @@ func main() { // to create new connections err = env.CloseConnections(context.Background()) if err != nil { - rabbitmq_amqp.Error("Error closing connection: %v\n", err) + rmq.Error("Error closing connection: %v\n", err) return } - rabbitmq_amqp.Info("AMQP connection closed.\n") + rmq.Info("AMQP connection closed.\n") // not necessary. It waits for the status change to be printed time.Sleep(100 * time.Millisecond) close(stateChanged) diff --git a/docs/examples/publisher_msg_targets/publisher_msg_targets.go b/docs/examples/publisher_msg_targets/publisher_msg_targets.go index f8dd4d6..19a19ca 100644 --- a/docs/examples/publisher_msg_targets/publisher_msg_targets.go +++ b/docs/examples/publisher_msg_targets/publisher_msg_targets.go @@ -3,12 +3,12 @@ package main import ( "context" "github.com/Azure/go-amqp" - "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" ) func checkError(err error) { if err != nil { - rabbitmq_amqp.Error("Error", err) + rmq.Error("Error", err) // it should not happen for the example // so panic just to make sure we catch it panic(err) @@ -16,15 +16,15 @@ func checkError(err error) { } func main() { - rabbitmq_amqp.Info("Define the publisher message targets") + rmq.Info("Define the publisher message targets") - env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil) + env := rmq.NewEnvironment([]string{"amqp://"}, nil) amqpConnection, err := env.NewConnection(context.Background()) checkError(err) queues := []string{"queue1", "queue2", "queue3"} management := amqpConnection.Management() for _, queue := range queues { - _, err = management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{ + _, err = management.DeclareQueue(context.TODO(), &rmq.QuorumQueueSpecification{ Name: queue, }) checkError(err) @@ -40,24 +40,24 @@ func main() { // with this helper function we create a message with a target // that is the same to create a message with: // msg := amqp.NewMessage([]byte("hello")) - // MessageToAddressHelper(msg, &QueueAddress{Queue: qName}) + // MessagePropertyToAddress(msg, &QueueAddress{Queue: qName}) // same like: // msg := amqp.NewMessage([]byte("hello")) // msg.Properties = &amqp.MessageProperties{} // msg.Properties.To = &address - // NewMessageToAddress and MessageToAddressHelper helpers are provided to make the + // NewMessageWithAddress and MessagePropertyToAddress helpers are provided to make the // code more readable and easier to use - msg, err := rabbitmq_amqp.NewMessageToAddress([]byte("Hello World"), - &rabbitmq_amqp.QueueAddress{Queue: queues[i%3]}) + msg, err := rmq.NewMessageWithAddress([]byte("Hello World"), + &rmq.QueueAddress{Queue: queues[i%3]}) checkError(err) publishResult, err := publisher.Publish(context.Background(), msg) checkError(err) switch publishResult.Outcome.(type) { case *amqp.StateAccepted: - rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) + rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) break default: - rabbitmq_amqp.Warn("[Publisher]", "Message not accepted", publishResult.Message.Data[0]) + rmq.Warn("[Publisher]", "Message not accepted", publishResult.Message.Data[0]) } } diff --git a/docs/examples/reliable/reliable.go b/docs/examples/reliable/reliable.go index d985732..7ae9931 100644 --- a/docs/examples/reliable/reliable.go +++ b/docs/examples/reliable/reliable.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" "github.com/Azure/go-amqp" - "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" "sync" "sync/atomic" "time" @@ -26,30 +26,30 @@ func main() { time.Sleep(5 * time.Second) total := stateAccepted + stateReleased + stateRejected messagesPerSecond := float64(total) / time.Since(startTime).Seconds() - rabbitmq_amqp.Info("[Stats]", "sent", total, "received", received, "failed", failed, "messagesPerSecond", messagesPerSecond) + rmq.Info("[Stats]", "sent", total, "received", received, "failed", failed, "messagesPerSecond", messagesPerSecond) } }() - rabbitmq_amqp.Info("How to deal with network disconnections") + rmq.Info("How to deal with network disconnections") signalBlock := sync.Cond{L: &sync.Mutex{}} /// Create a channel to receive state change notifications - stateChanged := make(chan *rabbitmq_amqp.StateChanged, 1) - go func(ch chan *rabbitmq_amqp.StateChanged) { + stateChanged := make(chan *rmq.StateChanged, 1) + go func(ch chan *rmq.StateChanged) { for statusChanged := range ch { - rabbitmq_amqp.Info("[connection]", "Status changed", statusChanged) + rmq.Info("[connection]", "Status changed", statusChanged) switch statusChanged.To.(type) { - case *rabbitmq_amqp.StateOpen: + case *rmq.StateOpen: signalBlock.Broadcast() } } }(stateChanged) // Open a connection to the AMQP 1.0 server - amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), []string{"amqp://"}, &rabbitmq_amqp.AmqpConnOptions{ + amqpConnection, err := rmq.Dial(context.Background(), []string{"amqp://"}, &rmq.AmqpConnOptions{ SASLType: amqp.SASLTypeAnonymous(), ContainerID: "reliable-amqp10-go", - RecoveryConfiguration: &rabbitmq_amqp.RecoveryConfiguration{ + RecoveryConfiguration: &rmq.RecoveryConfiguration{ ActiveRecovery: true, BackOffReconnectInterval: 2 * time.Second, // we reduce the reconnect interval to speed up the test. The default is 5 seconds // In production, you should avoid BackOffReconnectInterval with low values since it can cause a high number of reconnection attempts @@ -57,7 +57,7 @@ func main() { }, }) if err != nil { - rabbitmq_amqp.Error("Error opening connection", err) + rmq.Error("Error opening connection", err) return } // Register the channel to receive status change notifications @@ -69,17 +69,17 @@ func main() { management := amqpConnection.Management() // Declare a Quorum queue - queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{ + queueInfo, err := management.DeclareQueue(context.TODO(), &rmq.QuorumQueueSpecification{ Name: queueName, }) if err != nil { - rabbitmq_amqp.Error("Error declaring queue", err) + rmq.Error("Error declaring queue", err) return } consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil) if err != nil { - rabbitmq_amqp.Error("Error creating consumer", err) + rmq.Error("Error creating consumer", err) return } @@ -97,9 +97,9 @@ func main() { // An error occurred receiving the message // here the consumer could be disconnected from the server due to a network error signalBlock.L.Lock() - rabbitmq_amqp.Info("[Consumer]", "Consumer is blocked, queue", queueName, "error", err) + rmq.Info("[Consumer]", "Consumer is blocked, queue", queueName, "error", err) signalBlock.Wait() - rabbitmq_amqp.Info("[Consumer]", "Consumer is unblocked, queue", queueName) + rmq.Info("[Consumer]", "Consumer is unblocked, queue", queueName) signalBlock.L.Unlock() continue @@ -116,11 +116,11 @@ func main() { } }(consumerContext) - publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmq_amqp.QueueAddress{ + publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.QueueAddress{ Queue: queueName, }, "reliable-publisher") if err != nil { - rabbitmq_amqp.Error("Error creating publisher", err) + rmq.Error("Error creating publisher", err) return } @@ -130,7 +130,7 @@ func main() { go func() { defer wg.Done() for i := 0; i < 500_000; i++ { - publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) + publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) if err != nil { // here you need to deal with the error. You can store the message in a local in memory/persistent storage // then retry to send the message as soon as the connection is reestablished @@ -138,26 +138,26 @@ func main() { atomic.AddInt32(&failed, 1) // block signalBlock until the connection is reestablished signalBlock.L.Lock() - rabbitmq_amqp.Info("[Publisher]", "Publisher is blocked, queue", queueName, "error", err) + rmq.Info("[Publisher]", "Publisher is blocked, queue", queueName, "error", err) signalBlock.Wait() - rabbitmq_amqp.Info("[Publisher]", "Publisher is unblocked, queue", queueName) + rmq.Info("[Publisher]", "Publisher is unblocked, queue", queueName) signalBlock.L.Unlock() } else { switch publishResult.Outcome.(type) { - case *amqp.StateAccepted: + case *rmq.StateAccepted: atomic.AddInt32(&stateAccepted, 1) break - case *amqp.StateReleased: + case *rmq.StateReleased: atomic.AddInt32(&stateReleased, 1) break - case *amqp.StateRejected: + case *rmq.StateRejected: atomic.AddInt32(&stateRejected, 1) break default: // these status are not supported. Leave it for AMQP 1.0 compatibility // see: https://www.rabbitmq.com/docs/next/amqp#outcomes - rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome) + rmq.Warn("Message state: %v", publishResult.Outcome) } } } @@ -174,13 +174,13 @@ func main() { //Close the consumer err = consumer.Close(context.Background()) if err != nil { - rabbitmq_amqp.Error("[NewConsumer]", err) + rmq.Error("[NewConsumer]", err) return } // Close the publisher err = publisher.Close(context.Background()) if err != nil { - rabbitmq_amqp.Error("[NewPublisher]", err) + rmq.Error("[NewPublisher]", err) return } diff --git a/docs/examples/streams/streams.go b/docs/examples/streams/streams.go index 14bc061..fa5e6ba 100644 --- a/docs/examples/streams/streams.go +++ b/docs/examples/streams/streams.go @@ -2,14 +2,13 @@ package main import ( "context" - "github.com/Azure/go-amqp" - "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" "time" ) func checkError(err error) { if err != nil { - rabbitmq_amqp.Error("Error", err) + rmq.Error("Error", err) // it should not happen for the example // so panic just to make sure we catch it panic(err) @@ -18,59 +17,59 @@ func checkError(err error) { func main() { - rabbitmq_amqp.Info("Golang AMQP 1.0 Streams example") + rmq.Info("Golang AMQP 1.0 Streams example") queueStream := "stream-go-queue-" + time.Now().String() - env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil) + env := rmq.NewEnvironment([]string{"amqp://"}, nil) amqpConnection, err := env.NewConnection(context.Background()) checkError(err) management := amqpConnection.Management() // define a stream queue - _, err = management.DeclareQueue(context.TODO(), &rabbitmq_amqp.StreamQueueSpecification{ + _, err = management.DeclareQueue(context.TODO(), &rmq.StreamQueueSpecification{ Name: queueStream, // it is a best practice to set the max length of the stream // to avoid the stream to grow indefinitely // the value here is low just for the example - MaxLengthBytes: rabbitmq_amqp.CapacityGB(5), + MaxLengthBytes: rmq.CapacityGB(5), }) checkError(err) // create a stream publisher. In this case we use the QueueAddress to make the example // simple. So we use the default exchange here. - publisher, err := amqpConnection.NewPublisher(context.TODO(), &rabbitmq_amqp.QueueAddress{Queue: queueStream}, "stream-publisher") + publisher, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{Queue: queueStream}, "stream-publisher") checkError(err) // publish messages to the stream for i := 0; i < 10; i++ { - publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello World"))) + publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello World"))) checkError(err) // check the outcome of the publishResult switch publishResult.Outcome.(type) { - case *amqp.StateAccepted: - rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) + case *rmq.StateAccepted: + rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) break - case *amqp.StateReleased: - rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0]) + case *rmq.StateReleased: + rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0]) break - case *amqp.StateRejected: - rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0]) - stateType := publishResult.Outcome.(*amqp.StateRejected) + case *rmq.StateRejected: + rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0]) + stateType := publishResult.Outcome.(*rmq.StateRejected) if stateType.Error != nil { - rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error) + rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error) } break default: // these status are not supported. Leave it for AMQP 1.0 compatibility // see: https://www.rabbitmq.com/docs/next/amqp#outcomes - rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome) + rmq.Warn("Message state: %v", publishResult.Outcome) } } // create a stream consumer - consumer, err := amqpConnection.NewConsumer(context.Background(), queueStream, &rabbitmq_amqp.StreamConsumerOptions{ + consumer, err := amqpConnection.NewConsumer(context.Background(), queueStream, &rmq.StreamConsumerOptions{ // the offset is set to the first chunk of the stream // so here it starts from the beginning - Offset: &rabbitmq_amqp.OffsetFirst{}, + Offset: &rmq.OffsetFirst{}, }) checkError(err) @@ -78,7 +77,7 @@ func main() { for i := 0; i < 10; i++ { deliveryContext, err := consumer.Receive(context.Background()) checkError(err) - rabbitmq_amqp.Info("[Consumer]", "Message received", deliveryContext.Message().Data[0]) + rmq.Info("[Consumer]", "Message received", deliveryContext.Message().Data[0]) // accept the message err = deliveryContext.Accept(context.Background()) checkError(err) @@ -94,5 +93,5 @@ func main() { err = env.CloseConnections(context.Background()) checkError(err) - rabbitmq_amqp.Info("Example completed") + rmq.Info("Example completed") } diff --git a/docs/examples/streams_filtering/streams_filtering.go b/docs/examples/streams_filtering/streams_filtering.go index c097f9a..995581a 100644 --- a/docs/examples/streams_filtering/streams_filtering.go +++ b/docs/examples/streams_filtering/streams_filtering.go @@ -2,14 +2,13 @@ package main import ( "context" - "github.com/Azure/go-amqp" - "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" "time" ) func checkError(err error) { if err != nil { - rabbitmq_amqp.Error("Error", err) + rmq.Error("Error", err) // it should not happen for the example // so panic just to make sure we catch it panic(err) @@ -18,67 +17,62 @@ func checkError(err error) { func main() { - rabbitmq_amqp.Info("Golang AMQP 1.0 Streams example with filtering") + rmq.Info("Golang AMQP 1.0 Streams example with filtering") queueStream := "stream-go-queue-filtering-" + time.Now().String() - env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil) + env := rmq.NewEnvironment([]string{"amqp://"}, nil) amqpConnection, err := env.NewConnection(context.Background()) checkError(err) management := amqpConnection.Management() // define a stream queue - _, err = management.DeclareQueue(context.TODO(), &rabbitmq_amqp.StreamQueueSpecification{ + _, err = management.DeclareQueue(context.TODO(), &rmq.StreamQueueSpecification{ Name: queueStream, // it is a best practice to set the max length of the stream // to avoid the stream to grow indefinitely // the value here is low just for the example - MaxLengthBytes: rabbitmq_amqp.CapacityGB(5), + MaxLengthBytes: rmq.CapacityGB(5), }) checkError(err) // create a stream publisher. In this case we use the QueueAddress to make the example // simple. So we use the default exchange here. - publisher, err := amqpConnection.NewPublisher(context.TODO(), &rabbitmq_amqp.QueueAddress{Queue: queueStream}, "stream-publisher") + publisher, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{Queue: queueStream}, "stream-publisher") checkError(err) filters := []string{"MyFilter1", "MyFilter2", "MyFilter3", "MyFilter4"} // publish messages to the stream for i := 0; i < 40; i++ { - msg := amqp.NewMessage([]byte("Hello World! with filter:" + filters[i%4])) - // add a filter to the message - msg.Annotations = amqp.Annotations{ - // here we set the filter value taken from the filters array - rabbitmq_amqp.StreamFilterValue: filters[i%4], - } + msg := rmq.NewMessageWithFilter([]byte("Hello World"), filters[i%4]) publishResult, err := publisher.Publish(context.Background(), msg) checkError(err) // check the outcome of the publishResult switch publishResult.Outcome.(type) { - case *amqp.StateAccepted: - rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) + case *rmq.StateAccepted: + rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) break - case *amqp.StateReleased: - rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0]) + case *rmq.StateReleased: + rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0]) break - case *amqp.StateRejected: - rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0]) - stateType := publishResult.Outcome.(*amqp.StateRejected) + case *rmq.StateRejected: + rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0]) + stateType := publishResult.Outcome.(*rmq.StateRejected) if stateType.Error != nil { - rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error) + rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error) } break default: // these status are not supported. Leave it for AMQP 1.0 compatibility // see: https://www.rabbitmq.com/docs/next/amqp#outcomes - rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome) + rmq.Warn("Message state: %v", publishResult.Outcome) } } // create a stream consumer - consumer, err := amqpConnection.NewConsumer(context.Background(), queueStream, &rabbitmq_amqp.StreamConsumerOptions{ + consumer, err := amqpConnection.NewConsumer(context.Background(), queueStream, &rmq.StreamConsumerOptions{ // the offset is set to the first chunk of the stream // so here it starts from the beginning - Offset: &rabbitmq_amqp.OffsetFirst{}, + Offset: &rmq.OffsetFirst{}, // add a filter to the consumer, in this case we use only the filter values // MyFilter1 and MyFilter2. So all other messages won't be received @@ -92,7 +86,7 @@ func main() { for i := 0; i < 20; i++ { deliveryContext, err := consumer.Receive(context.Background()) checkError(err) - rabbitmq_amqp.Info("[Consumer]", "Message received", deliveryContext.Message().Data[0]) + rmq.Info("[Consumer]", "Message received", deliveryContext.Message().Data[0]) // accept the message err = deliveryContext.Accept(context.Background()) checkError(err) @@ -108,5 +102,5 @@ func main() { err = env.CloseConnections(context.Background()) checkError(err) - rabbitmq_amqp.Info("Example completed") + rmq.Info("Example completed") } diff --git a/pkg/rabbitmq_amqp/address.go b/pkg/rabbitmqamqp/address.go similarity index 80% rename from pkg/rabbitmq_amqp/address.go rename to pkg/rabbitmqamqp/address.go index 6958c87..6c08c38 100644 --- a/pkg/rabbitmq_amqp/address.go +++ b/pkg/rabbitmqamqp/address.go @@ -1,9 +1,8 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "errors" "fmt" - "github.com/Azure/go-amqp" "strings" ) @@ -45,38 +44,6 @@ func (eas *ExchangeAddress) toAddress() (string, error) { return exchangeAddress(ex, k) } -// MessageToAddressHelper sets the To property of the message to the address of the target. -// The target must be a QueueAddress or an ExchangeAddress. -// Note: The field msgRef.Properties.To will be overwritten if it is already set. -func MessageToAddressHelper(msgRef *amqp.Message, target TargetAddress) error { - if target == nil { - return errors.New("target cannot be nil") - } - - address, err := target.toAddress() - if err != nil { - return err - } - - if msgRef.Properties == nil { - msgRef.Properties = &amqp.MessageProperties{} - } - msgRef.Properties.To = &address - return nil -} - -// NewMessageToAddress creates a new message with the given payload and sets the To property to the address of the target. -// The target must be a QueueAddress or an ExchangeAddress. -// This function is a helper that combines NewMessage and MessageToAddressHelper. -func NewMessageToAddress(msg []byte, target TargetAddress) (*amqp.Message, error) { - message := amqp.NewMessage(msg) - err := MessageToAddressHelper(message, target) - if err != nil { - return nil, err - } - return message, nil -} - // address Creates the address for the exchange or queue following the RabbitMQ conventions. // see: https://www.rabbitmq.com/docs/next/amqp#address-v2 func address(exchange, key, queue *string, urlParameters *string) (string, error) { diff --git a/pkg/rabbitmq_amqp/address_test.go b/pkg/rabbitmqamqp/address_test.go similarity index 98% rename from pkg/rabbitmq_amqp/address_test.go rename to pkg/rabbitmqamqp/address_test.go index 4a0cc9f..4a0f114 100644 --- a/pkg/rabbitmq_amqp/address_test.go +++ b/pkg/rabbitmqamqp/address_test.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( . "github.com/onsi/ginkgo/v2" diff --git a/pkg/rabbitmq_amqp/amqp_binding.go b/pkg/rabbitmqamqp/amqp_binding.go similarity index 98% rename from pkg/rabbitmq_amqp/amqp_binding.go rename to pkg/rabbitmqamqp/amqp_binding.go index 0a42835..5638c3a 100644 --- a/pkg/rabbitmq_amqp/amqp_binding.go +++ b/pkg/rabbitmqamqp/amqp_binding.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_binding_test.go b/pkg/rabbitmqamqp/amqp_binding_test.go similarity index 99% rename from pkg/rabbitmq_amqp/amqp_binding_test.go rename to pkg/rabbitmqamqp/amqp_binding_test.go index 4930fab..40cda11 100644 --- a/pkg/rabbitmq_amqp/amqp_binding_test.go +++ b/pkg/rabbitmqamqp/amqp_binding_test.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_connection.go b/pkg/rabbitmqamqp/amqp_connection.go similarity index 99% rename from pkg/rabbitmq_amqp/amqp_connection.go rename to pkg/rabbitmqamqp/amqp_connection.go index 55ad5dc..c3ad61c 100644 --- a/pkg/rabbitmq_amqp/amqp_connection.go +++ b/pkg/rabbitmqamqp/amqp_connection.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_connection_recovery.go b/pkg/rabbitmqamqp/amqp_connection_recovery.go similarity index 99% rename from pkg/rabbitmq_amqp/amqp_connection_recovery.go rename to pkg/rabbitmqamqp/amqp_connection_recovery.go index 3869f12..ae22678 100644 --- a/pkg/rabbitmq_amqp/amqp_connection_recovery.go +++ b/pkg/rabbitmqamqp/amqp_connection_recovery.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "sync" diff --git a/pkg/rabbitmq_amqp/amqp_connection_recovery_test.go b/pkg/rabbitmqamqp/amqp_connection_recovery_test.go similarity index 99% rename from pkg/rabbitmq_amqp/amqp_connection_recovery_test.go rename to pkg/rabbitmqamqp/amqp_connection_recovery_test.go index b1b45bd..067de7b 100644 --- a/pkg/rabbitmq_amqp/amqp_connection_recovery_test.go +++ b/pkg/rabbitmqamqp/amqp_connection_recovery_test.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_connection_test.go b/pkg/rabbitmqamqp/amqp_connection_test.go similarity index 99% rename from pkg/rabbitmq_amqp/amqp_connection_test.go rename to pkg/rabbitmqamqp/amqp_connection_test.go index acfd54d..7119001 100644 --- a/pkg/rabbitmq_amqp/amqp_connection_test.go +++ b/pkg/rabbitmqamqp/amqp_connection_test.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_consumer.go b/pkg/rabbitmqamqp/amqp_consumer.go similarity index 99% rename from pkg/rabbitmq_amqp/amqp_consumer.go rename to pkg/rabbitmqamqp/amqp_consumer.go index 0ebbd1b..3d36e6a 100644 --- a/pkg/rabbitmq_amqp/amqp_consumer.go +++ b/pkg/rabbitmqamqp/amqp_consumer.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go b/pkg/rabbitmqamqp/amqp_consumer_stream_test.go similarity index 98% rename from pkg/rabbitmq_amqp/amqp_consumer_stream_test.go rename to pkg/rabbitmqamqp/amqp_consumer_stream_test.go index afe207a..b366320 100644 --- a/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go +++ b/pkg/rabbitmqamqp/amqp_consumer_stream_test.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" @@ -21,10 +21,7 @@ func publishMessagesWithStreamTag(queueName string, filterValue string, count in for i := 0; i < count; i++ { body := filterValue + " #" + strconv.Itoa(i) - msg := amqp.NewMessage([]byte(body)) - msg.Annotations = amqp.Annotations{ - StreamFilterValue: filterValue, - } + msg := NewMessageWithFilter([]byte(body), filterValue) publishResult, err := publisher.Publish(context.TODO(), msg) Expect(err).To(BeNil()) Expect(publishResult).NotTo(BeNil()) diff --git a/pkg/rabbitmq_amqp/amqp_consumer_test.go b/pkg/rabbitmqamqp/amqp_consumer_test.go similarity index 99% rename from pkg/rabbitmq_amqp/amqp_consumer_test.go rename to pkg/rabbitmqamqp/amqp_consumer_test.go index 3b2fd04..af46ac3 100644 --- a/pkg/rabbitmq_amqp/amqp_consumer_test.go +++ b/pkg/rabbitmqamqp/amqp_consumer_test.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_environment.go b/pkg/rabbitmqamqp/amqp_environment.go similarity index 98% rename from pkg/rabbitmq_amqp/amqp_environment.go rename to pkg/rabbitmqamqp/amqp_environment.go index aac04b7..dd55901 100644 --- a/pkg/rabbitmq_amqp/amqp_environment.go +++ b/pkg/rabbitmqamqp/amqp_environment.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_environment_test.go b/pkg/rabbitmqamqp/amqp_environment_test.go similarity index 98% rename from pkg/rabbitmq_amqp/amqp_environment_test.go rename to pkg/rabbitmqamqp/amqp_environment_test.go index 0e102a8..4125f79 100644 --- a/pkg/rabbitmq_amqp/amqp_environment_test.go +++ b/pkg/rabbitmqamqp/amqp_environment_test.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_exchange.go b/pkg/rabbitmqamqp/amqp_exchange.go similarity index 98% rename from pkg/rabbitmq_amqp/amqp_exchange.go rename to pkg/rabbitmqamqp/amqp_exchange.go index bfc42ad..5714a03 100644 --- a/pkg/rabbitmq_amqp/amqp_exchange.go +++ b/pkg/rabbitmqamqp/amqp_exchange.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_exchange_test.go b/pkg/rabbitmqamqp/amqp_exchange_test.go similarity index 99% rename from pkg/rabbitmq_amqp/amqp_exchange_test.go rename to pkg/rabbitmqamqp/amqp_exchange_test.go index e02b800..dad934e 100644 --- a/pkg/rabbitmq_amqp/amqp_exchange_test.go +++ b/pkg/rabbitmqamqp/amqp_exchange_test.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_management.go b/pkg/rabbitmqamqp/amqp_management.go similarity index 99% rename from pkg/rabbitmq_amqp/amqp_management.go rename to pkg/rabbitmqamqp/amqp_management.go index 3fc443c..e638141 100644 --- a/pkg/rabbitmq_amqp/amqp_management.go +++ b/pkg/rabbitmqamqp/amqp_management.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_management_test.go b/pkg/rabbitmqamqp/amqp_management_test.go similarity index 99% rename from pkg/rabbitmq_amqp/amqp_management_test.go rename to pkg/rabbitmqamqp/amqp_management_test.go index b05ccde..24ec902 100644 --- a/pkg/rabbitmq_amqp/amqp_management_test.go +++ b/pkg/rabbitmqamqp/amqp_management_test.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_publisher.go b/pkg/rabbitmqamqp/amqp_publisher.go similarity index 93% rename from pkg/rabbitmq_amqp/amqp_publisher.go rename to pkg/rabbitmqamqp/amqp_publisher.go index de4b38c..75991d2 100644 --- a/pkg/rabbitmq_amqp/amqp_publisher.go +++ b/pkg/rabbitmqamqp/amqp_publisher.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" @@ -9,7 +9,7 @@ import ( ) type PublishResult struct { - Outcome amqp.DeliveryState + Outcome DeliveryState Message *amqp.Message } @@ -63,12 +63,12 @@ RabbitMQ supports the following DeliveryState types: See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information. Note: If the destination address is not defined during the creation, the message must have a TO property set. -You can use the helper "MessageToAddressHelper" to create the destination address. +You can use the helper "MessagePropertyToAddress" to create the destination address. See the examples: Create a new publisher that sends messages to a specific destination address: - publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmq_amqp.ExchangeAddress{ + publisher, err := amqpConnection.NewPublisher(context.Background(), &rabbitmqamqp.ExchangeAddress{ Exchange: "myExchangeName", Key: "myRoutingKey", } @@ -81,7 +81,7 @@ Create a new publisher that sends messages based on message destination address: publisher, err := connection.NewPublisher(context.Background(), nil, "test") msg := amqp.NewMessage([]byte("hello")) - ..:= MessageToAddressHelper(msg, &QueueAddress{Queue: "myQueueName"}) + ..:= MessagePropertyToAddress(msg, &QueueAddress{Queue: "myQueueName"}) ..:= publisher.Publish(context.Background(), msg) diff --git a/pkg/rabbitmq_amqp/amqp_publisher_test.go b/pkg/rabbitmqamqp/amqp_publisher_test.go similarity index 87% rename from pkg/rabbitmq_amqp/amqp_publisher_test.go rename to pkg/rabbitmqamqp/amqp_publisher_test.go index 4f9d347..91c54ba 100644 --- a/pkg/rabbitmq_amqp/amqp_publisher_test.go +++ b/pkg/rabbitmqamqp/amqp_publisher_test.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" @@ -23,7 +23,7 @@ var _ = Describe("AMQP publisher ", func() { Expect(publisher).NotTo(BeNil()) Expect(publisher).To(BeAssignableToTypeOf(&Publisher{})) - publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello"))) + publishResult, err := publisher.Publish(context.Background(), NewMessage([]byte("hello"))) Expect(err).To(BeNil()) Expect(publishResult).NotTo(BeNil()) Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{})) @@ -65,10 +65,10 @@ var _ = Describe("AMQP publisher ", func() { }, "test") Expect(err).To(BeNil()) Expect(publisher).NotTo(BeNil()) - publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello"))) + publishResult, err := publisher.Publish(context.Background(), NewMessage([]byte("hello"))) Expect(err).To(BeNil()) Expect(publishResult).NotTo(BeNil()) - Expect(publishResult.Outcome).To(Equal(&amqp.StateReleased{})) + Expect(publishResult.Outcome).To(Equal(&StateReleased{})) Expect(connection.Management().DeleteExchange(context.Background(), eName)).To(BeNil()) Expect(connection.Close(context.Background())).To(BeNil()) }) @@ -85,12 +85,12 @@ var _ = Describe("AMQP publisher ", func() { publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: qName}, "test") Expect(err).To(BeNil()) Expect(publisher).NotTo(BeNil()) - publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello"))) + publishResult, err := publisher.Publish(context.Background(), NewMessage([]byte("hello"))) Expect(err).To(BeNil()) - Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{})) + Expect(publishResult.Outcome).To(Equal(&StateAccepted{})) err = connection.management.DeleteQueue(context.Background(), qName) Expect(err).To(BeNil()) - publishResult, err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("hello"))) + publishResult, err = publisher.Publish(context.Background(), NewMessage([]byte("hello"))) Expect(err).NotTo(BeNil()) Expect(connection.Close(context.Background())) }) @@ -104,12 +104,12 @@ var _ = Describe("AMQP publisher ", func() { Expect(publisher).NotTo(BeNil()) qName := generateNameWithDateTime("Targets NewPublisher should fail when the destination does not exist") msg := amqp.NewMessage([]byte("hello")) - Expect(MessageToAddressHelper(msg, &QueueAddress{Queue: qName})).To(BeNil()) + Expect(MessagePropertyToAddress(msg, &QueueAddress{Queue: qName})).To(BeNil()) publishResult, err := publisher.Publish(context.Background(), msg) Expect(err).To(BeNil()) Expect(publishResult).NotTo(BeNil()) Expect(publishResult.Outcome).To(Equal(&amqp.StateReleased{})) - msg, err = NewMessageToAddress([]byte("hello"), &QueueAddress{Queue: qName}) + msg, err = NewMessageWithAddress([]byte("hello"), &QueueAddress{Queue: qName}) Expect(err).To(BeNil()) publishResult, err = publisher.Publish(context.Background(), msg) Expect(err).To(BeNil()) @@ -134,25 +134,25 @@ var _ = Describe("AMQP publisher ", func() { Expect(err).To(BeNil()) // as first message is sent to a queue, the outcome should be StateReceived // since the message was accepted by the existing queue - msg := amqp.NewMessage([]byte("hello")) - Expect(MessageToAddressHelper(msg, &QueueAddress{Queue: name})).To(BeNil()) + msg := NewMessage([]byte("hello")) + Expect(MessagePropertyToAddress(msg, &QueueAddress{Queue: name})).To(BeNil()) publishResult, err := publisher.Publish(context.Background(), msg) Expect(err).To(BeNil()) Expect(publishResult).NotTo(BeNil()) - Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{})) + Expect(publishResult.Outcome).To(Equal(&StateAccepted{})) _, err = connection.Management().DeclareExchange(context.Background(), &TopicExchangeSpecification{ Name: name, IsAutoDelete: false, }) - msg = amqp.NewMessage([]byte("hello")) - Expect(MessageToAddressHelper(msg, &ExchangeAddress{Exchange: name})).To(BeNil()) + msg = NewMessage([]byte("hello")) + Expect(MessagePropertyToAddress(msg, &ExchangeAddress{Exchange: name})).To(BeNil()) // the status should be StateReleased since the exchange does not have any binding publishResult, err = publisher.Publish(context.Background(), msg) Expect(err).To(BeNil()) Expect(publishResult).NotTo(BeNil()) - Expect(publishResult.Outcome).To(Equal(&amqp.StateReleased{})) + Expect(publishResult.Outcome).To(Equal(&StateReleased{})) // Create the binding between the exchange and the queue _, err = connection.Management().Bind(context.Background(), &ExchangeToQueueBindingSpecification{ @@ -162,12 +162,12 @@ var _ = Describe("AMQP publisher ", func() { }) Expect(err).To(BeNil()) // the status should be StateAccepted since the exchange has a binding - msg = amqp.NewMessage([]byte("hello")) - Expect(MessageToAddressHelper(msg, &ExchangeAddress{Exchange: name})).To(BeNil()) + msg = NewMessage([]byte("hello")) + Expect(MessagePropertyToAddress(msg, &ExchangeAddress{Exchange: name})).To(BeNil()) publishResult, err = publisher.Publish(context.Background(), msg) Expect(err).To(BeNil()) Expect(publishResult).NotTo(BeNil()) - Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{})) + Expect(publishResult.Outcome).To(Equal(&StateAccepted{})) Expect(connection.Management().DeleteQueue(context.Background(), name)).To(BeNil()) Expect(connection.Management().DeleteExchange(context.Background(), name)).To(BeNil()) Expect(connection.Close(context.Background())).To(BeNil()) @@ -180,7 +180,7 @@ var _ = Describe("AMQP publisher ", func() { publisher, err := connection.NewPublisher(context.Background(), nil, "test") Expect(err).To(BeNil()) Expect(publisher).NotTo(BeNil()) - msg := amqp.NewMessage([]byte("hello")) + msg := NewMessage([]byte("hello")) // the message should fail since the TO property is not set publishResult, err := publisher.Publish(context.Background(), msg) Expect(err).NotTo(BeNil()) diff --git a/pkg/rabbitmq_amqp/amqp_queue.go b/pkg/rabbitmqamqp/amqp_queue.go similarity index 99% rename from pkg/rabbitmq_amqp/amqp_queue.go rename to pkg/rabbitmqamqp/amqp_queue.go index 9890713..8bbdef4 100644 --- a/pkg/rabbitmq_amqp/amqp_queue.go +++ b/pkg/rabbitmqamqp/amqp_queue.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" diff --git a/pkg/rabbitmq_amqp/amqp_queue_test.go b/pkg/rabbitmqamqp/amqp_queue_test.go similarity index 97% rename from pkg/rabbitmq_amqp/amqp_queue_test.go rename to pkg/rabbitmqamqp/amqp_queue_test.go index b30d269..d0f2d26 100644 --- a/pkg/rabbitmq_amqp/amqp_queue_test.go +++ b/pkg/rabbitmqamqp/amqp_queue_test.go @@ -1,12 +1,10 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "context" - "strconv" - - "github.com/Azure/go-amqp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "strconv" ) var _ = Describe("AMQP Queue test ", func() { @@ -230,10 +228,10 @@ func publishMessages(queueName string, count int, args ...string) { body = args[0] } - publishResult, err := publisher.Publish(context.TODO(), amqp.NewMessage([]byte(body))) + publishResult, err := publisher.Publish(context.TODO(), NewMessage([]byte(body))) Expect(err).To(BeNil()) Expect(publishResult).NotTo(BeNil()) - Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{})) + Expect(publishResult.Outcome).To(Equal(&StateAccepted{})) } err = conn.Close(context.TODO()) Expect(err).To(BeNil()) diff --git a/pkg/rabbitmq_amqp/amqp_types.go b/pkg/rabbitmqamqp/amqp_types.go similarity index 93% rename from pkg/rabbitmq_amqp/amqp_types.go rename to pkg/rabbitmqamqp/amqp_types.go index 4dca596..31b1e27 100644 --- a/pkg/rabbitmq_amqp/amqp_types.go +++ b/pkg/rabbitmqamqp/amqp_types.go @@ -1,10 +1,18 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "github.com/Azure/go-amqp" "github.com/google/uuid" ) +// the following types are alias to the go-amqp package + +type DeliveryState = amqp.DeliveryState +type StateAccepted = amqp.StateAccepted +type StateRejected = amqp.StateRejected +type StateReleased = amqp.StateReleased +type StateModified = amqp.StateModified + type linkerName interface { linkName() string } diff --git a/pkg/rabbitmq_amqp/amqp_utils.go b/pkg/rabbitmqamqp/amqp_utils.go similarity index 99% rename from pkg/rabbitmq_amqp/amqp_utils.go rename to pkg/rabbitmqamqp/amqp_utils.go index 888f981..ea0f05f 100644 --- a/pkg/rabbitmq_amqp/amqp_utils.go +++ b/pkg/rabbitmqamqp/amqp_utils.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "fmt" diff --git a/pkg/rabbitmq_amqp/common.go b/pkg/rabbitmqamqp/common.go similarity index 98% rename from pkg/rabbitmq_amqp/common.go rename to pkg/rabbitmqamqp/common.go index 40b62de..072cf9d 100644 --- a/pkg/rabbitmq_amqp/common.go +++ b/pkg/rabbitmqamqp/common.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "crypto/md5" diff --git a/pkg/rabbitmq_amqp/converters.go b/pkg/rabbitmqamqp/converters.go similarity index 98% rename from pkg/rabbitmq_amqp/converters.go rename to pkg/rabbitmqamqp/converters.go index f0790d3..de2eb37 100644 --- a/pkg/rabbitmq_amqp/converters.go +++ b/pkg/rabbitmqamqp/converters.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "fmt" diff --git a/pkg/rabbitmq_amqp/converters_test.go b/pkg/rabbitmqamqp/converters_test.go similarity index 98% rename from pkg/rabbitmq_amqp/converters_test.go rename to pkg/rabbitmqamqp/converters_test.go index 39180ab..07b69cb 100644 --- a/pkg/rabbitmq_amqp/converters_test.go +++ b/pkg/rabbitmqamqp/converters_test.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "fmt" diff --git a/pkg/rabbitmq_amqp/entities.go b/pkg/rabbitmqamqp/entities.go similarity index 99% rename from pkg/rabbitmq_amqp/entities.go rename to pkg/rabbitmqamqp/entities.go index 471639b..c35914b 100644 --- a/pkg/rabbitmq_amqp/entities.go +++ b/pkg/rabbitmqamqp/entities.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp type entityIdentifier interface { Id() string diff --git a/pkg/rabbitmq_amqp/life_cycle.go b/pkg/rabbitmqamqp/life_cycle.go similarity index 98% rename from pkg/rabbitmq_amqp/life_cycle.go rename to pkg/rabbitmqamqp/life_cycle.go index 3c47e9d..c4b4c3b 100644 --- a/pkg/rabbitmq_amqp/life_cycle.go +++ b/pkg/rabbitmqamqp/life_cycle.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "fmt" diff --git a/pkg/rabbitmq_amqp/log.go b/pkg/rabbitmqamqp/log.go similarity index 92% rename from pkg/rabbitmq_amqp/log.go rename to pkg/rabbitmqamqp/log.go index 5f992e4..a2ecde5 100644 --- a/pkg/rabbitmq_amqp/log.go +++ b/pkg/rabbitmqamqp/log.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import "log/slog" diff --git a/pkg/rabbitmqamqp/messages_helper.go b/pkg/rabbitmqamqp/messages_helper.go new file mode 100644 index 0000000..726ba6d --- /dev/null +++ b/pkg/rabbitmqamqp/messages_helper.go @@ -0,0 +1,54 @@ +package rabbitmqamqp + +import ( + "errors" + "github.com/Azure/go-amqp" +) + +// MessagePropertyToAddress sets the To property of the message to the address of the target. +// The target must be a QueueAddress or an ExchangeAddress. +// Note: The field msgRef.Properties.To will be overwritten if it is already set. +func MessagePropertyToAddress(msgRef *amqp.Message, target TargetAddress) error { + if target == nil { + return errors.New("target cannot be nil") + } + + address, err := target.toAddress() + if err != nil { + return err + } + + if msgRef.Properties == nil { + msgRef.Properties = &amqp.MessageProperties{} + } + msgRef.Properties.To = &address + return nil +} + +// NewMessage creates a new AMQP 1.0 message with the given payload. +func NewMessage(body []byte) *amqp.Message { + return amqp.NewMessage(body) +} + +// NewMessageWithAddress creates a new AMQP 1.0 new message with the given payload and sets the To property to the address of the target. +// The target must be a QueueAddress or an ExchangeAddress. +// This function is a helper that combines NewMessage and MessagePropertyToAddress. +func NewMessageWithAddress(body []byte, target TargetAddress) (*amqp.Message, error) { + message := amqp.NewMessage(body) + err := MessagePropertyToAddress(message, target) + if err != nil { + return nil, err + } + return message, nil +} + +// NewMessageWithFilter creates a new AMQP 1.0 message with the given payload and sets the +// StreamFilterValue property to the filter value. +func NewMessageWithFilter(body []byte, filter string) *amqp.Message { + msg := amqp.NewMessage(body) + msg.Annotations = amqp.Annotations{ + // here we set the filter value taken from the filters array + StreamFilterValue: filter, + } + return msg +} diff --git a/pkg/rabbitmq_amqp/pkg_suite_test.go b/pkg/rabbitmqamqp/pkg_suite_test.go similarity index 86% rename from pkg/rabbitmq_amqp/pkg_suite_test.go rename to pkg/rabbitmqamqp/pkg_suite_test.go index 08ee73d..e074a1a 100644 --- a/pkg/rabbitmq_amqp/pkg_suite_test.go +++ b/pkg/rabbitmqamqp/pkg_suite_test.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp_test +package rabbitmqamqp_test import ( "testing" diff --git a/pkg/rabbitmq_amqp/test_utils.go b/pkg/rabbitmqamqp/test_utils.go similarity index 88% rename from pkg/rabbitmq_amqp/test_utils.go rename to pkg/rabbitmqamqp/test_utils.go index 35801a4..a88678e 100644 --- a/pkg/rabbitmq_amqp/test_utils.go +++ b/pkg/rabbitmqamqp/test_utils.go @@ -1,4 +1,4 @@ -package rabbitmq_amqp +package rabbitmqamqp import ( "fmt" diff --git a/pkg/rabbitmq_amqp/uri.go b/pkg/rabbitmqamqp/uri.go similarity index 99% rename from pkg/rabbitmq_amqp/uri.go rename to pkg/rabbitmqamqp/uri.go index 911ac3b..2c7f8ae 100644 --- a/pkg/rabbitmq_amqp/uri.go +++ b/pkg/rabbitmqamqp/uri.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -package rabbitmq_amqp +package rabbitmqamqp import ( "errors" diff --git a/pkg/rabbitmq_amqp/uri_test.go b/pkg/rabbitmqamqp/uri_test.go similarity index 99% rename from pkg/rabbitmq_amqp/uri_test.go rename to pkg/rabbitmqamqp/uri_test.go index 319056f..8f42f9f 100644 --- a/pkg/rabbitmq_amqp/uri_test.go +++ b/pkg/rabbitmqamqp/uri_test.go @@ -3,7 +3,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -package rabbitmq_amqp +package rabbitmqamqp import ( . "github.com/onsi/ginkgo/v2"