diff --git a/README.md b/README.md index eb44fe8..5c512f0 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,18 @@ # RabbitMQ AMQP 1.0 .Golang Client -This library is in early stages of development. It is meant to be used with RabbitMQ 4.0. +This library is meant to be used with RabbitMQ 4.0. +Suitable for testing in pre-production environments. -## How to Run - -- Start the broker with `./.ci/ubuntu/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker. -- `make test` to run the tests -- Stop RabbitMQ with `./.ci/ubuntu/gha-setup.sh stop` ## Getting Started -You can find an example in: `docs/examples/getting_started` +- [Getting_started](docs/examples/getting_started) +- [Examples](docs/examples) -## Examples +## Build from source -You can find more examples in: `docs/examples` +- Start the broker with `./.ci/ubuntu/gha-setup.sh start`. Note that this has been tested on Ubuntu 22 with docker. +- `make test` to run the tests +- Stop RabbitMQ with `./.ci/ubuntu/gha-setup.sh stop` diff --git a/docs/examples/README.md b/docs/examples/README.md new file mode 100644 index 0000000..426ea13 --- /dev/null +++ b/docs/examples/README.md @@ -0,0 +1,8 @@ +### AMQP 1.0 .Golang Client Examples + + +- [Getting Started](getting_started) - A simple example to get you started. +- [Reliable](reliable) - An example of how to deal with reconnections and error handling. +- [Streams](streams) - An example of how to use [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams) with AMQP 1.0 +- [Stream Filtering](streams_filtering) - An example of how to use streams [Filter Expressions](https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions) +- [Publisher per message target](publisher_msg_targets) - An example of how to use a single publisher to send messages in different queues with the address to the message target in the message properties. \ No newline at end of file diff --git a/docs/examples/getting_started/main.go b/docs/examples/getting_started/main.go index 3c505c5..1daf1ec 100644 --- a/docs/examples/getting_started/main.go +++ b/docs/examples/getting_started/main.go @@ -10,8 +10,8 @@ import ( ) func main() { - exchangeName := "getting-started-exchange" - queueName := "getting-started-queue" + exchangeName := "getting-started-go-exchange" + queueName := "getting-started-go-queue" routingKey := "routing-key" rabbitmq_amqp.Info("Getting started with AMQP Go AMQP 1.0 Client") @@ -24,16 +24,22 @@ func main() { } }(stateChanged) - // Open a connection to the AMQP 1.0 server - amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), []string{"amqp://"}, nil) + // rabbitmq_amqp.NewEnvironment setups the environment. + // The environment is used to create connections + // given the same parameters + env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil) + + // Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0) + amqpConnection, err := env.NewConnection(context.Background()) if err != nil { rabbitmq_amqp.Error("Error opening connection", err) return } // Register the channel to receive status change notifications + // this is valid for the connection lifecycle amqpConnection.NotifyStatusChange(stateChanged) - fmt.Printf("AMQP connection opened.\n") + rabbitmq_amqp.Info("AMQP connection opened.\n") // Create the management interface for the connection // so we can declare exchanges, queues, and bindings management := amqpConnection.Management() @@ -165,37 +171,39 @@ func main() { err = management.Unbind(context.TODO(), bindingPath) if err != nil { - fmt.Printf("Error unbinding: %v\n", err) + rabbitmq_amqp.Error("Error unbinding: %v\n", err) return } err = management.DeleteExchange(context.TODO(), exchangeInfo.Name()) if err != nil { - fmt.Printf("Error deleting exchange: %v\n", err) + rabbitmq_amqp.Error("Error deleting exchange: %v\n", err) return } // Purge the queue purged, err := management.PurgeQueue(context.TODO(), queueInfo.Name()) if err != nil { - fmt.Printf("Error purging queue: %v\n", err) + rabbitmq_amqp.Error("Error purging queue: %v\n", err) return } - fmt.Printf("Purged %d messages from the queue.\n", purged) + rabbitmq_amqp.Info("Purged %d messages from the queue.\n", purged) err = management.DeleteQueue(context.TODO(), queueInfo.Name()) if err != nil { - fmt.Printf("Error deleting queue: %v\n", err) + rabbitmq_amqp.Error("Error deleting queue: %v\n", err) return } - err = amqpConnection.Close(context.Background()) + // Close all the connections. but you can still use the environment + // to create new connections + err = env.CloseConnections(context.Background()) if err != nil { - fmt.Printf("Error closing connection: %v\n", err) + rabbitmq_amqp.Error("Error closing connection: %v\n", err) return } - fmt.Printf("AMQP connection closed.\n") + rabbitmq_amqp.Info("AMQP connection closed.\n") // not necessary. It waits for the status change to be printed time.Sleep(100 * time.Millisecond) close(stateChanged) diff --git a/docs/examples/publisher_msg_targets/publisher_msg_targets.go b/docs/examples/publisher_msg_targets/publisher_msg_targets.go new file mode 100644 index 0000000..f8dd4d6 --- /dev/null +++ b/docs/examples/publisher_msg_targets/publisher_msg_targets.go @@ -0,0 +1,71 @@ +package main + +import ( + "context" + "github.com/Azure/go-amqp" + "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp" +) + +func checkError(err error) { + if err != nil { + rabbitmq_amqp.Error("Error", err) + // it should not happen for the example + // so panic just to make sure we catch it + panic(err) + } +} +func main() { + + rabbitmq_amqp.Info("Define the publisher message targets") + + env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil) + amqpConnection, err := env.NewConnection(context.Background()) + checkError(err) + queues := []string{"queue1", "queue2", "queue3"} + management := amqpConnection.Management() + for _, queue := range queues { + _, err = management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QuorumQueueSpecification{ + Name: queue, + }) + checkError(err) + } + + // create a publisher without a target + publisher, err := amqpConnection.NewPublisher(context.TODO(), nil, "stream-publisher") + checkError(err) + + // publish messages to the stream + for i := 0; i < 12; i++ { + + // with this helper function we create a message with a target + // that is the same to create a message with: + // msg := amqp.NewMessage([]byte("hello")) + // MessageToAddressHelper(msg, &QueueAddress{Queue: qName}) + // same like: + // msg := amqp.NewMessage([]byte("hello")) + // msg.Properties = &amqp.MessageProperties{} + // msg.Properties.To = &address + // NewMessageToAddress and MessageToAddressHelper helpers are provided to make the + // code more readable and easier to use + msg, err := rabbitmq_amqp.NewMessageToAddress([]byte("Hello World"), + &rabbitmq_amqp.QueueAddress{Queue: queues[i%3]}) + checkError(err) + publishResult, err := publisher.Publish(context.Background(), msg) + checkError(err) + switch publishResult.Outcome.(type) { + case *amqp.StateAccepted: + rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) + break + default: + rabbitmq_amqp.Warn("[Publisher]", "Message not accepted", publishResult.Message.Data[0]) + } + } + + // check the UI, you should see 4 messages in each queue + + // Close the publisher + err = publisher.Close(context.Background()) + checkError(err) + err = env.CloseConnections(context.Background()) + checkError(err) +} diff --git a/docs/examples/streams/streams.go b/docs/examples/streams/streams.go new file mode 100644 index 0000000..14bc061 --- /dev/null +++ b/docs/examples/streams/streams.go @@ -0,0 +1,98 @@ +package main + +import ( + "context" + "github.com/Azure/go-amqp" + "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp" + "time" +) + +func checkError(err error) { + if err != nil { + rabbitmq_amqp.Error("Error", err) + // it should not happen for the example + // so panic just to make sure we catch it + panic(err) + } +} + +func main() { + + rabbitmq_amqp.Info("Golang AMQP 1.0 Streams example") + queueStream := "stream-go-queue-" + time.Now().String() + env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil) + amqpConnection, err := env.NewConnection(context.Background()) + checkError(err) + management := amqpConnection.Management() + // define a stream queue + _, err = management.DeclareQueue(context.TODO(), &rabbitmq_amqp.StreamQueueSpecification{ + Name: queueStream, + // it is a best practice to set the max length of the stream + // to avoid the stream to grow indefinitely + // the value here is low just for the example + MaxLengthBytes: rabbitmq_amqp.CapacityGB(5), + }) + checkError(err) + + // create a stream publisher. In this case we use the QueueAddress to make the example + // simple. So we use the default exchange here. + publisher, err := amqpConnection.NewPublisher(context.TODO(), &rabbitmq_amqp.QueueAddress{Queue: queueStream}, "stream-publisher") + checkError(err) + + // publish messages to the stream + for i := 0; i < 10; i++ { + publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello World"))) + checkError(err) + + // check the outcome of the publishResult + switch publishResult.Outcome.(type) { + case *amqp.StateAccepted: + rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) + break + case *amqp.StateReleased: + rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0]) + break + case *amqp.StateRejected: + rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0]) + stateType := publishResult.Outcome.(*amqp.StateRejected) + if stateType.Error != nil { + rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error) + } + break + default: + // these status are not supported. Leave it for AMQP 1.0 compatibility + // see: https://www.rabbitmq.com/docs/next/amqp#outcomes + rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome) + } + + } + // create a stream consumer + consumer, err := amqpConnection.NewConsumer(context.Background(), queueStream, &rabbitmq_amqp.StreamConsumerOptions{ + // the offset is set to the first chunk of the stream + // so here it starts from the beginning + Offset: &rabbitmq_amqp.OffsetFirst{}, + }) + checkError(err) + + // receive messages from the stream + for i := 0; i < 10; i++ { + deliveryContext, err := consumer.Receive(context.Background()) + checkError(err) + rabbitmq_amqp.Info("[Consumer]", "Message received", deliveryContext.Message().Data[0]) + // accept the message + err = deliveryContext.Accept(context.Background()) + checkError(err) + } + + // close the consumer + err = consumer.Close(context.Background()) + checkError(err) + + err = amqpConnection.Management().DeleteQueue(context.Background(), queueStream) + checkError(err) + + err = env.CloseConnections(context.Background()) + checkError(err) + + rabbitmq_amqp.Info("Example completed") +} diff --git a/docs/examples/streams_filtering/streams_filtering.go b/docs/examples/streams_filtering/streams_filtering.go new file mode 100644 index 0000000..c097f9a --- /dev/null +++ b/docs/examples/streams_filtering/streams_filtering.go @@ -0,0 +1,112 @@ +package main + +import ( + "context" + "github.com/Azure/go-amqp" + "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmq_amqp" + "time" +) + +func checkError(err error) { + if err != nil { + rabbitmq_amqp.Error("Error", err) + // it should not happen for the example + // so panic just to make sure we catch it + panic(err) + } +} + +func main() { + + rabbitmq_amqp.Info("Golang AMQP 1.0 Streams example with filtering") + queueStream := "stream-go-queue-filtering-" + time.Now().String() + env := rabbitmq_amqp.NewEnvironment([]string{"amqp://"}, nil) + amqpConnection, err := env.NewConnection(context.Background()) + checkError(err) + management := amqpConnection.Management() + // define a stream queue + _, err = management.DeclareQueue(context.TODO(), &rabbitmq_amqp.StreamQueueSpecification{ + Name: queueStream, + // it is a best practice to set the max length of the stream + // to avoid the stream to grow indefinitely + // the value here is low just for the example + MaxLengthBytes: rabbitmq_amqp.CapacityGB(5), + }) + checkError(err) + + // create a stream publisher. In this case we use the QueueAddress to make the example + // simple. So we use the default exchange here. + publisher, err := amqpConnection.NewPublisher(context.TODO(), &rabbitmq_amqp.QueueAddress{Queue: queueStream}, "stream-publisher") + checkError(err) + + filters := []string{"MyFilter1", "MyFilter2", "MyFilter3", "MyFilter4"} + + // publish messages to the stream + for i := 0; i < 40; i++ { + msg := amqp.NewMessage([]byte("Hello World! with filter:" + filters[i%4])) + // add a filter to the message + msg.Annotations = amqp.Annotations{ + // here we set the filter value taken from the filters array + rabbitmq_amqp.StreamFilterValue: filters[i%4], + } + publishResult, err := publisher.Publish(context.Background(), msg) + checkError(err) + + // check the outcome of the publishResult + switch publishResult.Outcome.(type) { + case *amqp.StateAccepted: + rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) + break + case *amqp.StateReleased: + rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0]) + break + case *amqp.StateRejected: + rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0]) + stateType := publishResult.Outcome.(*amqp.StateRejected) + if stateType.Error != nil { + rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error) + } + break + default: + // these status are not supported. Leave it for AMQP 1.0 compatibility + // see: https://www.rabbitmq.com/docs/next/amqp#outcomes + rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome) + } + + } + // create a stream consumer + consumer, err := amqpConnection.NewConsumer(context.Background(), queueStream, &rabbitmq_amqp.StreamConsumerOptions{ + // the offset is set to the first chunk of the stream + // so here it starts from the beginning + Offset: &rabbitmq_amqp.OffsetFirst{}, + + // add a filter to the consumer, in this case we use only the filter values + // MyFilter1 and MyFilter2. So all other messages won't be received + Filters: []string{"MyFilter1", "MyFilter2"}, + }) + checkError(err) + + // receive messages from the stream. + // In this case we should receive only 20 messages with the filter values + // MyFilter1 and MyFilter2 + for i := 0; i < 20; i++ { + deliveryContext, err := consumer.Receive(context.Background()) + checkError(err) + rabbitmq_amqp.Info("[Consumer]", "Message received", deliveryContext.Message().Data[0]) + // accept the message + err = deliveryContext.Accept(context.Background()) + checkError(err) + } + + // close the consumer + err = consumer.Close(context.Background()) + checkError(err) + + err = amqpConnection.Management().DeleteQueue(context.Background(), queueStream) + checkError(err) + + err = env.CloseConnections(context.Background()) + checkError(err) + + rabbitmq_amqp.Info("Example completed") +} diff --git a/pkg/rabbitmq_amqp/address.go b/pkg/rabbitmq_amqp/address.go index e93ac5f..6958c87 100644 --- a/pkg/rabbitmq_amqp/address.go +++ b/pkg/rabbitmq_amqp/address.go @@ -47,7 +47,7 @@ func (eas *ExchangeAddress) toAddress() (string, error) { // MessageToAddressHelper sets the To property of the message to the address of the target. // The target must be a QueueAddress or an ExchangeAddress. -// Note: The field To will be overwritten if it is already set. +// Note: The field msgRef.Properties.To will be overwritten if it is already set. func MessageToAddressHelper(msgRef *amqp.Message, target TargetAddress) error { if target == nil { return errors.New("target cannot be nil") @@ -65,6 +65,18 @@ func MessageToAddressHelper(msgRef *amqp.Message, target TargetAddress) error { return nil } +// NewMessageToAddress creates a new message with the given payload and sets the To property to the address of the target. +// The target must be a QueueAddress or an ExchangeAddress. +// This function is a helper that combines NewMessage and MessageToAddressHelper. +func NewMessageToAddress(msg []byte, target TargetAddress) (*amqp.Message, error) { + message := amqp.NewMessage(msg) + err := MessageToAddressHelper(message, target) + if err != nil { + return nil, err + } + return message, nil +} + // address Creates the address for the exchange or queue following the RabbitMQ conventions. // see: https://www.rabbitmq.com/docs/next/amqp#address-v2 func address(exchange, key, queue *string, urlParameters *string) (string, error) { diff --git a/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go b/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go index 10af86e..afe207a 100644 --- a/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go +++ b/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go @@ -23,7 +23,7 @@ func publishMessagesWithStreamTag(queueName string, filterValue string, count in body := filterValue + " #" + strconv.Itoa(i) msg := amqp.NewMessage([]byte(body)) msg.Annotations = amqp.Annotations{ - "x-stream-filter-value": filterValue, + StreamFilterValue: filterValue, } publishResult, err := publisher.Publish(context.TODO(), msg) Expect(err).To(BeNil()) diff --git a/pkg/rabbitmq_amqp/amqp_publisher_test.go b/pkg/rabbitmq_amqp/amqp_publisher_test.go index d45b6d5..4f9d347 100644 --- a/pkg/rabbitmq_amqp/amqp_publisher_test.go +++ b/pkg/rabbitmq_amqp/amqp_publisher_test.go @@ -105,11 +105,17 @@ var _ = Describe("AMQP publisher ", func() { qName := generateNameWithDateTime("Targets NewPublisher should fail when the destination does not exist") msg := amqp.NewMessage([]byte("hello")) Expect(MessageToAddressHelper(msg, &QueueAddress{Queue: qName})).To(BeNil()) - publishResult, err := publisher.Publish(context.Background(), msg) Expect(err).To(BeNil()) Expect(publishResult).NotTo(BeNil()) Expect(publishResult.Outcome).To(Equal(&amqp.StateReleased{})) + msg, err = NewMessageToAddress([]byte("hello"), &QueueAddress{Queue: qName}) + Expect(err).To(BeNil()) + publishResult, err = publisher.Publish(context.Background(), msg) + Expect(err).To(BeNil()) + Expect(publishResult).NotTo(BeNil()) + Expect(publishResult.Outcome).To(Equal(&amqp.StateReleased{})) + Expect(connection.Close(context.Background())).To(BeNil()) }) diff --git a/pkg/rabbitmq_amqp/common.go b/pkg/rabbitmq_amqp/common.go index b785d21..40b62de 100644 --- a/pkg/rabbitmq_amqp/common.go +++ b/pkg/rabbitmq_amqp/common.go @@ -8,6 +8,10 @@ import ( "strings" ) +// public consts + +const StreamFilterValue = "x-stream-filter-value" + const ( responseCode200 = 200 responseCode201 = 201