Skip to content

Added new stream commands #3450

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6169,6 +6169,34 @@ var _ = Describe("Commands", func() {
Expect(n).To(Equal(int64(3)))
})

It("should XTrimMaxLenMode", func() {
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
n, err := client.XTrimMaxLenMode(ctx, "stream", 0, "KEEPREF").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(BeNumerically(">=", 0))
})

It("should XTrimMaxLenApproxMode", func() {
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
n, err := client.XTrimMaxLenApproxMode(ctx, "stream", 0, 0, "KEEPREF").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(BeNumerically(">=", 0))
})

It("should XTrimMinIDMode", func() {
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
n, err := client.XTrimMinIDMode(ctx, "stream", "4-0", "KEEPREF").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(BeNumerically(">=", 0))
})

It("should XTrimMinIDApproxMode", func() {
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
n, err := client.XTrimMinIDApproxMode(ctx, "stream", "4-0", 0, "KEEPREF").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(BeNumerically(">=", 0))
})

It("should XAdd", func() {
id, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream",
Expand Down Expand Up @@ -6222,6 +6250,37 @@ var _ = Describe("Commands", func() {
Expect(n).To(Equal(int64(3)))
})

It("should XAckDel", func() {
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
// First, create a consumer group
err := client.XGroupCreate(ctx, "stream", "testgroup", "0").Err()
Expect(err).NotTo(HaveOccurred())

// Read messages to create pending entries
_, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "testgroup",
Consumer: "testconsumer",
Streams: []string{"stream", ">"},
}).Result()
Expect(err).NotTo(HaveOccurred())

// Test XAckDel with KEEPREF mode
n, err := client.XAckDel(ctx, "stream", "testgroup", "KEEPREF", "1-0", "2-0").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(HaveLen(2))

// Clean up
client.XGroupDestroy(ctx, "stream", "testgroup")
})

It("should XDelEx", func() {
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
// Test XDelEx with KEEPREF mode
n, err := client.XDelEx(ctx, "stream", "KEEPREF", "1-0", "2-0").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(HaveLen(2))
})

It("should XLen", func() {
n, err := client.XLen(ctx, "stream").Result()
Expect(err).NotTo(HaveOccurred())
Expand Down
70 changes: 70 additions & 0 deletions stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (

type StreamCmdable interface {
XAdd(ctx context.Context, a *XAddArgs) *StringCmd
XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd
XDel(ctx context.Context, stream string, ids ...string) *IntCmd
XDelEx(ctx context.Context, stream string, mode string, ids ...string) *SliceCmd
XLen(ctx context.Context, stream string) *IntCmd
XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd
XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd
Expand All @@ -31,8 +33,12 @@ type StreamCmdable interface {
XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd
XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd
XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd
XTrimMinID(ctx context.Context, key string, minID string) *IntCmd
XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd
XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd
Expand All @@ -54,6 +60,7 @@ type XAddArgs struct {
// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
Approx bool
Limit int64
Mode string
ID string
Values interface{}
}
Expand Down Expand Up @@ -81,6 +88,11 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
if a.Limit > 0 {
args = append(args, "limit", a.Limit)
}

if a.Mode != "" {
args = append(args, a.Mode)
}

if a.ID != "" {
args = append(args, a.ID)
} else {
Expand All @@ -93,6 +105,16 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
return cmd
}

func (c cmdable) XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd {
args := []interface{}{"xackdel", stream, group, mode, "ids", len(ids)}
for _, id := range ids {
args = append(args, id)
}
cmd := NewSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd {
args := []interface{}{"xdel", stream}
for _, id := range ids {
Expand All @@ -103,6 +125,16 @@ func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd
return cmd
}

func (c cmdable) XDelEx(ctx context.Context, stream string, mode string, ids ...string) *SliceCmd {
args := []interface{}{"xdelex", stream, mode, "ids", len(ids)}
for _, id := range ids {
args = append(args, id)
}
cmd := NewSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XLen(ctx context.Context, stream string) *IntCmd {
cmd := NewIntCmd(ctx, "xlen", stream)
_ = c(ctx, cmd)
Expand Down Expand Up @@ -375,6 +407,8 @@ func xClaimArgs(a *XClaimArgs) []interface{} {
return args
}

// TODO: refactor xTrim, xTrimMode and the wrappers over the functions

// xTrim If approx is true, add the "~" parameter, otherwise it is the default "=" (redis default).
// example:
//
Expand Down Expand Up @@ -418,6 +452,42 @@ func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string,
return c.xTrim(ctx, key, "minid", true, minID, limit)
}

func (c cmdable) xTrimMode(
ctx context.Context, key, strategy string,
approx bool, threshold interface{}, limit int64,
mode string,
) *IntCmd {
args := make([]interface{}, 0, 7)
args = append(args, "xtrim", key, strategy)
if approx {
args = append(args, "~")
}
args = append(args, threshold)
if limit > 0 {
args = append(args, "limit", limit)
}
args = append(args, mode)
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd {
return c.xTrimMode(ctx, key, "maxlen", false, maxLen, 0, mode)
}

func (c cmdable) XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd {
return c.xTrimMode(ctx, key, "maxlen", true, maxLen, limit, mode)
}

func (c cmdable) XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd {
return c.xTrimMode(ctx, key, "minid", false, minID, 0, mode)
}

func (c cmdable) XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd {
return c.xTrimMode(ctx, key, "minid", true, minID, limit, mode)
}

func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd {
cmd := NewXInfoConsumersCmd(ctx, key, group)
_ = c(ctx, cmd)
Expand Down
Loading