diff --git a/examples/getting_started/main.go b/examples/getting_started/main.go index c4fd6fc..b030516 100644 --- a/examples/getting_started/main.go +++ b/examples/getting_started/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "fmt" "github.com/Azure/go-amqp" "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp" @@ -67,34 +68,77 @@ func main() { return } - addr, err := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey) - - publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher") + // Create a consumer to receive messages from the queue + // you need to build the address of the queue, but you can use the helper function + addrQueue, _ := rabbitmq_amqp.QueueAddress(&queueName) + consumer, err := amqpConnection.Consumer(context.Background(), addrQueue, "getting-started-consumer") if err != nil { - rabbitmq_amqp.Error("Error creating publisher", err) + rabbitmq_amqp.Error("Error creating consumer", err) return } - // Publish a message to the exchange - publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"))) + consumerContext, cancel := context.WithCancel(context.Background()) + + // Consume messages from the queue + go func(ctx context.Context) { + for { + deliveryContext, err := consumer.Receive(ctx) + if errors.Is(err, context.Canceled) { + // The consumer was closed correctly + rabbitmq_amqp.Info("[Consumer]", "consumer closed. Context", err) + return + } + if err != nil { + // An error occurred receiving the message + rabbitmq_amqp.Error("[Consumer]", "Error receiving message", err) + return + } + + rabbitmq_amqp.Info("[Consumer]", "Received message", + fmt.Sprintf("%s", deliveryContext.Message().Data)) + + err = deliveryContext.Accept(context.Background()) + if err != nil { + rabbitmq_amqp.Error("Error accepting message", err) + return + } + } + }(consumerContext) + + addr, _ := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey) + publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher") if err != nil { - rabbitmq_amqp.Error("Error publishing message", err) + rabbitmq_amqp.Error("Error creating publisher", err) return } - switch publishResult.Outcome { - case &amqp.StateAccepted{}: - rabbitmq_amqp.Info("Message accepted") - case &amqp.StateReleased{}: - rabbitmq_amqp.Warn("Message was not routed") - case &amqp.StateRejected{}: - rabbitmq_amqp.Warn("Message rejected") - stateType := publishResult.Outcome.(*amqp.StateRejected) - if stateType.Error != nil { - rabbitmq_amqp.Warn("Message rejected with error: %v", stateType.Error) + + for i := 0; i < 10; i++ { + + // Publish a message to the exchange + publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) + if err != nil { + rabbitmq_amqp.Error("Error publishing message", err) + return + } + switch publishResult.Outcome.(type) { + case *amqp.StateAccepted: + rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0]) + break + case *amqp.StateReleased: + rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0]) + break + case *amqp.StateRejected: + rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0]) + stateType := publishResult.Outcome.(*amqp.StateRejected) + if stateType.Error != nil { + rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error) + } + break + default: + // these status are not supported. Leave it for AMQP 1.0 compatibility + // see: https://www.rabbitmq.com/docs/next/amqp#outcomes + rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome) } - default: - // these status are not supported - rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome) } println("press any key to close the connection") @@ -102,13 +146,19 @@ func main() { var input string _, _ = fmt.Scanln(&input) + cancel() + //Close the consumer + err = consumer.Close(context.Background()) + if err != nil { + rabbitmq_amqp.Error("[Consumer]", err) + } // Close the publisher err = publisher.Close(context.Background()) if err != nil { return } - // Unbind the queue from the exchange + // Unbind the queue from the exchange err = management.Unbind(context.TODO(), bindingPath) if err != nil { @@ -143,8 +193,7 @@ func main() { } fmt.Printf("AMQP Connection closed.\n") - // Wait for the status change to be printed - time.Sleep(500 * time.Millisecond) - - close(stateChangeds) + // not necessary. It waits for the status change to be printed + time.Sleep(100 * time.Millisecond) + close(stateChanged) } diff --git a/rabbitmq_amqp/amqp_connection.go b/rabbitmq_amqp/amqp_connection.go index bc294d9..db401f8 100644 --- a/rabbitmq_amqp/amqp_connection.go +++ b/rabbitmq_amqp/amqp_connection.go @@ -26,7 +26,6 @@ func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, l if !validateAddress(destinationAdd) { return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues) } - sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName, AtLeastOnce)) if err != nil { return nil, err @@ -34,6 +33,17 @@ func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, l return newPublisher(sender), nil } +func (a *AmqpConnection) Consumer(ctx context.Context, destinationAdd string, linkName string) (*Consumer, error) { + if !validateAddress(destinationAdd) { + return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues) + } + receiver, err := a.session.NewReceiver(ctx, destinationAdd, createReceiverLinkOptions(destinationAdd, linkName, AtLeastOnce)) + if err != nil { + return nil, err + } + return newConsumer(receiver), nil +} + // Dial connect to the AMQP 1.0 server using the provided connectionSettings // Returns a pointer to the new AmqpConnection if successful else an error. // addresses is a list of addresses to connect to. It picks one randomly. @@ -93,6 +103,7 @@ func (a *AmqpConnection) open(ctx context.Context, addr string, connOptions *amq } a.Connection = conn a.session, err = a.Connection.NewSession(ctx, nil) + if err != nil { return err } diff --git a/rabbitmq_amqp/amqp_consumer.go b/rabbitmq_amqp/amqp_consumer.go new file mode 100644 index 0000000..6311919 --- /dev/null +++ b/rabbitmq_amqp/amqp_consumer.go @@ -0,0 +1,82 @@ +package rabbitmq_amqp + +import ( + "context" + "github.com/Azure/go-amqp" +) + +type DeliveryContext struct { + receiver *amqp.Receiver + message *amqp.Message +} + +func (dc *DeliveryContext) Message() *amqp.Message { + return dc.message +} + +func (dc *DeliveryContext) Accept(ctx context.Context) error { + return dc.receiver.AcceptMessage(ctx, dc.message) +} + +func (dc *DeliveryContext) Discard(ctx context.Context, e *amqp.Error) error { + return dc.receiver.RejectMessage(ctx, dc.message, e) +} + +func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error { + if err := validateMessageAnnotations(annotations); err != nil { + return err + } + // copy the rabbitmq annotations to amqp annotations + destination := make(amqp.Annotations) + for key, value := range annotations { + destination[key] = value + + } + + return dc.receiver.ModifyMessage(ctx, dc.message, &amqp.ModifyMessageOptions{ + DeliveryFailed: true, + UndeliverableHere: true, + Annotations: destination, + }) +} + +func (dc *DeliveryContext) Requeue(ctx context.Context) error { + return dc.receiver.ReleaseMessage(ctx, dc.message) +} + +func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error { + if err := validateMessageAnnotations(annotations); err != nil { + return err + } + // copy the rabbitmq annotations to amqp annotations + destination := make(amqp.Annotations) + for key, value := range annotations { + destination[key] = value + + } + return dc.receiver.ModifyMessage(ctx, dc.message, &amqp.ModifyMessageOptions{ + DeliveryFailed: false, + UndeliverableHere: false, + Annotations: destination, + }) +} + +type Consumer struct { + receiver *amqp.Receiver +} + +func newConsumer(receiver *amqp.Receiver) *Consumer { + return &Consumer{receiver: receiver} +} + +func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) { + msg, err := c.receiver.Receive(ctx, nil) + if err != nil { + return nil, err + } + return &DeliveryContext{receiver: c.receiver, message: msg}, nil +} + +func (c *Consumer) Close(ctx context.Context) error { + return c.receiver.Close(ctx) +} diff --git a/rabbitmq_amqp/amqp_consumer_test.go b/rabbitmq_amqp/amqp_consumer_test.go new file mode 100644 index 0000000..ceabaa0 --- /dev/null +++ b/rabbitmq_amqp/amqp_consumer_test.go @@ -0,0 +1,181 @@ +package rabbitmq_amqp + +import ( + "context" + "github.com/Azure/go-amqp" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "time" +) + +var _ = Describe("Consumer tests", func() { + + It("AMQP Consumer should fail due to context cancellation", func() { + qName := generateNameWithDateTime("AMQP Consumer should fail due to context cancellation") + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + Expect(err).To(BeNil()) + addr, _ := QueueAddress(&qName) + queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{ + Name: qName, + IsAutoDelete: false, + IsExclusive: false, + QueueType: QueueType{Quorum}, + }) + Expect(err).To(BeNil()) + Expect(queue).NotTo(BeNil()) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond) + cancel() + _, err = connection.Consumer(ctx, addr, "test") + Expect(err).NotTo(BeNil()) + Expect(err.Error()).To(ContainSubstring("context canceled")) + Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("AMQP Consumer should ack and empty the queue", func() { + qName := generateNameWithDateTime("AMQP Consumer should ack and empty the queue") + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + Expect(err).To(BeNil()) + queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{ + Name: qName, + IsAutoDelete: false, + IsExclusive: false, + QueueType: QueueType{Quorum}, + }) + Expect(err).To(BeNil()) + Expect(queue).NotTo(BeNil()) + publishMessages(qName, 10) + addr, _ := QueueAddress(&qName) + consumer, err := connection.Consumer(context.Background(), addr, "test") + Expect(err).To(BeNil()) + Expect(consumer).NotTo(BeNil()) + Expect(consumer).To(BeAssignableToTypeOf(&Consumer{})) + for i := 0; i < 10; i++ { + dc, err := consumer.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + Expect(dc.Accept(context.Background())).To(BeNil()) + } + nMessages, err := connection.Management().PurgeQueue(context.Background(), qName) + Expect(err).To(BeNil()) + Expect(nMessages).To(Equal(0)) + Expect(consumer.Close(context.Background())).To(BeNil()) + Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("AMQP Consumer should requeue the message to the queue", func() { + + qName := generateNameWithDateTime("AMQP Consumer should requeue the message to the queue") + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + Expect(err).To(BeNil()) + queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{ + Name: qName, + IsAutoDelete: false, + IsExclusive: false, + QueueType: QueueType{Quorum}, + }) + Expect(err).To(BeNil()) + Expect(queue).NotTo(BeNil()) + publishMessages(qName, 1) + addr, _ := QueueAddress(&qName) + consumer, err := connection.Consumer(context.Background(), addr, "test") + Expect(err).To(BeNil()) + Expect(consumer).NotTo(BeNil()) + Expect(consumer).To(BeAssignableToTypeOf(&Consumer{})) + dc, err := consumer.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + Expect(dc.Requeue(context.Background())).To(BeNil()) + Expect(consumer.Close(context.Background())).To(BeNil()) + Expect(err).To(BeNil()) + nMessages, err := connection.Management().PurgeQueue(context.Background(), qName) + Expect(nMessages).To(Equal(1)) + Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("AMQP Consumer should requeue the message to the queue with annotations", func() { + + qName := generateNameWithDateTime("AMQP Consumer should requeue the message to the queue with annotations") + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + Expect(err).To(BeNil()) + queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{ + Name: qName, + IsAutoDelete: false, + IsExclusive: false, + QueueType: QueueType{Quorum}, + }) + Expect(err).To(BeNil()) + Expect(queue).NotTo(BeNil()) + publishMessages(qName, 1) + addr, _ := QueueAddress(&qName) + consumer, err := connection.Consumer(context.Background(), addr, "test") + Expect(err).To(BeNil()) + Expect(consumer).NotTo(BeNil()) + Expect(consumer).To(BeAssignableToTypeOf(&Consumer{})) + dc, err := consumer.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + myAnnotations := amqp.Annotations{ + "x-key1": "value1", + "x-key2": "value2", + } + Expect(dc.RequeueWithAnnotations(context.Background(), myAnnotations)).To(BeNil()) + dcWithAnnotation, err := consumer.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dcWithAnnotation.Message().Annotations["x-key1"]).To(Equal("value1")) + Expect(dcWithAnnotation.Message().Annotations["x-key2"]).To(Equal("value2")) + Expect(consumer.Close(context.Background())).To(BeNil()) + Expect(err).To(BeNil()) + nMessages, err := connection.Management().PurgeQueue(context.Background(), qName) + Expect(nMessages).To(Equal(1)) + Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("AMQP Consumer should discard the message to the queue with and without annotations", func() { + // TODO: Implement this test with a dead letter queue to test the discard feature + qName := generateNameWithDateTime("AMQP Consumer should discard the message to the queue with and without annotations") + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + Expect(err).To(BeNil()) + queue, err := connection.Management().DeclareQueue(context.Background(), &QueueSpecification{ + Name: qName, + IsAutoDelete: false, + IsExclusive: false, + QueueType: QueueType{Quorum}, + }) + Expect(err).To(BeNil()) + Expect(queue).NotTo(BeNil()) + publishMessages(qName, 2) + addr, _ := QueueAddress(&qName) + consumer, err := connection.Consumer(context.Background(), addr, "test") + Expect(err).To(BeNil()) + Expect(consumer).NotTo(BeNil()) + Expect(consumer).To(BeAssignableToTypeOf(&Consumer{})) + dc, err := consumer.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + myAnnotations := amqp.Annotations{ + "x-key1": "value1", + "x-key2": "value2", + } + Expect(dc.DiscardWithAnnotations(context.Background(), myAnnotations)).To(BeNil()) + dc, err = consumer.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + Expect(dc.Discard(context.Background(), &amqp.Error{ + Condition: "my error", + Description: "my error description", + Info: nil, + })).To(BeNil()) + nMessages, err := connection.Management().PurgeQueue(context.Background(), qName) + Expect(nMessages).To(Equal(0)) + Expect(consumer.Close(context.Background())).To(BeNil()) + Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) + }) + +}) diff --git a/rabbitmq_amqp/amqp_management.go b/rabbitmq_amqp/amqp_management.go index c806488..75f8b73 100644 --- a/rabbitmq_amqp/amqp_management.go +++ b/rabbitmq_amqp/amqp_management.go @@ -29,7 +29,7 @@ func NewAmqpManagement() *AmqpManagement { func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error { if a.receiver == nil { - opts := createReceiverLinkOptions(managementNodeAddress, linkPairName) + opts := createReceiverLinkOptions(managementNodeAddress, linkPairName, AtMostOnce) receiver, err := a.session.NewReceiver(ctx, managementNodeAddress, opts) if err != nil { return err diff --git a/rabbitmq_amqp/amqp_publisher.go b/rabbitmq_amqp/amqp_publisher.go index fde42fe..508949d 100644 --- a/rabbitmq_amqp/amqp_publisher.go +++ b/rabbitmq_amqp/amqp_publisher.go @@ -27,7 +27,6 @@ func newPublisher(sender *amqp.Sender) *Publisher { // - StateRejected // See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information. func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*PublishResult, error) { - r, err := m.sender.SendWithReceipt(ctx, message, nil) if err != nil { return nil, err @@ -36,7 +35,6 @@ func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*Publis if err != nil { return nil, err } - publishResult := &PublishResult{ Message: message, Outcome: state, diff --git a/rabbitmq_amqp/amqp_utils.go b/rabbitmq_amqp/amqp_utils.go index 5427be3..fcd8b86 100644 --- a/rabbitmq_amqp/amqp_utils.go +++ b/rabbitmq_amqp/amqp_utils.go @@ -1,6 +1,7 @@ package rabbitmq_amqp import ( + "fmt" "github.com/Azure/go-amqp" "math/rand" "time" @@ -39,16 +40,27 @@ func createSenderLinkOptions(address string, linkName string, deliveryMode int) // receiverLinkOptions returns the options for a receiver link // with the given address and link name. // That should be the same for all the links. -func createReceiverLinkOptions(address string, linkName string) *amqp.ReceiverOptions { +func createReceiverLinkOptions(address string, linkName string, deliveryMode int) *amqp.ReceiverOptions { prop := make(map[string]any) prop["paired"] = true + receiverSettleMode := amqp.SenderSettleModeSettled.Ptr() + /// SndSettleMode = deliveryMode == DeliveryMode.AtMostOnce + // ? SenderSettleMode.Settled + // : SenderSettleMode.Unsettled, + + if deliveryMode == AtLeastOnce { + receiverSettleMode = amqp.SenderSettleModeUnsettled.Ptr() + } + return &amqp.ReceiverOptions{ TargetAddress: address, DynamicAddress: false, Name: linkName, Properties: prop, - RequestedSenderSettleMode: amqp.SenderSettleModeSettled.Ptr(), + Durability: 0, + ExpiryTimeout: 0, SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(), + RequestedSenderSettleMode: receiverSettleMode, ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, Credit: 100, } @@ -58,3 +70,24 @@ func random(max int) int { r := rand.New(rand.NewSource(time.Now().Unix())) return r.Intn(max) } + +func validateMessageAnnotations(annotations amqp.Annotations) error { + for k, _ := range annotations { + switch tp := k.(type) { + case string: + if err := validateMessageAnnotationKey(tp); err != nil { + return err + } + default: + return fmt.Errorf("message annotation key must be a string: %v", k) + } + } + return nil +} + +func validateMessageAnnotationKey(key string) error { + if key[:2] != "x-" { + return fmt.Errorf("message annotation key must start with 'x-': %s", key) + } + return nil +}