Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions docs/examples/getting_started/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ func main() {
// Create a consumer to receive messages from the queue
// you need to build the address of the queue, but you can use the helper function

consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
Queue: queueName,
}, "getting-started-consumer")
consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
if err != nil {
rabbitmq_amqp.Error("Error creating consumer", err)
return
Expand Down Expand Up @@ -115,7 +113,7 @@ func main() {
return
}

for i := 0; i < 1_000; i++ {
for i := 0; i < 100; i++ {
// Publish a message to the exchange
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions docs/examples/reliable/reliable.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ func main() {
return
}

consumer, err := amqpConnection.NewConsumer(context.Background(), &rabbitmq_amqp.QueueAddress{
Queue: queueName,
}, "reliable-consumer")
consumer, err := amqpConnection.NewConsumer(context.Background(), queueName, nil)
if err != nil {
rabbitmq_amqp.Error("Error creating consumer", err)
return
Expand Down
9 changes: 6 additions & 3 deletions pkg/rabbitmq_amqp/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,17 @@ func (a *AmqpConnection) NewPublisher(ctx context.Context, destination TargetAdd
}

// NewConsumer creates a new Consumer that listens to the provided destination. Destination is a QueueAddress.
func (a *AmqpConnection) NewConsumer(ctx context.Context, destination *QueueAddress, linkName string) (*Consumer, error) {
func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, options ConsumerOptions) (*Consumer, error) {
destination := &QueueAddress{
Queue: queueName,
}

destinationAdd, err := destination.toAddress()
if err != nil {
return nil, err
}
err = validateAddress(destinationAdd)

return newConsumer(ctx, a, destinationAdd, linkName)
return newConsumer(ctx, a, destinationAdd, options)
}

// Dial connect to the AMQP 1.0 server using the provided connectionSettings
Expand Down
5 changes: 2 additions & 3 deletions pkg/rabbitmq_amqp/amqp_connection_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ var _ = Describe("Recovery connection test", func() {
Expect(err).To(BeNil())
Expect(queueInfo).NotTo(BeNil())

consumer, err := connection.NewConsumer(context.Background(), &QueueAddress{
Queue: qName,
}, "test")
consumer, err := connection.NewConsumer(context.Background(),
qName, nil)

publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{
Queue: qName,
Expand Down
43 changes: 39 additions & 4 deletions pkg/rabbitmq_amqp/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,33 @@ func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotatio
type Consumer struct {
receiver atomic.Pointer[amqp.Receiver]
connection *AmqpConnection
linkName string
options ConsumerOptions
destinationAdd string
id string

/*
currentOffset is the current offset of the consumer. It is valid only for the stream consumers.
it is used to keep track of the last message that was consumed by the consumer.
so in case of restart the consumer can start from the last message that was consumed.
For the AMQP queues it is just ignored.
*/
currentOffset int64
}

func (c *Consumer) Id() string {
return c.id
}

func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, linkName string, args ...string) (*Consumer, error) {
func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd string, options ConsumerOptions, args ...string) (*Consumer, error) {
id := fmt.Sprintf("consumer-%s", uuid.New().String())
if len(args) > 0 {
id = args[0]
}
r := &Consumer{connection: connection, linkName: linkName, destinationAdd: destinationAdd, id: id}

r := &Consumer{connection: connection, options: options,
destinationAdd: destinationAdd,
currentOffset: -1,
id: id}
connection.entitiesTracker.storeOrReplaceConsumer(r)
err := r.createReceiver(ctx)
if err != nil {
Expand All @@ -91,7 +103,24 @@ func newConsumer(ctx context.Context, connection *AmqpConnection, destinationAdd
}

func (c *Consumer) createReceiver(ctx context.Context) error {
receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd, createReceiverLinkOptions(c.destinationAdd, c.linkName, AtLeastOnce))
if c.currentOffset >= 0 {
// here it means that the consumer is a stream consumer and there is a restart.
// so we need to set the offset to the last consumed message in order to restart from there.
// In there is not a restart this code won't be executed.
if c.options != nil {
// here we assume it is a stream. So we recreate the options with the offset.
c.options = &StreamConsumerOptions{
ReceiverLinkName: c.options.linkName(),
InitialCredits: c.options.initialCredits(),
// we increment the offset by one to start from the next message.
// because the current was already consumed.
Offset: &OffsetValue{Offset: uint64(c.currentOffset + 1)},
}
}
}

receiver, err := c.connection.session.NewReceiver(ctx, c.destinationAdd,
createReceiverLinkOptions(c.destinationAdd, c.options, AtLeastOnce))
if err != nil {
return err
}
Expand All @@ -105,6 +134,12 @@ func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) {
if err != nil {
return nil, err
}

if msg != nil && msg.Annotations != nil && msg.Annotations["x-stream-offset"] != nil {
// keep track of the current offset of the consumer
c.currentOffset = msg.Annotations["x-stream-offset"].(int64)
}

return &DeliveryContext{receiver: c.receiver.Load(), message: msg}, nil
}

Expand Down
Loading