Skip to content

Commit 0f20aff

Browse files
authored
Merge pull request wagslane#26 from victorct-pronto/feature/stop_consuming
Make the property AutoAcknowledge available to be changed, and added function to stop consumers.
2 parents f115a8d + e946a18 commit 0f20aff

File tree

2 files changed

+31
-5
lines changed

2 files changed

+31
-5
lines changed

consume.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@ package rabbitmq
22

33
import (
44
"fmt"
5-
"time"
6-
75
"github.com/streadway/amqp"
6+
"time"
87
)
98

109
// Consumer allows you to create and connect to queues for data consumption.
@@ -107,13 +106,32 @@ func (consumer Consumer) StartConsuming(
107106
return nil
108107
}
109108

110-
// StopConsuming stops the consumption of messages.
111-
// The consumer should be discarded as it's not safe for re-use
112-
func (consumer Consumer) StopConsuming() {
109+
// Disconnect disconnects both the channel and the connection.
110+
// This method doesn't throw a reconnect, and should be used when finishing a program.
111+
// IMPORTANT: If this method is executed before StopConsuming, it could cause unexpected behavior
112+
// such as messages being processed, but not being acknowledged, thus being requeued by the broker
113+
func (consumer Consumer) Disconnect() {
113114
consumer.chManager.channel.Close()
114115
consumer.chManager.connection.Close()
115116
}
116117

118+
// StopConsuming stops the consumption of messages.
119+
// The consumer should be discarded as it's not safe for re-use.
120+
// This method sends a basic.cancel notification.
121+
// The consumerName is the name or delivery tag of the amqp consumer we want to cancel.
122+
// When noWait is true, do not wait for the server to acknowledge the cancel.
123+
// Only use this when you are certain there are no deliveries in flight that
124+
// require an acknowledgment, otherwise they will arrive and be dropped in the
125+
// client without an ack, and will not be redelivered to other consumers.
126+
// IMPORTANT: Since the streadway library doesn't provide a way to retrieve the consumer's tag after the creation
127+
// it's imperative for you to set the name when creating the consumer, if you want to use this function later
128+
// a simple uuid4 should do the trick, since it should be unique.
129+
// If you start many consumers, you should store the name of the consumers when creating them, such that you can
130+
// use them in a for to stop all the consumers.
131+
func (consumer Consumer) StopConsuming(consumerName string, noWait bool) {
132+
consumer.chManager.channel.Cancel(consumerName, noWait)
133+
}
134+
117135
// startGoroutinesWithRetries attempts to start consuming on a channel
118136
// with an exponential backoff
119137
func (consumer Consumer) startGoroutinesWithRetries(

consume_options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,14 @@ func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) {
189189
}
190190
}
191191

192+
// WithConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer
193+
// if unset the default will be used (false)
194+
func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions) {
195+
return func(options *ConsumeOptions) {
196+
options.ConsumerAutoAck = autoAck
197+
}
198+
}
199+
192200
// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means
193201
// the server will ensure that this is the sole consumer
194202
// from this queue. When exclusive is false, the server will fairly distribute

0 commit comments

Comments
 (0)