Skip to content

Commit b3c33d6

Browse files
added new stream commands
1 parent 23a87a2 commit b3c33d6

File tree

2 files changed

+128
-0
lines changed

2 files changed

+128
-0
lines changed

commands_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6169,6 +6169,34 @@ var _ = Describe("Commands", func() {
61696169
Expect(n).To(Equal(int64(3)))
61706170
})
61716171

6172+
It("should XTrimMaxLenMode", func() {
6173+
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
6174+
n, err := client.XTrimMaxLenMode(ctx, "stream", 0, "KEEPREF").Result()
6175+
Expect(err).NotTo(HaveOccurred())
6176+
Expect(n).To(BeNumerically(">=", 0))
6177+
})
6178+
6179+
It("should XTrimMaxLenApproxMode", func() {
6180+
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
6181+
n, err := client.XTrimMaxLenApproxMode(ctx, "stream", 0, 0, "KEEPREF").Result()
6182+
Expect(err).NotTo(HaveOccurred())
6183+
Expect(n).To(BeNumerically(">=", 0))
6184+
})
6185+
6186+
It("should XTrimMinIDMode", func() {
6187+
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
6188+
n, err := client.XTrimMinIDMode(ctx, "stream", "4-0", "KEEPREF").Result()
6189+
Expect(err).NotTo(HaveOccurred())
6190+
Expect(n).To(BeNumerically(">=", 0))
6191+
})
6192+
6193+
It("should XTrimMinIDApproxMode", func() {
6194+
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
6195+
n, err := client.XTrimMinIDApproxMode(ctx, "stream", "4-0", 0, "KEEPREF").Result()
6196+
Expect(err).NotTo(HaveOccurred())
6197+
Expect(n).To(BeNumerically(">=", 0))
6198+
})
6199+
61726200
It("should XAdd", func() {
61736201
id, err := client.XAdd(ctx, &redis.XAddArgs{
61746202
Stream: "stream",
@@ -6222,6 +6250,37 @@ var _ = Describe("Commands", func() {
62226250
Expect(n).To(Equal(int64(3)))
62236251
})
62246252

6253+
It("should XAckDel", func() {
6254+
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
6255+
// First, create a consumer group
6256+
err := client.XGroupCreate(ctx, "stream", "testgroup", "0").Err()
6257+
Expect(err).NotTo(HaveOccurred())
6258+
6259+
// Read messages to create pending entries
6260+
_, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
6261+
Group: "testgroup",
6262+
Consumer: "testconsumer",
6263+
Streams: []string{"stream", ">"},
6264+
}).Result()
6265+
Expect(err).NotTo(HaveOccurred())
6266+
6267+
// Test XAckDel with KEEPREF mode
6268+
n, err := client.XAckDel(ctx, "stream", "testgroup", "KEEPREF", "1-0", "2-0").Result()
6269+
Expect(err).NotTo(HaveOccurred())
6270+
Expect(n).To(BeNumerically(">=", 0))
6271+
6272+
// Clean up
6273+
client.XGroupDestroy(ctx, "stream", "testgroup")
6274+
})
6275+
6276+
It("should XDelEx", func() {
6277+
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
6278+
// Test XDelEx with KEEPREF mode
6279+
n, err := client.XDelEx(ctx, "stream", "KEEPREF", "1-0", "2-0").Result()
6280+
Expect(err).NotTo(HaveOccurred())
6281+
Expect(n).To(BeNumerically(">=", 0))
6282+
})
6283+
62256284
It("should XLen", func() {
62266285
n, err := client.XLen(ctx, "stream").Result()
62276286
Expect(err).NotTo(HaveOccurred())

stream_commands.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package redis
22

33
import (
44
"context"
5+
"strconv"
56
"time"
67
)
78

89
type StreamCmdable interface {
910
XAdd(ctx context.Context, a *XAddArgs) *StringCmd
11+
XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *IntCmd
1012
XDel(ctx context.Context, stream string, ids ...string) *IntCmd
13+
XDelEx(ctx context.Context, stream string, mode string, ids ...string) *IntCmd
1114
XLen(ctx context.Context, stream string) *IntCmd
1215
XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd
1316
XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd
@@ -31,8 +34,12 @@ type StreamCmdable interface {
3134
XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd
3235
XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
3336
XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
37+
XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd
38+
XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd
3439
XTrimMinID(ctx context.Context, key string, minID string) *IntCmd
3540
XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
41+
XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd
42+
XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd
3643
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
3744
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
3845
XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd
@@ -54,6 +61,7 @@ type XAddArgs struct {
5461
// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
5562
Approx bool
5663
Limit int64
64+
Mode string
5765
ID string
5866
Values interface{}
5967
}
@@ -81,6 +89,11 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
8189
if a.Limit > 0 {
8290
args = append(args, "limit", a.Limit)
8391
}
92+
93+
if a.Mode != "" {
94+
args = append(args, a.Mode)
95+
}
96+
8497
if a.ID != "" {
8598
args = append(args, a.ID)
8699
} else {
@@ -93,6 +106,16 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
93106
return cmd
94107
}
95108

109+
func (c cmdable) XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *IntCmd {
110+
args := []interface{}{"xdelex", stream, group, mode, "ids", strconv.Itoa(len(ids))}
111+
for _, id := range ids {
112+
args = append(args, id)
113+
}
114+
cmd := NewIntCmd(ctx, args...)
115+
_ = c(ctx, cmd)
116+
return cmd
117+
}
118+
96119
func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd {
97120
args := []interface{}{"xdel", stream}
98121
for _, id := range ids {
@@ -103,6 +126,16 @@ func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd
103126
return cmd
104127
}
105128

129+
func (c cmdable) XDelEx(ctx context.Context, stream string, mode string, ids ...string) *IntCmd {
130+
args := []interface{}{"xdelex", stream, mode, "ids", strconv.Itoa(len(ids))}
131+
for _, id := range ids {
132+
args = append(args, id)
133+
}
134+
cmd := NewIntCmd(ctx, args...)
135+
_ = c(ctx, cmd)
136+
return cmd
137+
}
138+
106139
func (c cmdable) XLen(ctx context.Context, stream string) *IntCmd {
107140
cmd := NewIntCmd(ctx, "xlen", stream)
108141
_ = c(ctx, cmd)
@@ -418,6 +451,42 @@ func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string,
418451
return c.xTrim(ctx, key, "minid", true, minID, limit)
419452
}
420453

454+
func (c cmdable) xTrimMode(
455+
ctx context.Context, key, strategy string,
456+
approx bool, threshold interface{}, limit int64,
457+
mode string,
458+
) *IntCmd {
459+
args := make([]interface{}, 0, 7)
460+
args = append(args, "xtrim", key, strategy)
461+
if approx {
462+
args = append(args, "~")
463+
}
464+
args = append(args, threshold)
465+
if limit > 0 {
466+
args = append(args, "limit", limit)
467+
}
468+
args = append(args, mode)
469+
cmd := NewIntCmd(ctx, args...)
470+
_ = c(ctx, cmd)
471+
return cmd
472+
}
473+
474+
func (c cmdable) XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd {
475+
return c.xTrimMode(ctx, key, "maxlen", false, maxLen, 0, mode)
476+
}
477+
478+
func (c cmdable) XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd {
479+
return c.xTrimMode(ctx, key, "maxlen", true, maxLen, limit, mode)
480+
}
481+
482+
func (c cmdable) XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd {
483+
return c.xTrimMode(ctx, key, "minid", false, minID, 0, mode)
484+
}
485+
486+
func (c cmdable) XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd {
487+
return c.xTrimMode(ctx, key, "minid", true, minID, limit, mode)
488+
}
489+
421490
func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd {
422491
cmd := NewXInfoConsumersCmd(ctx, key, group)
423492
_ = c(ctx, cmd)

0 commit comments

Comments
 (0)