Skip to content

Commit a3020e2

Browse files
committed
bugfix: NackRedeliveryDelay may cause unexpected retry
1 parent 8dd18cd commit a3020e2

File tree

1 file changed

+23
-5
lines changed

1 file changed

+23
-5
lines changed

delayqueue.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ func (q *DelayQueue) TryIntercept(msg *MessageInfo) (*InterceptResult, error) {
332332
Intercepted: true,
333333
State: StateReady,
334334
}, nil
335-
}
335+
}
336336
// try to intercept at pending
337337
removed, err = q.redisCli.ZRem(q.pendingKey, []string{id})
338338
if err != nil {
@@ -349,7 +349,7 @@ func (q *DelayQueue) TryIntercept(msg *MessageInfo) (*InterceptResult, error) {
349349
// message may be being consumed or has been successfully consumed
350350
// if the message has been successfully consumed, the following action will cause nothing
351351
// if the message is being consumed,the following action will prevent it from being retried
352-
q.redisCli.HDel(q.retryCountKey, []string{id})
352+
q.redisCli.HDel(q.retryCountKey, []string{id})
353353
q.redisCli.LRem(q.retryKey, 0, id)
354354

355355
return &InterceptResult{
@@ -512,11 +512,29 @@ func (q *DelayQueue) ack(idStr string) error {
512512
return nil
513513
}
514514

515+
// updateZSetScoreScript update score of a zset member if it exists
516+
// KEYS[1]: zset
517+
// ARGV[1]: score
518+
// ARGV[2]: member
519+
const updateZSetScoreScript = `
520+
if redis.call('zrank', KEYS[1], ARGV[2]) ~= nil then
521+
return redis.call('zadd', KEYS[1], ARGV[1], ARGV[2])
522+
else
523+
return 0
524+
end
525+
`
526+
527+
func (q *DelayQueue) updateZSetScore(key string, score float64, member string) error {
528+
scoreStr := strconv.FormatFloat(score, 'f', -1, 64)
529+
_, err := q.eval(updateZSetScoreScript, []string{key}, []interface{}{scoreStr, member})
530+
return err
531+
}
532+
515533
func (q *DelayQueue) nack(idStr string) error {
516534
atomic.AddInt32(&q.fetchCount, -1)
517-
err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{
518-
idStr: float64(time.Now().Add(q.nackRedeliveryDelay).Unix()),
519-
})
535+
retryTime := float64(time.Now().Add(q.nackRedeliveryDelay).Unix())
536+
// if message consumption has not reach deadlin (still in unAckKey), then update its retry time
537+
err := q.updateZSetScore(q.unAckKey, retryTime, idStr)
520538
if err != nil {
521539
return fmt.Errorf("negative ack failed: %v", err)
522540
}

0 commit comments

Comments
 (0)