From d6f13185be511affe81c0c558a4d3cb18efdb349 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 7 Feb 2025 16:20:17 +0100 Subject: [PATCH 1/4] add stream support Signed-off-by: Gabriele Santomaggio --- docs/examples/getting_started/main.go | 6 +- docs/examples/reliable/reliable.go | 4 +- pkg/rabbitmq_amqp/amqp_connection.go | 9 +- .../amqp_connection_recovery_test.go | 5 +- pkg/rabbitmq_amqp/amqp_consumer.go | 10 +- pkg/rabbitmq_amqp/amqp_consumer_test.go | 12 +- pkg/rabbitmq_amqp/amqp_management.go | 2 +- pkg/rabbitmq_amqp/amqp_types.go | 106 ++++++++++++++++++ pkg/rabbitmq_amqp/amqp_utils.go | 6 +- 9 files changed, 132 insertions(+), 28 deletions(-) create mode 100644 pkg/rabbitmq_amqp/amqp_types.go diff --git a/docs/examples/getting_started/main.go b/docs/examples/getting_started/main.go index 1a9954b..3c505c5 100644 --- a/docs/examples/getting_started/main.go +++ b/docs/examples/getting_started/main.go @@ -70,9 +70,7 @@ func main() { // Create a consumer to receive messages from the queue // you need to build the address of the queue, but you can use the helper function - consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{ - Queue: queueName, - }, "getting-started-consumer") + consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil) if err != nil { rabbitmq_amqp.Error("Error creating consumer", err) return @@ -115,7 +113,7 @@ func main() { return } - for i := 0; i < 1_000; i++ { + for i := 0; i < 100; i++ { // Publish a message to the exchange publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) if err != nil { diff --git a/docs/examples/reliable/reliable.go b/docs/examples/reliable/reliable.go index 6710a4b..d985732 100644 --- a/docs/examples/reliable/reliable.go +++ b/docs/examples/reliable/reliable.go @@ -77,9 +77,7 @@ func main() { return } - consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{ - Queue: queueName, - }, "reliable-consumer") + consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil) if err != nil { rabbitmq_amqp.Error("Error creating consumer", err) return diff --git a/pkg/rabbitmq_amqp/amqp_connection.go b/pkg/rabbitmq_amqp/amqp_connection.go index 3b7cd79..55ad5dc 100644 --- a/pkg/rabbitmq_amqp/amqp_connection.go +++ b/pkg/rabbitmq_amqp/amqp_connection.go @@ -87,14 +87,17 @@ func (a *AmqpConnection) NewPublisher(ctx context.Context, destination TargetAdd } // NewConsumer creates a new Consumer that listens to the provided destination. Destination is a QueueAddress. -func (a *AmqpConnection) NewConsumer(ctx context.Context, destination *QueueAddress, linkName string) (*Consumer, error) { +func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options ConsumerOptions) (*Consumer, error) { + destination := &QueueAddress{ + Queue: queueName, + } + destinationAdd, err := destination.toAddress() if err != nil { return nil, err } - err = validateAddress(destinationAdd) - return newConsumer(ctx, a, destinationAdd, linkName) + return newConsumer(ctx, a, destinationAdd, options) } // Dial connect to the AMQP 1.0 server using the provided connectionSettings diff --git a/pkg/rabbitmq_amqp/amqp_connection_recovery_test.go b/pkg/rabbitmq_amqp/amqp_connection_recovery_test.go index b5126f6..b1b45bd 100644 --- a/pkg/rabbitmq_amqp/amqp_connection_recovery_test.go +++ b/pkg/rabbitmq_amqp/amqp_connection_recovery_test.go @@ -43,9 +43,8 @@ var _ = Describe("Recovery connection test", func() { Expect(err).To(BeNil()) Expect(queueInfo).NotTo(BeNil()) - consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{ - Queue: qName, - }, "test") + consumer, err := connection.NewConsumer(context.Background(), + qName, nil) publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{ Queue: qName, diff --git a/pkg/rabbitmq_amqp/amqp_consumer.go b/pkg/rabbitmq_amqp/amqp_consumer.go index 22f702c..a917862 100644 --- a/pkg/rabbitmq_amqp/amqp_consumer.go +++ b/pkg/rabbitmq_amqp/amqp_consumer.go @@ -67,7 +67,7 @@ func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotatio type Consumer struct { receiver atomic.Pointer[amqp.Receiver] connection *AmqpConnection - linkName string + options ConsumerOptions destinationAdd string id string } @@ -76,12 +76,13 @@ func (c *Consumer) Id() string { return c.id } -func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, linkName string, args ...string) (*Consumer, error) { +func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, options ConsumerOptions, args ...string) (*Consumer, error) { id := fmt.Sprintf("consumer-%s", uuid.New().String()) if len(args) > 0 { id = args[0] } - r := &Consumer{connection: connection, linkName: linkName, destinationAdd: destinationAdd, id: id} + + r := &Consumer{connection: connection, options: options, destinationAdd: destinationAdd, id: id} connection.entitiesTracker.storeOrReplaceConsumer(r) err := r.createReceiver(ctx) if err != nil { @@ -91,7 +92,8 @@ func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd } func (c *Consumer) createReceiver(ctx context.Context) error { - receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd, createReceiverLinkOptions(c.destinationAdd, c.linkName, AtLeastOnce)) + receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd, + createReceiverLinkOptions(c.destinationAdd, c.options, AtLeastOnce)) if err != nil { return err } diff --git a/pkg/rabbitmq_amqp/amqp_consumer_test.go b/pkg/rabbitmq_amqp/amqp_consumer_test.go index 117ebbd..3b2fd04 100644 --- a/pkg/rabbitmq_amqp/amqp_consumer_test.go +++ b/pkg/rabbitmq_amqp/amqp_consumer_test.go @@ -24,9 +24,7 @@ var _ = Describe("NewConsumer tests", func() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond) cancel() - _, err = connection.NewConsumer(ctx, &QueueAddress{ - Queue: qName, - }, "test") + _, err = connection.NewConsumer(ctx, qName, nil) Expect(err).NotTo(BeNil()) Expect(err.Error()).To(ContainSubstring("context canceled")) Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) @@ -43,7 +41,7 @@ var _ = Describe("NewConsumer tests", func() { Expect(err).To(BeNil()) Expect(queue).NotTo(BeNil()) publishMessages(qName, 10) - consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{Queue: qName}, "test") + consumer, err := connection.NewConsumer(context.Background(), qName, nil) Expect(err).To(BeNil()) Expect(consumer).NotTo(BeNil()) Expect(consumer).To(BeAssignableToTypeOf(&Consumer{})) @@ -72,7 +70,7 @@ var _ = Describe("NewConsumer tests", func() { Expect(err).To(BeNil()) Expect(queue).NotTo(BeNil()) publishMessages(qName, 1) - consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{Queue: qName}, "test") + consumer, err := connection.NewConsumer(context.Background(), qName, nil) Expect(err).To(BeNil()) Expect(consumer).NotTo(BeNil()) Expect(consumer).To(BeAssignableToTypeOf(&Consumer{})) @@ -99,7 +97,7 @@ var _ = Describe("NewConsumer tests", func() { Expect(err).To(BeNil()) Expect(queue).NotTo(BeNil()) publishMessages(qName, 1) - consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{Queue: qName}, "test") + consumer, err := connection.NewConsumer(context.Background(), qName, nil) Expect(err).To(BeNil()) Expect(consumer).NotTo(BeNil()) Expect(consumer).To(BeAssignableToTypeOf(&Consumer{})) @@ -134,7 +132,7 @@ var _ = Describe("NewConsumer tests", func() { Expect(err).To(BeNil()) Expect(queue).NotTo(BeNil()) publishMessages(qName, 2) - consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{Queue: qName}, "test") + consumer, err := connection.NewConsumer(context.Background(), qName, nil) Expect(err).To(BeNil()) Expect(consumer).NotTo(BeNil()) Expect(consumer).To(BeAssignableToTypeOf(&Consumer{})) diff --git a/pkg/rabbitmq_amqp/amqp_management.go b/pkg/rabbitmq_amqp/amqp_management.go index 39aa42d..3fc443c 100644 --- a/pkg/rabbitmq_amqp/amqp_management.go +++ b/pkg/rabbitmq_amqp/amqp_management.go @@ -32,7 +32,7 @@ func NewAmqpManagement() *AmqpManagement { } func (a *AmqpManagement) ensureReceiverLink(ctx context.Context) error { - opts := createReceiverLinkOptions(managementNodeAddress, linkPairName, AtMostOnce) + opts := createReceiverLinkOptions(managementNodeAddress, &managementOptions{}, AtMostOnce) receiver, err := a.session.NewReceiver(ctx, managementNodeAddress, opts) if err != nil { return err diff --git a/pkg/rabbitmq_amqp/amqp_types.go b/pkg/rabbitmq_amqp/amqp_types.go new file mode 100644 index 0000000..8faec3c --- /dev/null +++ b/pkg/rabbitmq_amqp/amqp_types.go @@ -0,0 +1,106 @@ +package rabbitmq_amqp + +import "github.com/google/uuid" + +type linkerName interface { + linkName() string +} + +func getLinkName(l linkerName) string { + if l == nil { + return uuid.New().String() + } + return l.linkName() +} + +/// ConsumerOptions /// + +type ConsumerOptions interface { + linkName() string + initialCredits() int32 +} + +func getInitialCredits(co ConsumerOptions) int32 { + if co == nil { + return 100 + } + return co.initialCredits() +} + +type managementOptions struct { +} + +func (mo *managementOptions) linkName() string { + return linkPairName +} + +func (mo *managementOptions) initialCredits() int32 { + return 10 +} + +type AMQPConsumerOptions struct { + ReceiverLinkName string + InitialCredits uint32 +} + +func (aco *AMQPConsumerOptions) linkName() string { + return aco.ReceiverLinkName +} + +func (aco *AMQPConsumerOptions) initialCredits() uint32 { + return aco.InitialCredits +} + +type OffsetSpecification interface { + toMap() map[string]any +} + +type OffsetFirst struct { +} + +func (of *OffsetFirst) toMap() map[string]any { + return map[string]any{"offset": "first"} +} + +type OffsetLast struct { +} + +func (ol *OffsetLast) toMap() map[string]any { + return map[string]any{"offset": "last"} +} + +type OffsetValue struct { + Offset uint64 +} + +func (o *OffsetValue) toMap() map[string]any { + return map[string]any{"offset": o.Offset} +} + +type StreamConsumerOptions struct { + ReceiverLinkName string + InitialCredits uint32 + Offset OffsetSpecification +} + +func (sco *StreamConsumerOptions) linkName() string { + return sco.ReceiverLinkName +} + +func (sco *StreamConsumerOptions) initialCredits() uint32 { + return sco.InitialCredits +} + +///// ProducerOptions ///// + +type ProducerOptions interface { + linkName() string +} + +type AMQPProducerOptions struct { + SenderLinkName string +} + +func (apo *AMQPProducerOptions) linkName() string { + return apo.SenderLinkName +} diff --git a/pkg/rabbitmq_amqp/amqp_utils.go b/pkg/rabbitmq_amqp/amqp_utils.go index fcd8b86..71f9e65 100644 --- a/pkg/rabbitmq_amqp/amqp_utils.go +++ b/pkg/rabbitmq_amqp/amqp_utils.go @@ -40,7 +40,7 @@ func createSenderLinkOptions(address string, linkName string, deliveryMode int) // receiverLinkOptions returns the options for a receiver link // with the given address and link name. // That should be the same for all the links. -func createReceiverLinkOptions(address string, linkName string, deliveryMode int) *amqp.ReceiverOptions { +func createReceiverLinkOptions(address string, options ConsumerOptions, deliveryMode int) *amqp.ReceiverOptions { prop := make(map[string]any) prop["paired"] = true receiverSettleMode := amqp.SenderSettleModeSettled.Ptr() @@ -55,14 +55,14 @@ func createReceiverLinkOptions(address string, linkName string, deliveryMode int return &amqp.ReceiverOptions{ TargetAddress: address, DynamicAddress: false, - Name: linkName, + Name: getLinkName(options), Properties: prop, Durability: 0, ExpiryTimeout: 0, SettlementMode: amqp.ReceiverSettleModeFirst.Ptr(), RequestedSenderSettleMode: receiverSettleMode, ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, - Credit: 100, + Credit: getInitialCredits(options), } } From 8f01d461561dc0c14c2dbbe739d3c14f46059d9f Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 10 Feb 2025 17:52:51 +0100 Subject: [PATCH 2/4] add the offset specification Signed-off-by: Gabriele Santomaggio --- .../amqp_consumer_stream_test.go | 116 ++++++++++++++++++ pkg/rabbitmq_amqp/amqp_queue_test.go | 9 +- pkg/rabbitmq_amqp/amqp_types.go | 61 +++++++-- pkg/rabbitmq_amqp/amqp_utils.go | 1 + pkg/rabbitmq_amqp/entities.go | 37 ++++++ 5 files changed, 210 insertions(+), 14 deletions(-) create mode 100644 pkg/rabbitmq_amqp/amqp_consumer_stream_test.go diff --git a/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go b/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go new file mode 100644 index 0000000..fbe08b1 --- /dev/null +++ b/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go @@ -0,0 +1,116 @@ +package rabbitmq_amqp + +import ( + "context" + "fmt" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Consumer stream test", func() { + + It("start consuming with different offset types", func() { + /* + Test the different offset types for stream consumers + 1. OffsetValue + 2. OffsetFirst + 3. OffsetLast + 4. OffsetNext + + With 10 messages in the queue, the test will create a consumer with different offset types + the test 1, 2, 4 can be deterministic. The test 3 can't be deterministic (in this test), + but we can check if there is at least one message, and it is not the first one. + It is enough to verify the functionality of the offset types. + */ + + qName := generateName("start consuming with different offset types") + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + Expect(err).To(BeNil()) + queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{ + Name: qName, + }) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.name).To(Equal(qName)) + + publishMessages(qName, 10) + + consumerOffsetValue, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{ + ReceiverLinkName: "offset_value_test", + InitialCredits: 1, + Offset: &OffsetValue{Offset: 5}, + }) + + Expect(err).To(BeNil()) + Expect(consumerOffsetValue).NotTo(BeNil()) + Expect(consumerOffsetValue).To(BeAssignableToTypeOf(&Consumer{})) + for i := 0; i < 5; i++ { + dc, err := consumerOffsetValue.Receive(context.Background()) + Expect(err).To(BeNil()) + + Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i+5))) + Expect(dc.Accept(context.Background())).To(BeNil()) + } + + consumerFirst, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{ + Offset: &OffsetFirst{}, + }) + + Expect(err).To(BeNil()) + Expect(consumerFirst).NotTo(BeNil()) + Expect(consumerFirst).To(BeAssignableToTypeOf(&Consumer{})) + for i := 0; i < 10; i++ { + dc, err := consumerFirst.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i))) + Expect(dc.Accept(context.Background())).To(BeNil()) + } + + // the tests Last and Next can't be deterministic + // but we can check if there is at least one message, and it is not the first one + consumerLast, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{ + ReceiverLinkName: "consumerLast_test", + InitialCredits: 10, + Offset: &OffsetLast{}, + }) + + Expect(err).To(BeNil()) + Expect(consumerLast).NotTo(BeNil()) + Expect(consumerLast).To(BeAssignableToTypeOf(&Consumer{})) + // it should receive at least one message + dc, err := consumerLast.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + Expect(fmt.Sprintf("%s", dc.Message().GetData())).NotTo(Equal(fmt.Sprintf("Message #%d", 0))) + Expect(dc.Accept(context.Background())).To(BeNil()) + + consumerNext, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{ + ReceiverLinkName: "consumerNext_next", + InitialCredits: 10, + Offset: &OffsetNext{}, + }) + + Expect(err).To(BeNil()) + Expect(consumerNext).NotTo(BeNil()) + Expect(consumerNext).To(BeAssignableToTypeOf(&Consumer{})) + signal := make(chan struct{}) + go func() { + // it should receive the next message + dc, err = consumerNext.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal("the next message")) + Expect(dc.Accept(context.Background())).To(BeNil()) + signal <- struct{}{} + }() + publishMessages(qName, 1, "the next message") + <-signal + Expect(consumerLast.Close(context.Background())).To(BeNil()) + Expect(consumerOffsetValue.Close(context.Background())).To(BeNil()) + Expect(consumerFirst.Close(context.Background())).To(BeNil()) + Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) + + }) +}) diff --git a/pkg/rabbitmq_amqp/amqp_queue_test.go b/pkg/rabbitmq_amqp/amqp_queue_test.go index 944529b..b30d269 100644 --- a/pkg/rabbitmq_amqp/amqp_queue_test.go +++ b/pkg/rabbitmq_amqp/amqp_queue_test.go @@ -216,7 +216,7 @@ var _ = Describe("AMQP Queue test ", func() { }) }) -func publishMessages(queueName string, count int) { +func publishMessages(queueName string, count int, args ...string) { conn, err := Dial(context.TODO(), []string{"amqp://guest:guest@localhost"}, nil) Expect(err).To(BeNil()) @@ -225,7 +225,12 @@ func publishMessages(queueName string, count int) { Expect(publisher).NotTo(BeNil()) for i := 0; i < count; i++ { - publishResult, err := publisher.Publish(context.TODO(), amqp.NewMessage([]byte("Message #"+strconv.Itoa(i)))) + body := "Message #" + strconv.Itoa(i) + if len(args) > 0 { + body = args[0] + } + + publishResult, err := publisher.Publish(context.TODO(), amqp.NewMessage([]byte(body))) Expect(err).To(BeNil()) Expect(publishResult).NotTo(BeNil()) Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{})) diff --git a/pkg/rabbitmq_amqp/amqp_types.go b/pkg/rabbitmq_amqp/amqp_types.go index 8faec3c..80988d0 100644 --- a/pkg/rabbitmq_amqp/amqp_types.go +++ b/pkg/rabbitmq_amqp/amqp_types.go @@ -1,6 +1,9 @@ package rabbitmq_amqp -import "github.com/google/uuid" +import ( + "github.com/Azure/go-amqp" + "github.com/google/uuid" +) type linkerName interface { linkName() string @@ -18,15 +21,23 @@ func getLinkName(l linkerName) string { type ConsumerOptions interface { linkName() string initialCredits() int32 + linkFilters() []amqp.LinkFilter } func getInitialCredits(co ConsumerOptions) int32 { - if co == nil { - return 100 + if co == nil || co.initialCredits() == 0 { + return 10 } return co.initialCredits() } +func getLinkFilters(co ConsumerOptions) []amqp.LinkFilter { + if co == nil { + return nil + } + return co.linkFilters() +} + type managementOptions struct { } @@ -38,6 +49,10 @@ func (mo *managementOptions) initialCredits() int32 { return 10 } +func (mo *managementOptions) linkFilters() []amqp.LinkFilter { + return nil +} + type AMQPConsumerOptions struct { ReceiverLinkName string InitialCredits uint32 @@ -51,35 +66,53 @@ func (aco *AMQPConsumerOptions) initialCredits() uint32 { return aco.InitialCredits } +func (aco *AMQPConsumerOptions) linkFilters() []amqp.LinkFilter { + return nil +} + type OffsetSpecification interface { - toMap() map[string]any + toLinkFilter() amqp.LinkFilter } +const rmqStreamFilter = "rabbitmq:stream-filter" +const rmqStreamOffsetSpec = "rabbitmq:stream-offset-spec" +const rmqStreamMatchUnfiltered = "rabbitmq:stream-match-unfiltered" +const offsetFirst = "first" +const offsetNext = "next" +const offsetLast = "last" + type OffsetFirst struct { } -func (of *OffsetFirst) toMap() map[string]any { - return map[string]any{"offset": "first"} +func (of *OffsetFirst) toLinkFilter() amqp.LinkFilter { + return amqp.NewLinkFilter(rmqStreamOffsetSpec, 0, offsetFirst) } type OffsetLast struct { } -func (ol *OffsetLast) toMap() map[string]any { - return map[string]any{"offset": "last"} +func (ol *OffsetLast) toLinkFilter() amqp.LinkFilter { + return amqp.NewLinkFilter(rmqStreamOffsetSpec, 0, offsetLast) } type OffsetValue struct { Offset uint64 } -func (o *OffsetValue) toMap() map[string]any { - return map[string]any{"offset": o.Offset} +func (ov *OffsetValue) toLinkFilter() amqp.LinkFilter { + return amqp.NewLinkFilter(rmqStreamOffsetSpec, 0, ov.Offset) +} + +type OffsetNext struct { +} + +func (on *OffsetNext) toLinkFilter() amqp.LinkFilter { + return amqp.NewLinkFilter(rmqStreamOffsetSpec, 0, offsetNext) } type StreamConsumerOptions struct { ReceiverLinkName string - InitialCredits uint32 + InitialCredits int32 Offset OffsetSpecification } @@ -87,10 +120,14 @@ func (sco *StreamConsumerOptions) linkName() string { return sco.ReceiverLinkName } -func (sco *StreamConsumerOptions) initialCredits() uint32 { +func (sco *StreamConsumerOptions) initialCredits() int32 { return sco.InitialCredits } +func (sco *StreamConsumerOptions) linkFilters() []amqp.LinkFilter { + return []amqp.LinkFilter{sco.Offset.toLinkFilter()} +} + ///// ProducerOptions ///// type ProducerOptions interface { diff --git a/pkg/rabbitmq_amqp/amqp_utils.go b/pkg/rabbitmq_amqp/amqp_utils.go index 71f9e65..e360d76 100644 --- a/pkg/rabbitmq_amqp/amqp_utils.go +++ b/pkg/rabbitmq_amqp/amqp_utils.go @@ -63,6 +63,7 @@ func createReceiverLinkOptions(address string, options ConsumerOptions, delivery RequestedSenderSettleMode: receiverSettleMode, ExpiryPolicy: amqp.ExpiryPolicyLinkDetach, Credit: getInitialCredits(options), + Filters: getLinkFilters(options), } } diff --git a/pkg/rabbitmq_amqp/entities.go b/pkg/rabbitmq_amqp/entities.go index b7dfde8..471639b 100644 --- a/pkg/rabbitmq_amqp/entities.go +++ b/pkg/rabbitmq_amqp/entities.go @@ -284,7 +284,44 @@ func (a *AutoGeneratedQueueSpecification) buildArguments() map[string]any { result["x-queue-type"] = a.queueType().String() return result +} + +type StreamQueueSpecification struct { + Name string + MaxLengthBytes int64 + InitialClusterSize int +} + +func (s *StreamQueueSpecification) name() string { + return s.Name +} + +func (s *StreamQueueSpecification) isAutoDelete() bool { + return false +} + +func (s *StreamQueueSpecification) isExclusive() bool { + return false +} +func (s *StreamQueueSpecification) queueType() QueueType { + return QueueType{Type: Stream} +} + +func (s *StreamQueueSpecification) buildArguments() map[string]any { + result := map[string]any{} + + if s.MaxLengthBytes != 0 { + result["x-max-length-bytes"] = s.MaxLengthBytes + } + + if s.InitialClusterSize != 0 { + result["x-stream-initial-cluster-size"] = s.InitialClusterSize + } + + result["x-queue-type"] = s.queueType().String() + + return result } // / **** Exchange **** From 9bc397f32cb59a9f9df4c25830b3d713b109ea4c Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 11 Feb 2025 12:11:21 +0100 Subject: [PATCH 3/4] restart form the last offset in case of kill stream connection Signed-off-by: Gabriele Santomaggio --- pkg/rabbitmq_amqp/amqp_consumer.go | 35 ++++++++- .../amqp_consumer_stream_test.go | 73 +++++++++++++++++++ pkg/rabbitmq_amqp/amqp_types.go | 27 +++++-- 3 files changed, 128 insertions(+), 7 deletions(-) diff --git a/pkg/rabbitmq_amqp/amqp_consumer.go b/pkg/rabbitmq_amqp/amqp_consumer.go index a917862..0ebbd1b 100644 --- a/pkg/rabbitmq_amqp/amqp_consumer.go +++ b/pkg/rabbitmq_amqp/amqp_consumer.go @@ -70,6 +70,14 @@ type Consumer struct { options ConsumerOptions destinationAdd string id string + + /* + currentOffset is the current offset of the consumer. It is valid only for the stream consumers. + it is used to keep track of the last message that was consumed by the consumer. + so in case of restart the consumer can start from the last message that was consumed. + For the AMQP queues it is just ignored. + */ + currentOffset int64 } func (c *Consumer) Id() string { @@ -82,7 +90,10 @@ func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd id = args[0] } - r := &Consumer{connection: connection, options: options, destinationAdd: destinationAdd, id: id} + r := &Consumer{connection: connection, options: options, + destinationAdd: destinationAdd, + currentOffset: -1, + id: id} connection.entitiesTracker.storeOrReplaceConsumer(r) err := r.createReceiver(ctx) if err != nil { @@ -92,6 +103,22 @@ func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd } func (c *Consumer) createReceiver(ctx context.Context) error { + if c.currentOffset >= 0 { + // here it means that the consumer is a stream consumer and there is a restart. + // so we need to set the offset to the last consumed message in order to restart from there. + // In there is not a restart this code won't be executed. + if c.options != nil { + // here we assume it is a stream. So we recreate the options with the offset. + c.options = &StreamConsumerOptions{ + ReceiverLinkName: c.options.linkName(), + InitialCredits: c.options.initialCredits(), + // we increment the offset by one to start from the next message. + // because the current was already consumed. + Offset: &OffsetValue{Offset: uint64(c.currentOffset + 1)}, + } + } + } + receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd, createReceiverLinkOptions(c.destinationAdd, c.options, AtLeastOnce)) if err != nil { @@ -107,6 +134,12 @@ func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) { if err != nil { return nil, err } + + if msg != nil && msg.Annotations != nil && msg.Annotations["x-stream-offset"] != nil { + // keep track of the current offset of the consumer + c.currentOffset = msg.Annotations["x-stream-offset"].(int64) + } + return &DeliveryContext{receiver: c.receiver.Load(), message: msg}, nil } diff --git a/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go b/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go index fbe08b1..4d6ca9c 100644 --- a/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go +++ b/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go @@ -3,8 +3,11 @@ package rabbitmq_amqp import ( "context" "fmt" + "github.com/Azure/go-amqp" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + testhelper "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/test-helper" + "time" ) var _ = Describe("Consumer stream test", func() { @@ -111,6 +114,76 @@ var _ = Describe("Consumer stream test", func() { Expect(consumerFirst.Close(context.Background())).To(BeNil()) Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) Expect(connection.Close(context.Background())).To(BeNil()) + }) + + It("consumer should restart form the last offset in case of disconnection", func() { + /* + Test the consumer should restart form the last offset in case of disconnection + So we send 10 messages. Consume 5 then kill the connection and the consumer should restart form + the offset 5 to consume the messages + */ + + qName := generateName("consumer should restart form the last offset in case of disconnection") + connection, err := Dial(context.Background(), []string{"amqp://"}, &AmqpConnOptions{ + SASLType: amqp.SASLTypeAnonymous(), + ContainerID: qName, + RecoveryConfiguration: &RecoveryConfiguration{ + ActiveRecovery: true, + // reduced the reconnect interval to speed up the test. + // don't use low values in production + BackOffReconnectInterval: 1_001 * time.Millisecond, + MaxReconnectAttempts: 5, + }, + }) + Expect(err).To(BeNil()) + queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{ + Name: qName, + }) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.name).To(Equal(qName)) + publishMessages(qName, 10) + + consumer, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{ + ReceiverLinkName: "consumer should restart form the last offset in case of disconnection", + InitialCredits: 5, + Offset: &OffsetFirst{}, + }) + + Expect(err).To(BeNil()) + Expect(consumer).NotTo(BeNil()) + Expect(consumer).To(BeAssignableToTypeOf(&Consumer{})) + for i := 0; i < 5; i++ { + dc, err := consumer.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i))) + Expect(dc.Accept(context.Background())).To(BeNil()) + } + + Eventually(func() bool { + err := testhelper.DropConnectionContainerID(qName) + return err == nil + }).WithTimeout(5 * time.Second).WithPolling(400 * time.Millisecond).Should(BeTrue()) + time.Sleep(1 * time.Second) + + Eventually(func() bool { + conn, err := testhelper.GetConnectionByContainerID(qName) + return err == nil && conn != nil + }).WithTimeout(5 * time.Second).WithPolling(400 * time.Millisecond).Should(BeTrue()) + time.Sleep(500 * time.Millisecond) + + // the consumer should restart from the last offset + for i := 5; i < 10; i++ { + dc, err := consumer.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("Message #%d", i))) + } + + Expect(consumer.Close(context.Background())).To(BeNil()) + Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) }) }) diff --git a/pkg/rabbitmq_amqp/amqp_types.go b/pkg/rabbitmq_amqp/amqp_types.go index 80988d0..4729117 100644 --- a/pkg/rabbitmq_amqp/amqp_types.go +++ b/pkg/rabbitmq_amqp/amqp_types.go @@ -10,17 +10,24 @@ type linkerName interface { } func getLinkName(l linkerName) string { - if l == nil { + if l == nil || l.linkName() == "" { return uuid.New().String() } return l.linkName() } -/// ConsumerOptions /// +/// ConsumerOptions interface for the AMQP and Stream consumer/// type ConsumerOptions interface { + // linkName returns the name of the link + // if not set it will return a random UUID linkName() string + // initialCredits returns the initial credits for the link + // if not set it will return 10 initialCredits() int32 + + // linkFilters returns the link filters for the link. + // It is mostly used for the stream consumers. linkFilters() []amqp.LinkFilter } @@ -54,15 +61,17 @@ func (mo *managementOptions) linkFilters() []amqp.LinkFilter { } type AMQPConsumerOptions struct { + //ReceiverLinkName: see the ConsumerOptions interface ReceiverLinkName string - InitialCredits uint32 + //InitialCredits: see the ConsumerOptions interface + InitialCredits int32 } func (aco *AMQPConsumerOptions) linkName() string { return aco.ReceiverLinkName } -func (aco *AMQPConsumerOptions) initialCredits() uint32 { +func (aco *AMQPConsumerOptions) initialCredits() int32 { return aco.InitialCredits } @@ -110,10 +119,16 @@ func (on *OffsetNext) toLinkFilter() amqp.LinkFilter { return amqp.NewLinkFilter(rmqStreamOffsetSpec, 0, offsetNext) } +/* +StreamConsumerOptions represents the options that can be used to create a stream consumer. +It is mandatory in case of creating a stream consumer. +*/ type StreamConsumerOptions struct { + //ReceiverLinkName: see the ConsumerOptions interface ReceiverLinkName string - InitialCredits int32 - Offset OffsetSpecification + //InitialCredits: see the ConsumerOptions interface + InitialCredits int32 + Offset OffsetSpecification } func (sco *StreamConsumerOptions) linkName() string { From ea9b9f16de5836ed1224c5d1a2cfe2923b86d459 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 13 Feb 2025 11:36:49 +0100 Subject: [PATCH 4/4] Add filters Signed-off-by: Gabriele Santomaggio --- .../amqp_consumer_stream_test.go | 129 ++++++++++++++++++ pkg/rabbitmq_amqp/amqp_types.go | 29 +++- pkg/rabbitmq_amqp/amqp_utils.go | 3 +- 3 files changed, 155 insertions(+), 6 deletions(-) diff --git a/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go b/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go index 4d6ca9c..10af86e 100644 --- a/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go +++ b/pkg/rabbitmq_amqp/amqp_consumer_stream_test.go @@ -7,9 +7,34 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" testhelper "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/test-helper" + "strconv" "time" ) +func publishMessagesWithStreamTag(queueName string, filterValue string, count int) { + conn, err := Dial(context.TODO(), []string{"amqp://guest:guest@localhost"}, nil) + Expect(err).To(BeNil()) + + publisher, err := conn.NewPublisher(context.TODO(), &QueueAddress{Queue: queueName}, "producer_filter_stream") + Expect(err).To(BeNil()) + Expect(publisher).NotTo(BeNil()) + + for i := 0; i < count; i++ { + body := filterValue + " #" + strconv.Itoa(i) + msg := amqp.NewMessage([]byte(body)) + msg.Annotations = amqp.Annotations{ + "x-stream-filter-value": filterValue, + } + publishResult, err := publisher.Publish(context.TODO(), msg) + Expect(err).To(BeNil()) + Expect(publishResult).NotTo(BeNil()) + Expect(publishResult.Outcome).To(Equal(&amqp.StateAccepted{})) + } + err = conn.Close(context.TODO()) + Expect(err).To(BeNil()) + +} + var _ = Describe("Consumer stream test", func() { It("start consuming with different offset types", func() { @@ -184,6 +209,110 @@ var _ = Describe("Consumer stream test", func() { Expect(consumer.Close(context.Background())).To(BeNil()) Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) Expect(connection.Close(context.Background())).To(BeNil()) + }) + It("consumer should filter messages based on x-stream-filter", func() { + qName := generateName("consumer should filter messages based on x-stream-filter") + connection, err := Dial(context.Background(), []string{"amqp://"}, nil) + Expect(err).To(BeNil()) + queueInfo, err := connection.Management().DeclareQueue(context.Background(), &StreamQueueSpecification{ + Name: qName, + }) + Expect(err).To(BeNil()) + Expect(queueInfo).NotTo(BeNil()) + Expect(queueInfo.name).To(Equal(qName)) + publishMessagesWithStreamTag(qName, "banana", 10) + publishMessagesWithStreamTag(qName, "apple", 10) + publishMessagesWithStreamTag(qName, "", 10) + + consumerBanana, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{ + ReceiverLinkName: "consumer banana should filter messages based on x-stream-filter", + InitialCredits: 200, + Offset: &OffsetFirst{}, + Filters: []string{"banana"}, + }) + + Expect(err).To(BeNil()) + Expect(consumerBanana).NotTo(BeNil()) + Expect(consumerBanana).To(BeAssignableToTypeOf(&Consumer{})) + for i := 0; i < 10; i++ { + dc, err := consumerBanana.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("banana #%d", i))) + Expect(dc.Accept(context.Background())).To(BeNil()) + } + consumerApple, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{ + ReceiverLinkName: "consumer apple should filter messages based on x-stream-filter", + InitialCredits: 200, + Offset: &OffsetFirst{}, + Filters: []string{"apple"}, + FilterMatchUnfiltered: true, + }) + + Expect(err).To(BeNil()) + Expect(consumerApple).NotTo(BeNil()) + Expect(consumerApple).To(BeAssignableToTypeOf(&Consumer{})) + for i := 0; i < 10; i++ { + dc, err := consumerApple.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i))) + Expect(dc.Accept(context.Background())).To(BeNil()) + } + + consumerAppleAndBanana, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{ + ReceiverLinkName: "consumer apple and banana should filter messages based on x-stream-filter", + InitialCredits: 200, + Offset: &OffsetFirst{}, + Filters: []string{"apple", "banana"}, + }) + + Expect(err).To(BeNil()) + Expect(consumerAppleAndBanana).NotTo(BeNil()) + Expect(consumerAppleAndBanana).To(BeAssignableToTypeOf(&Consumer{})) + for i := 0; i < 20; i++ { + dc, err := consumerAppleAndBanana.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + if i < 10 { + Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("banana #%d", i))) + } else { + Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i-10))) + } + Expect(dc.Accept(context.Background())).To(BeNil()) + } + + consumerAppleMatchUnfiltered, err := connection.NewConsumer(context.Background(), qName, &StreamConsumerOptions{ + ReceiverLinkName: "consumer apple should filter messages based on x-stream-filter and FilterMatchUnfiltered true", + InitialCredits: 200, + Offset: &OffsetFirst{}, + Filters: []string{"apple"}, + FilterMatchUnfiltered: true, + }) + + Expect(err).To(BeNil()) + Expect(consumerAppleMatchUnfiltered).NotTo(BeNil()) + Expect(consumerAppleMatchUnfiltered).To(BeAssignableToTypeOf(&Consumer{})) + for i := 0; i < 20; i++ { + dc, err := consumerAppleMatchUnfiltered.Receive(context.Background()) + Expect(err).To(BeNil()) + Expect(dc.Message()).NotTo(BeNil()) + if i < 10 { + Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf("apple #%d", i))) + + } else { + Expect(fmt.Sprintf("%s", dc.Message().GetData())).To(Equal(fmt.Sprintf(" #%d", i-10))) + } + Expect(dc.Accept(context.Background())).To(BeNil()) + } + + Expect(consumerApple.Close(context.Background())).To(BeNil()) + Expect(consumerBanana.Close(context.Background())).To(BeNil()) + Expect(consumerAppleAndBanana.Close(context.Background())).To(BeNil()) + Expect(consumerAppleMatchUnfiltered.Close(context.Background())).To(BeNil()) + Expect(connection.Management().DeleteQueue(context.Background(), qName)).To(BeNil()) + Expect(connection.Close(context.Background())).To(BeNil()) }) + }) diff --git a/pkg/rabbitmq_amqp/amqp_types.go b/pkg/rabbitmq_amqp/amqp_types.go index 4729117..4dca596 100644 --- a/pkg/rabbitmq_amqp/amqp_types.go +++ b/pkg/rabbitmq_amqp/amqp_types.go @@ -23,7 +23,7 @@ type ConsumerOptions interface { // if not set it will return a random UUID linkName() string // initialCredits returns the initial credits for the link - // if not set it will return 10 + // if not set it will return 256 initialCredits() int32 // linkFilters returns the link filters for the link. @@ -33,7 +33,7 @@ type ConsumerOptions interface { func getInitialCredits(co ConsumerOptions) int32 { if co == nil || co.initialCredits() == 0 { - return 10 + return 256 } return co.initialCredits() } @@ -53,7 +53,8 @@ func (mo *managementOptions) linkName() string { } func (mo *managementOptions) initialCredits() int32 { - return 10 + // by default i 256 but here we set it to 100. For the management is enough. + return 100 } func (mo *managementOptions) linkFilters() []amqp.LinkFilter { @@ -128,7 +129,14 @@ type StreamConsumerOptions struct { ReceiverLinkName string //InitialCredits: see the ConsumerOptions interface InitialCredits int32 - Offset OffsetSpecification + // The offset specification for the stream consumer + // see the interface implementations + Offset OffsetSpecification + // Filter values. + // See: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions for more details + Filters []string + // + FilterMatchUnfiltered bool } func (sco *StreamConsumerOptions) linkName() string { @@ -140,7 +148,18 @@ func (sco *StreamConsumerOptions) initialCredits() int32 { } func (sco *StreamConsumerOptions) linkFilters() []amqp.LinkFilter { - return []amqp.LinkFilter{sco.Offset.toLinkFilter()} + var filters []amqp.LinkFilter + filters = append(filters, sco.Offset.toLinkFilter()) + if sco.Filters != nil { + l := []any{} + for _, f := range sco.Filters { + l = append(l, f) + } + + filters = append(filters, amqp.NewLinkFilter(rmqStreamFilter, 0, l)) + filters = append(filters, amqp.NewLinkFilter(rmqStreamMatchUnfiltered, 0, sco.FilterMatchUnfiltered)) + } + return filters } ///// ProducerOptions ///// diff --git a/pkg/rabbitmq_amqp/amqp_utils.go b/pkg/rabbitmq_amqp/amqp_utils.go index e360d76..888f981 100644 --- a/pkg/rabbitmq_amqp/amqp_utils.go +++ b/pkg/rabbitmq_amqp/amqp_utils.go @@ -52,7 +52,7 @@ func createReceiverLinkOptions(address string, options ConsumerOptions, delivery receiverSettleMode = amqp.SenderSettleModeUnsettled.Ptr() } - return &amqp.ReceiverOptions{ + result := &amqp.ReceiverOptions{ TargetAddress: address, DynamicAddress: false, Name: getLinkName(options), @@ -65,6 +65,7 @@ func createReceiverLinkOptions(address string, options ConsumerOptions, delivery Credit: getInitialCredits(options), Filters: getLinkFilters(options), } + return result } func random(max int) int {