Skip to content

Commit 12fa11f

Browse files
authored
Added WithRetryPeriodFunc option (#63)
* Added WithRetryPeriodFunc option * Satisfy linter * Switched to simple implementation * atomic operations * Revert "atomic operations" This reverts commit b94219c. * Remove defer
1 parent 350c562 commit 12fa11f

File tree

2 files changed

+49
-34
lines changed

2 files changed

+49
-34
lines changed

consumer/consumer.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ type Consumer struct {
4747

4848
worker Worker
4949

50-
retryPeriod time.Duration
51-
initFunc func(conn AMQPConnection) (AMQPChannel, error)
52-
ctx context.Context
53-
cancelFunc context.CancelFunc
54-
logger logger.Logger
55-
closeCh chan struct{}
50+
nextRetryPeriod func(attemptNumber int) time.Duration
51+
initFunc func(conn AMQPConnection) (AMQPChannel, error)
52+
ctx context.Context
53+
cancelFunc context.CancelFunc
54+
logger logger.Logger
55+
closeCh chan struct{}
5656

5757
mu sync.Mutex
5858
stateChs []chan State
@@ -79,6 +79,8 @@ type Consumer struct {
7979
noLocal bool
8080
noWait bool
8181
args amqp.Table
82+
83+
retryCounter int
8284
}
8385

8486
func New(
@@ -113,8 +115,10 @@ func New(
113115
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
114116
}
115117

116-
if c.retryPeriod == 0 {
117-
c.retryPeriod = time.Second * 5
118+
if c.nextRetryPeriod == nil {
119+
c.nextRetryPeriod = func(_ int) time.Duration {
120+
return time.Second * 5
121+
}
118122
}
119123

120124
if c.logger == nil {
@@ -157,8 +161,14 @@ func WithContext(ctx context.Context) Option {
157161
}
158162

159163
func WithRetryPeriod(dur time.Duration) Option {
164+
return WithRetryPeriodFunc(func(_ int) time.Duration {
165+
return dur
166+
})
167+
}
168+
169+
func WithRetryPeriodFunc(durFunc func(retryCount int) time.Duration) Option {
160170
return func(c *Consumer) {
161-
c.retryPeriod = dur
171+
c.nextRetryPeriod = durFunc
162172
}
163173
}
164174

@@ -384,7 +394,8 @@ func (c *Consumer) consumeState(ch AMQPChannel, queue string, connCloseCh <-chan
384394
c.logger.Printf("[DEBUG] consumer ready")
385395

386396
state := c.notifyReady(queue)
387-
397+
c.retryCounter = 0
398+
388399
go func() {
389400
defer close(workerDoneCh)
390401
c.worker.Serve(workerCtx, c.handler, msgCh)
@@ -419,14 +430,17 @@ func (c *Consumer) consumeState(ch AMQPChannel, queue string, connCloseCh <-chan
419430
}
420431

421432
func (c *Consumer) waitRetry(err error) error {
422-
timer := time.NewTimer(c.retryPeriod)
433+
timer := time.NewTimer(c.nextRetryPeriod(c.retryCounter))
423434
defer func() {
424435
timer.Stop()
425436
select {
426437
case <-timer.C:
427438
default:
428439
}
429440
}()
441+
442+
c.retryCounter++
443+
430444
state := c.notifyUnready(err)
431445
for {
432446
select {

consumer/mock_consumer/mocks.go

Lines changed: 24 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)