diff --git a/commands_test.go b/commands_test.go index 19548e134..9e1300893 100644 --- a/commands_test.go +++ b/commands_test.go @@ -6169,6 +6169,34 @@ var _ = Describe("Commands", func() { Expect(n).To(Equal(int64(3))) }) + It("should XTrimMaxLenMode", func() { + SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images") + n, err := client.XTrimMaxLenMode(ctx, "stream", 0, "KEEPREF").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(BeNumerically(">=", 0)) + }) + + It("should XTrimMaxLenApproxMode", func() { + SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images") + n, err := client.XTrimMaxLenApproxMode(ctx, "stream", 0, 0, "KEEPREF").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(BeNumerically(">=", 0)) + }) + + It("should XTrimMinIDMode", func() { + SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images") + n, err := client.XTrimMinIDMode(ctx, "stream", "4-0", "KEEPREF").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(BeNumerically(">=", 0)) + }) + + It("should XTrimMinIDApproxMode", func() { + SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images") + n, err := client.XTrimMinIDApproxMode(ctx, "stream", "4-0", 0, "KEEPREF").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(BeNumerically(">=", 0)) + }) + It("should XAdd", func() { id, err := client.XAdd(ctx, &redis.XAddArgs{ Stream: "stream", @@ -6222,6 +6250,37 @@ var _ = Describe("Commands", func() { Expect(n).To(Equal(int64(3))) }) + It("should XAckDel", func() { + SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images") + // First, create a consumer group + err := client.XGroupCreate(ctx, "stream", "testgroup", "0").Err() + Expect(err).NotTo(HaveOccurred()) + + // Read messages to create pending entries + _, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "testgroup", + Consumer: "testconsumer", + Streams: []string{"stream", ">"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + // Test XAckDel with KEEPREF mode + n, err := client.XAckDel(ctx, "stream", "testgroup", "KEEPREF", "1-0", "2-0").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(HaveLen(2)) + + // Clean up + client.XGroupDestroy(ctx, "stream", "testgroup") + }) + + It("should XDelEx", func() { + SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images") + // Test XDelEx with KEEPREF mode + n, err := client.XDelEx(ctx, "stream", "KEEPREF", "1-0", "2-0").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(HaveLen(2)) + }) + It("should XLen", func() { n, err := client.XLen(ctx, "stream").Result() Expect(err).NotTo(HaveOccurred()) diff --git a/stream_commands.go b/stream_commands.go index 6d7b22922..4b84e00fd 100644 --- a/stream_commands.go +++ b/stream_commands.go @@ -7,7 +7,9 @@ import ( type StreamCmdable interface { XAdd(ctx context.Context, a *XAddArgs) *StringCmd + XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd XDel(ctx context.Context, stream string, ids ...string) *IntCmd + XDelEx(ctx context.Context, stream string, mode string, ids ...string) *SliceCmd XLen(ctx context.Context, stream string) *IntCmd XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd @@ -31,8 +33,12 @@ type StreamCmdable interface { XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd + XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd + XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd XTrimMinID(ctx context.Context, key string, minID string) *IntCmd XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd + XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd + XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd XInfoStream(ctx context.Context, key string) *XInfoStreamCmd XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd @@ -54,6 +60,7 @@ type XAddArgs struct { // Approx causes MaxLen and MinID to use "~" matcher (instead of "="). Approx bool Limit int64 + Mode string ID string Values interface{} } @@ -81,6 +88,11 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd { if a.Limit > 0 { args = append(args, "limit", a.Limit) } + + if a.Mode != "" { + args = append(args, a.Mode) + } + if a.ID != "" { args = append(args, a.ID) } else { @@ -93,6 +105,16 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd { return cmd } +func (c cmdable) XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd { + args := []interface{}{"xackdel", stream, group, mode, "ids", len(ids)} + for _, id := range ids { + args = append(args, id) + } + cmd := NewSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd { args := []interface{}{"xdel", stream} for _, id := range ids { @@ -103,6 +125,16 @@ func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd return cmd } +func (c cmdable) XDelEx(ctx context.Context, stream string, mode string, ids ...string) *SliceCmd { + args := []interface{}{"xdelex", stream, mode, "ids", len(ids)} + for _, id := range ids { + args = append(args, id) + } + cmd := NewSliceCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + func (c cmdable) XLen(ctx context.Context, stream string) *IntCmd { cmd := NewIntCmd(ctx, "xlen", stream) _ = c(ctx, cmd) @@ -375,6 +407,8 @@ func xClaimArgs(a *XClaimArgs) []interface{} { return args } +// TODO: refactor xTrim, xTrimMode and the wrappers over the functions + // xTrim If approx is true, add the "~" parameter, otherwise it is the default "=" (redis default). // example: // @@ -418,6 +452,42 @@ func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string, return c.xTrim(ctx, key, "minid", true, minID, limit) } +func (c cmdable) xTrimMode( + ctx context.Context, key, strategy string, + approx bool, threshold interface{}, limit int64, + mode string, +) *IntCmd { + args := make([]interface{}, 0, 7) + args = append(args, "xtrim", key, strategy) + if approx { + args = append(args, "~") + } + args = append(args, threshold) + if limit > 0 { + args = append(args, "limit", limit) + } + args = append(args, mode) + cmd := NewIntCmd(ctx, args...) + _ = c(ctx, cmd) + return cmd +} + +func (c cmdable) XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd { + return c.xTrimMode(ctx, key, "maxlen", false, maxLen, 0, mode) +} + +func (c cmdable) XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd { + return c.xTrimMode(ctx, key, "maxlen", true, maxLen, limit, mode) +} + +func (c cmdable) XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd { + return c.xTrimMode(ctx, key, "minid", false, minID, 0, mode) +} + +func (c cmdable) XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd { + return c.xTrimMode(ctx, key, "minid", true, minID, limit, mode) +} + func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd { cmd := NewXInfoConsumersCmd(ctx, key, group) _ = c(ctx, cmd)