Skip to content

Commit 828cdbe

Browse files
committed
support NackRedeliveryDelay
1 parent 04a15cb commit 828cdbe

File tree

4 files changed

+30
-3
lines changed

4 files changed

+30
-3
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,15 @@ use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg
192192
queue.SendDelayMsg(msg, time.Hour, delayqueue.WithRetryCount(3))
193193
```
194194

195+
### Nack Redelivery Delay
196+
197+
```go
198+
WithNackRedeliveryDelay(d time.Duration) *DelayQueue
199+
```
200+
201+
WithNackRedeliveryDelay customizes the interval between redelivery and nack (callback returns false)
202+
But if consumption exceeded deadline, the message will be redelivered immediately.
203+
195204
### Script Preload
196205

197206
```go

README_CN.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,15 @@ WithDefaultRetryCount(count uint)
183183
queue.SendDelayMsg(msg, time.Hour, delayqueue.WithRetryCount(3))
184184
```
185185

186+
### 设置 nack 后重试间隔
187+
188+
```go
189+
WithNackRedeliveryDelay(d time.Duration) *DelayQueue
190+
```
191+
192+
WithNackRedeliveryDelay 可以设置 nack (callback 函数返回 false) 之后到重新投递的间隔。
193+
但是如果消费超时,消息会被立即重新投递。
194+
186195
### 预加载脚本
187196

188197
```go

delayqueue.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ type DelayQueue struct {
4343
// for batch consume
4444
consumeBuffer chan string
4545

46-
eventListener EventListener
46+
eventListener EventListener
47+
nackRedeliveryDelay time.Duration
4748
}
4849

4950
// NilErr represents redis nil
@@ -205,6 +206,13 @@ func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue {
205206
return q
206207
}
207208

209+
// WithNackRedeliveryDelay customizes the interval between redelivery and nack (callback returns false)
210+
// If consumption exceeded deadline, the message will be redelivered immediately
211+
func (q *DelayQueue) WithNackRedeliveryDelay(d time.Duration) *DelayQueue {
212+
q.nackRedeliveryDelay = d
213+
return q
214+
}
215+
208216
func (q *DelayQueue) genMsgKey(idStr string) string {
209217
if q.useHashTag {
210218
return "{dp:" + q.name + "}" + ":msg:" + idStr
@@ -427,7 +435,7 @@ func (q *DelayQueue) nack(idStr string) error {
427435
atomic.AddInt32(&q.fetchCount, -1)
428436
// update retry time as now, unack2Retry will move it to retry immediately
429437
err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{
430-
idStr: float64(time.Now().Unix()),
438+
idStr: float64(time.Now().Add(q.nackRedeliveryDelay).Unix()),
431439
})
432440
if err != nil {
433441
return fmt.Errorf("negative ack failed: %v", err)

delayqueue_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ func TestDelayQueue_consume(t *testing.T) {
3030
WithFetchInterval(time.Millisecond * 50).
3131
WithMaxConsumeDuration(0).
3232
WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
33-
WithFetchLimit(2)
33+
WithFetchLimit(2).
34+
WithNackRedeliveryDelay(time.Second)
3435

3536
for i := 0; i < size; i++ {
3637
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))

0 commit comments

Comments
 (0)