Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/run-tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ runs:

# Mapping of redis version to redis testing containers
declare -A redis_version_mapping=(
["8.2.x"]="8.2-M01-pre"
["8.2.x"]="8.2-RC1-pre"
["8.0.x"]="8.0.2"
["7.4.x"]="rs-7.4.0-v5"
["7.2.x"]="rs-7.2.0-v17"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:

# Mapping of redis version to redis testing containers
declare -A redis_version_mapping=(
["8.2.x"]="8.2-M01-pre"
["8.2.x"]="8.2-RC1-pre"
["8.0.x"]="8.0.2"
["7.4.x"]="rs-7.4.0-v5"
)
Expand Down
59 changes: 59 additions & 0 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6169,6 +6169,34 @@ var _ = Describe("Commands", func() {
Expect(n).To(Equal(int64(3)))
})

It("should XTrimMaxLenMode", func() {
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
n, err := client.XTrimMaxLenMode(ctx, "stream", 0, "KEEPREF").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(BeNumerically(">=", 0))
})

It("should XTrimMaxLenApproxMode", func() {
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
n, err := client.XTrimMaxLenApproxMode(ctx, "stream", 0, 0, "KEEPREF").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(BeNumerically(">=", 0))
})

It("should XTrimMinIDMode", func() {
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
n, err := client.XTrimMinIDMode(ctx, "stream", "4-0", "KEEPREF").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(BeNumerically(">=", 0))
})

It("should XTrimMinIDApproxMode", func() {
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
n, err := client.XTrimMinIDApproxMode(ctx, "stream", "4-0", 0, "KEEPREF").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(BeNumerically(">=", 0))
})

It("should XAdd", func() {
id, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream",
Expand Down Expand Up @@ -6222,6 +6250,37 @@ var _ = Describe("Commands", func() {
Expect(n).To(Equal(int64(3)))
})

It("should XAckDel", func() {
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
// First, create a consumer group
err := client.XGroupCreate(ctx, "stream", "testgroup", "0").Err()
Expect(err).NotTo(HaveOccurred())

// Read messages to create pending entries
_, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: "testgroup",
Consumer: "testconsumer",
Streams: []string{"stream", ">"},
}).Result()
Expect(err).NotTo(HaveOccurred())

// Test XAckDel with KEEPREF mode
n, err := client.XAckDel(ctx, "stream", "testgroup", "KEEPREF", "1-0", "2-0").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(HaveLen(2))

// Clean up
client.XGroupDestroy(ctx, "stream", "testgroup")
})

It("should XDelEx", func() {
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
// Test XDelEx with KEEPREF mode
n, err := client.XDelEx(ctx, "stream", "KEEPREF", "1-0", "2-0").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(HaveLen(2))
})

It("should XLen", func() {
n, err := client.XLen(ctx, "stream").Result()
Expect(err).NotTo(HaveOccurred())
Expand Down
10 changes: 5 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

services:
redis:
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:rs-7.4.0-v2}
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.2-RC1-pre}
platform: linux/amd64
container_name: redis-standalone
environment:
Expand All @@ -23,7 +23,7 @@ services:
- all

osscluster:
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:rs-7.4.0-v2}
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.2-RC1-pre}
platform: linux/amd64
container_name: redis-osscluster
environment:
Expand All @@ -40,7 +40,7 @@ services:
- all

sentinel-cluster:
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:rs-7.4.0-v2}
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.2-RC1-pre}
platform: linux/amd64
container_name: redis-sentinel-cluster
network_mode: "host"
Expand All @@ -60,7 +60,7 @@ services:
- all

sentinel:
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:rs-7.4.0-v2}
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.2-RC1-pre}
platform: linux/amd64
container_name: redis-sentinel
depends_on:
Expand All @@ -84,7 +84,7 @@ services:
- all

ring-cluster:
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:rs-7.4.0-v2}
image: ${CLIENT_LIBS_TEST_IMAGE:-redislabs/client-libs-test:8.2-RC1-pre}
platform: linux/amd64
container_name: redis-ring-cluster
environment:
Expand Down
69 changes: 69 additions & 0 deletions stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package redis

import (
"context"
"strconv"
"time"
)

type StreamCmdable interface {
XAdd(ctx context.Context, a *XAddArgs) *StringCmd
XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd
XDel(ctx context.Context, stream string, ids ...string) *IntCmd
XDelEx(ctx context.Context, stream string, mode string, ids ...string) *SliceCmd
XLen(ctx context.Context, stream string) *IntCmd
XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd
XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd
Expand All @@ -31,8 +34,12 @@ type StreamCmdable interface {
XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd
XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd
XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd
XTrimMinID(ctx context.Context, key string, minID string) *IntCmd
XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd
XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd
Expand All @@ -54,6 +61,7 @@ type XAddArgs struct {
// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
Approx bool
Limit int64
Mode string
ID string
Values interface{}
}
Expand Down Expand Up @@ -81,6 +89,11 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
if a.Limit > 0 {
args = append(args, "limit", a.Limit)
}

if a.Mode != "" {
args = append(args, a.Mode)
}

if a.ID != "" {
args = append(args, a.ID)
} else {
Expand All @@ -93,6 +106,16 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
return cmd
}

func (c cmdable) XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd {
args := []interface{}{"xackdel", stream, group, mode, "ids", strconv.Itoa(len(ids))}
for _, id := range ids {
args = append(args, id)
}
cmd := NewSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd {
args := []interface{}{"xdel", stream}
for _, id := range ids {
Expand All @@ -103,6 +126,16 @@ func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd
return cmd
}

func (c cmdable) XDelEx(ctx context.Context, stream string, mode string, ids ...string) *SliceCmd {
args := []interface{}{"xdelex", stream, mode, "ids", strconv.Itoa(len(ids))}
for _, id := range ids {
args = append(args, id)
}
cmd := NewSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XLen(ctx context.Context, stream string) *IntCmd {
cmd := NewIntCmd(ctx, "xlen", stream)
_ = c(ctx, cmd)
Expand Down Expand Up @@ -418,6 +451,42 @@ func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string,
return c.xTrim(ctx, key, "minid", true, minID, limit)
}

func (c cmdable) xTrimMode(
ctx context.Context, key, strategy string,
approx bool, threshold interface{}, limit int64,
mode string,
) *IntCmd {
args := make([]interface{}, 0, 7)
args = append(args, "xtrim", key, strategy)
if approx {
args = append(args, "~")
}
args = append(args, threshold)
if limit > 0 {
args = append(args, "limit", limit)
}
args = append(args, mode)
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd {
return c.xTrimMode(ctx, key, "maxlen", false, maxLen, 0, mode)
}

func (c cmdable) XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd {
return c.xTrimMode(ctx, key, "maxlen", true, maxLen, limit, mode)
}

func (c cmdable) XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd {
return c.xTrimMode(ctx, key, "minid", false, minID, 0, mode)
}

func (c cmdable) XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd {
return c.xTrimMode(ctx, key, "minid", true, minID, limit, mode)
}

func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd {
cmd := NewXInfoConsumersCmd(ctx, key, group)
_ = c(ctx, cmd)
Expand Down
Loading