Skip to content

Commit 031a2ac

Browse files
authored
add stream support (#24)
* Implement the stream support and filtering for the stream --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 707fe72 commit 031a2ac

File tree

12 files changed

+601
-31
lines changed

12 files changed

+601
-31
lines changed

docs/examples/getting_started/main.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,7 @@ func main() {
7070
// Create a consumer to receive messages from the queue
7171
// you need to build the address of the queue, but you can use the helper function
7272

73-
consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
74-
Queue: queueName,
75-
}, "getting-started-consumer")
73+
consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
7674
if err != nil {
7775
rabbitmq_amqp.Error("Error creating consumer", err)
7876
return
@@ -115,7 +113,7 @@ func main() {
115113
return
116114
}
117115

118-
for i := 0; i < 1_000; i++ {
116+
for i := 0; i < 100; i++ {
119117
// Publish a message to the exchange
120118
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
121119
if err != nil {

docs/examples/reliable/reliable.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,7 @@ func main() {
7777
return
7878
}
7979

80-
consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
81-
Queue: queueName,
82-
}, "reliable-consumer")
80+
consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
8381
if err != nil {
8482
rabbitmq_amqp.Error("Error creating consumer", err)
8583
return

pkg/rabbitmq_amqp/amqp_connection.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,17 @@ func (a *AmqpConnection) NewPublisher(ctx context.Context, destination TargetAdd
8787
}
8888

8989
// NewConsumer creates a new Consumer that listens to the provided destination. Destination is a QueueAddress.
90-
func (a *AmqpConnection) NewConsumer(ctx context.Context, destination *QueueAddress, linkName string) (*Consumer, error) {
90+
func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options ConsumerOptions) (*Consumer, error) {
91+
destination := &QueueAddress{
92+
Queue: queueName,
93+
}
94+
9195
destinationAdd, err := destination.toAddress()
9296
if err != nil {
9397
return nil, err
9498
}
95-
err = validateAddress(destinationAdd)
9699

97-
return newConsumer(ctx, a, destinationAdd, linkName)
100+
return newConsumer(ctx, a, destinationAdd, options)
98101
}
99102

100103
// Dial connect to the AMQP 1.0 server using the provided connectionSettings

pkg/rabbitmq_amqp/amqp_connection_recovery_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@ var _ = Describe("Recovery connection test", func() {
4343
Expect(err).To(BeNil())
4444
Expect(queueInfo).NotTo(BeNil())
4545

46-
consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{
47-
Queue: qName,
48-
}, "test")
46+
consumer, err := connection.NewConsumer(context.Background(),
47+
qName, nil)
4948

5049
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{
5150
Queue: qName,

pkg/rabbitmq_amqp/amqp_consumer.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,21 +67,33 @@ func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotatio
6767
type Consumer struct {
6868
receiver atomic.Pointer[amqp.Receiver]
6969
connection *AmqpConnection
70-
linkName string
70+
options ConsumerOptions
7171
destinationAdd string
7272
id string
73+
74+
/*
75+
currentOffset is the current offset of the consumer. It is valid only for the stream consumers.
76+
it is used to keep track of the last message that was consumed by the consumer.
77+
so in case of restart the consumer can start from the last message that was consumed.
78+
For the AMQP queues it is just ignored.
79+
*/
80+
currentOffset int64
7381
}
7482

7583
func (c *Consumer) Id() string {
7684
return c.id
7785
}
7886

79-
func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, linkName string, args ...string) (*Consumer, error) {
87+
func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, options ConsumerOptions, args ...string) (*Consumer, error) {
8088
id := fmt.Sprintf("consumer-%s", uuid.New().String())
8189
if len(args) > 0 {
8290
id = args[0]
8391
}
84-
r := &Consumer{connection: connection, linkName: linkName, destinationAdd: destinationAdd, id: id}
92+
93+
r := &Consumer{connection: connection, options: options,
94+
destinationAdd: destinationAdd,
95+
currentOffset: -1,
96+
id: id}
8597
connection.entitiesTracker.storeOrReplaceConsumer(r)
8698
err := r.createReceiver(ctx)
8799
if err != nil {
@@ -91,7 +103,24 @@ func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd
91103
}
92104

93105
func (c *Consumer) createReceiver(ctx context.Context) error {
94-
receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd, createReceiverLinkOptions(c.destinationAdd, c.linkName, AtLeastOnce))
106+
if c.currentOffset >= 0 {
107+
// here it means that the consumer is a stream consumer and there is a restart.
108+
// so we need to set the offset to the last consumed message in order to restart from there.
109+
// In there is not a restart this code won't be executed.
110+
if c.options != nil {
111+
// here we assume it is a stream. So we recreate the options with the offset.
112+
c.options = &StreamConsumerOptions{
113+
ReceiverLinkName: c.options.linkName(),
114+
InitialCredits: c.options.initialCredits(),
115+
// we increment the offset by one to start from the next message.
116+
// because the current was already consumed.
117+
Offset: &OffsetValue{Offset: uint64(c.currentOffset + 1)},
118+
}
119+
}
120+
}
121+
122+
receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd,
123+
createReceiverLinkOptions(c.destinationAdd, c.options, AtLeastOnce))
95124
if err != nil {
96125
return err
97126
}
@@ -105,6 +134,12 @@ func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) {
105134
if err != nil {
106135
return nil, err
107136
}
137+
138+
if msg != nil && msg.Annotations != nil && msg.Annotations["x-stream-offset"] != nil {
139+
// keep track of the current offset of the consumer
140+
c.currentOffset = msg.Annotations["x-stream-offset"].(int64)
141+
}
142+
108143
return &DeliveryContext{receiver: c.receiver.Load(), message: msg}, nil
109144
}
110145

0 commit comments

Comments
 (0)