Skip to content

Commit 0ccaa1e

Browse files
committed
- max delivery count
- minid, limit parameters support in the produces level - minid, limit parameters support in the enqueue level
1 parent 70e2b0e commit 0ccaa1e

File tree

6 files changed

+119
-16
lines changed

6 files changed

+119
-16
lines changed

consumer.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package redisqueue
22

33
import (
44
"context"
5+
"log/slog"
56
"net"
67
"os"
78
"sync"
@@ -53,6 +54,12 @@ type ConsumerOptions struct {
5354
BufferSize int
5455
// Concurrency dictates how many goroutines to spawn to handle the messages.
5556
Concurrency int
57+
58+
// MaxDeliveryCount is the maximum number of times a message can be delivered
59+
// before it is considered failed. If this is set to 0, the message will be
60+
// retried indefinitely
61+
MaxDeliveryCount int64
62+
5663
// RedisClient supersedes the RedisOptions field, and allows you to inject
5764
// an already-made Redis Client for use in the consumer. This may be either
5865
// the standard client or a cluster client.
@@ -276,6 +283,15 @@ func (c *Consumer) reclaim(ctx context.Context) {
276283
msgs := make([]string, 0)
277284

278285
for _, r := range res {
286+
slog.Info("pending message", "id", r.ID, "count", r.RetryCount, "max", c.options.MaxDeliveryCount)
287+
if c.options.MaxDeliveryCount > 0 && r.RetryCount >= c.options.MaxDeliveryCount {
288+
slog.Info("message exceeded delivery count limit", "id", r.ID, "count", r.RetryCount, "max", c.options.MaxDeliveryCount)
289+
err = c.redis.XAck(ctx, stream, c.options.GroupName, r.ID).Err()
290+
if err != nil {
291+
c.Errors <- errors.Wrapf(err, "error acknowledging after retry count exceeded for %q stream and %q message, ", stream, r.ID)
292+
continue
293+
}
294+
}
279295
if r.Idle >= c.options.VisibilityTimeout {
280296
claimres, err := c.redis.XClaim(ctx, &redis.XClaimArgs{
281297
Stream: stream,
@@ -331,7 +347,7 @@ func (c *Consumer) poll(ctx context.Context) {
331347
case <-c.stopPoll:
332348
// once the polling has stopped (i.e. there will be no more messages
333349
// put onto c.queue), stop all of the workers
334-
for i := 0; i < c.options.Concurrency; i++ {
350+
for range c.options.Concurrency {
335351
c.stopWorkers <- struct{}{}
336352
}
337353
return

consumer_test.go

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package redisqueue
22

33
import (
44
"context"
5+
"fmt"
6+
"log/slog"
57
"os"
68
"testing"
79
"time"
@@ -249,6 +251,67 @@ func TestRun(t *testing.T) {
249251
c.Run(ctx)
250252
})
251253

254+
t.Run("reclaims pending messages maximum MaxDeliveryCount", func(tt *testing.T) {
255+
ctx := context.Background()
256+
var maxDeliveryCount int64 = 3
257+
var visibilityTimeout time.Duration = 5 * time.Millisecond
258+
var reclaimInterval time.Duration = 1 * time.Millisecond
259+
260+
// create a consumer
261+
c, err := NewConsumerWithOptions(ctx, &ConsumerOptions{
262+
VisibilityTimeout: visibilityTimeout,
263+
BlockingTimeout: 10 * time.Millisecond,
264+
ReclaimInterval: 1 * time.Millisecond,
265+
BufferSize: 100,
266+
Concurrency: 10,
267+
MaxDeliveryCount: maxDeliveryCount,
268+
})
269+
require.NoError(tt, err)
270+
271+
// create a producer
272+
p, err := NewProducer()
273+
require.NoError(tt, err)
274+
275+
// create consumer group
276+
c.redis.XGroupDestroy(ctx, tt.Name(), c.options.GroupName)
277+
c.redis.XGroupCreateMkStream(ctx, tt.Name(), c.options.GroupName, "$")
278+
279+
// enqueue a message
280+
msg := &Message{
281+
Stream: tt.Name(),
282+
Values: map[string]interface{}{"test": "value"},
283+
}
284+
err = p.Enqueue(ctx, msg)
285+
require.NoError(tt, err)
286+
287+
var deliveryCount int64 = 0
288+
289+
// register a handler that will assert the message and then shut down
290+
// the consumer
291+
c.Register(tt.Name(), func(m *Message) error {
292+
slog.Info("message received", "id", m.ID, "delivery count", deliveryCount)
293+
294+
deliveryCount++
295+
assert.Equal(tt, msg.ID, m.ID)
296+
return fmt.Errorf("dummy error")
297+
})
298+
299+
// // watch for consumer errors
300+
// go func() {
301+
// <-c.Errors
302+
// }()
303+
304+
// run the consumer
305+
go c.Run(ctx)
306+
307+
// wait for more than VisibilityTimeout + (ReclaimInterval*number_higher_than_max_dlivery_count) to ensure that
308+
// the message was reclaimed more than MaxDeliveryCount
309+
310+
time.Sleep(visibilityTimeout + (reclaimInterval * 10) + 1*time.Millisecond)
311+
c.Shutdown()
312+
assert.Equal(tt, maxDeliveryCount, deliveryCount)
313+
})
314+
252315
t.Run("doesn't reclaim if there is no VisibilityTimeout set", func(tt *testing.T) {
253316
ctx := context.Background()
254317

@@ -263,8 +326,8 @@ func TestRun(t *testing.T) {
263326

264327
// create a producer
265328
p, err := NewProducerWithOptions(ctx, &ProducerOptions{
266-
StreamMaxLength: 2,
267-
ApproximateMaxLength: false,
329+
StreamMaxLength: 2,
330+
UseApproximate: false,
268331
})
269332
require.NoError(tt, err)
270333

@@ -345,8 +408,8 @@ func TestRun(t *testing.T) {
345408

346409
// create a producer
347410
p, err := NewProducerWithOptions(ctx, &ProducerOptions{
348-
StreamMaxLength: 1,
349-
ApproximateMaxLength: false,
411+
StreamMaxLength: 1,
412+
UseApproximate: false,
350413
})
351414
require.NoError(tt, err)
352415

message.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ package redisqueue
44
// When enqueuing, it's recommended to leave ID empty and let Redis generate it,
55
// unless you know what you're doing.
66
type Message struct {
7-
ID string
8-
Stream string
9-
Values map[string]interface{}
7+
ID string
8+
Stream string
9+
StreamMaxLength int64
10+
StreamMinId string
11+
TrimLimit int64
12+
Values map[string]interface{}
1013
}

mise.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[tools]
2+
go = "1.23"

producer.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package redisqueue
22

33
import (
4+
"cmp"
45
"context"
56

67
"github.com/redis/go-redis/v9"
@@ -17,10 +18,20 @@ type ProducerOptions struct {
1718
// So ideally, you'll set this number to be as high as you can makee it.
1819
// More info here: https://redis.io/commands/xadd#capped-streams.
1920
StreamMaxLength int64
20-
// ApproximateMaxLength determines whether to use the ~ with the MAXLEN
21+
22+
// StreamMinId sets the minimum ID that will be used when calling XADD. This
23+
// is useful when you want to ensure that the stream is trimmed to a certain
24+
// point. More info here: https://redis.io/commands/xadd#capped-streams.
25+
StreamMinId string
26+
27+
// UseApproximate determines whether to use the ~ with the MAXLEN and MINID
2128
// option. This allows the stream trimming to done in a more efficient
2229
// manner. More info here: https://redis.io/commands/xadd#capped-streams.
23-
ApproximateMaxLength bool
30+
UseApproximate bool
31+
32+
// TrimLimit sets LIMIT for XADD
33+
TrimLimit int64
34+
2435
// RedisClient supersedes the RedisOptions field, and allows you to inject
2536
// an already-made Redis Client for use in the consumer. This may be either
2637
// the standard client or a cluster client.
@@ -41,12 +52,12 @@ type Producer struct {
4152
}
4253

4354
var defaultProducerOptions = &ProducerOptions{
44-
StreamMaxLength: 1000,
45-
ApproximateMaxLength: true,
55+
StreamMaxLength: 1000,
56+
UseApproximate: true,
4657
}
4758

4859
// NewProducer uses a default set of options to create a Producer. It sets
49-
// StreamMaxLength to 1000 and ApproximateMaxLength to true. In most production
60+
// StreamMaxLength to 1000 and UseApproximate to true. In most production
5061
// environments, you'll want to use NewProducerWithOptions.
5162
func NewProducer() (*Producer, error) {
5263
return NewProducerWithOptions(context.Background(), defaultProducerOptions)
@@ -77,12 +88,20 @@ func NewProducerWithOptions(ctx context.Context, options *ProducerOptions) (*Pro
7788
// should let Redis auto-generate the ID. If an ID is auto-generated, it will be
7889
// set on msg.ID for your reference. msg.Values is also required.
7990
func (p *Producer) Enqueue(ctx context.Context, msg *Message) error {
91+
maxLen := cmp.Or(msg.StreamMaxLength, p.options.StreamMaxLength)
92+
minId := cmp.Or(msg.StreamMinId, p.options.StreamMinId)
93+
if maxLen > 0 {
94+
minId = ""
95+
}
96+
8097
args := &redis.XAddArgs{
8198
ID: msg.ID,
8299
Stream: msg.Stream,
83100
Values: msg.Values,
84-
MaxLen: p.options.StreamMaxLength,
85-
Approx: p.options.ApproximateMaxLength,
101+
MaxLen: maxLen,
102+
MinID: minId,
103+
Approx: p.options.UseApproximate,
104+
Limit: cmp.Or(msg.TrimLimit, p.options.TrimLimit),
86105
}
87106
id, err := p.redis.XAdd(ctx, args).Result()
88107
if err != nil {

producer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestEnqueue(t *testing.T) {
6767

6868
t.Run("bubbles up errors", func(tt *testing.T) {
6969
ctx := context.Background()
70-
p, err := NewProducerWithOptions(context.Background(), &ProducerOptions{ApproximateMaxLength: true})
70+
p, err := NewProducerWithOptions(context.Background(), &ProducerOptions{UseApproximate: true})
7171
require.NoError(t, err)
7272

7373
msg := &Message{

0 commit comments

Comments
 (0)