Skip to content

Commit f594401

Browse files
authored
support "XINFO CONSUMERS" (#1649)
* support "XINFO CONSUMERS" * add "xinfo" test
1 parent 27df231 commit f594401

File tree

3 files changed

+208
-0
lines changed

3 files changed

+208
-0
lines changed

command.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1445,6 +1445,103 @@ func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {
14451445

14461446
//------------------------------------------------------------------------------
14471447

1448+
type XInfoConsumersCmd struct {
1449+
baseCmd
1450+
val []XInfoConsumer
1451+
}
1452+
1453+
type XInfoConsumer struct {
1454+
Name string
1455+
Pending int64
1456+
Idle int64
1457+
}
1458+
1459+
var _ Cmder = (*XInfoGroupsCmd)(nil)
1460+
1461+
func NewXInfoConsumersCmd(ctx context.Context, stream string, group string) *XInfoConsumersCmd {
1462+
return &XInfoConsumersCmd{
1463+
baseCmd: baseCmd{
1464+
ctx: ctx,
1465+
args: []interface{}{"xinfo", "consumers", stream, group},
1466+
},
1467+
}
1468+
}
1469+
1470+
func (cmd *XInfoConsumersCmd) Val() []XInfoConsumer {
1471+
return cmd.val
1472+
}
1473+
1474+
func (cmd *XInfoConsumersCmd) Result() ([]XInfoConsumer, error) {
1475+
return cmd.val, cmd.err
1476+
}
1477+
1478+
func (cmd *XInfoConsumersCmd) String() string {
1479+
return cmdString(cmd, cmd.val)
1480+
}
1481+
1482+
func (cmd *XInfoConsumersCmd) readReply(rd *proto.Reader) error {
1483+
n, err := rd.ReadArrayLen()
1484+
if err != nil {
1485+
return err
1486+
}
1487+
1488+
cmd.val = make([]XInfoConsumer, n)
1489+
1490+
for i := 0; i < n; i++ {
1491+
cmd.val[i], err = readXConsumerInfo(rd)
1492+
if err != nil {
1493+
return err
1494+
}
1495+
}
1496+
1497+
return nil
1498+
}
1499+
1500+
func readXConsumerInfo(rd *proto.Reader) (XInfoConsumer, error) {
1501+
var consumer XInfoConsumer
1502+
1503+
n, err := rd.ReadArrayLen()
1504+
if err != nil {
1505+
return consumer, err
1506+
}
1507+
if n != 6 {
1508+
return consumer, fmt.Errorf("redis: got %d elements in XINFO CONSUMERS reply, wanted 6", n)
1509+
}
1510+
1511+
for i := 0; i < 3; i++ {
1512+
key, err := rd.ReadString()
1513+
if err != nil {
1514+
return consumer, err
1515+
}
1516+
1517+
val, err := rd.ReadString()
1518+
if err != nil {
1519+
return consumer, err
1520+
}
1521+
1522+
switch key {
1523+
case "name":
1524+
consumer.Name = val
1525+
case "pending":
1526+
consumer.Pending, err = strconv.ParseInt(val, 0, 64)
1527+
if err != nil {
1528+
return consumer, err
1529+
}
1530+
case "idle":
1531+
consumer.Idle, err = strconv.ParseInt(val, 0, 64)
1532+
if err != nil {
1533+
return consumer, err
1534+
}
1535+
default:
1536+
return consumer, fmt.Errorf("redis: unexpected content %s in XINFO CONSUMERS reply", key)
1537+
}
1538+
}
1539+
1540+
return consumer, nil
1541+
}
1542+
1543+
//------------------------------------------------------------------------------
1544+
14481545
type XInfoGroupsCmd struct {
14491546
baseCmd
14501547
val []XInfoGroup

commands.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1752,6 +1752,12 @@ func (c cmdable) XTrimApprox(ctx context.Context, key string, maxLen int64) *Int
17521752
return cmd
17531753
}
17541754

1755+
func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd {
1756+
cmd := NewXInfoConsumersCmd(ctx, key, group)
1757+
_ = c(ctx, cmd)
1758+
return cmd
1759+
}
1760+
17551761
func (c cmdable) XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd {
17561762
cmd := NewXInfoGroupsCmd(ctx, key)
17571763
_ = c(ctx, cmd)

commands_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3891,6 +3891,111 @@ var _ = Describe("Commands", func() {
38913891
Expect(n).To(Equal(int64(2)))
38923892
})
38933893
})
3894+
3895+
Describe("xinfo", func() {
3896+
BeforeEach(func() {
3897+
err := client.XGroupCreate(ctx, "stream", "group1", "0").Err()
3898+
Expect(err).NotTo(HaveOccurred())
3899+
3900+
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
3901+
Group: "group1",
3902+
Consumer: "consumer1",
3903+
Streams: []string{"stream", ">"},
3904+
Count: 2,
3905+
}).Result()
3906+
Expect(err).NotTo(HaveOccurred())
3907+
Expect(res).To(Equal([]redis.XStream{
3908+
{
3909+
Stream: "stream",
3910+
Messages: []redis.XMessage{
3911+
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
3912+
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
3913+
},
3914+
},
3915+
}))
3916+
3917+
res, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
3918+
Group: "group1",
3919+
Consumer: "consumer2",
3920+
Streams: []string{"stream", ">"},
3921+
}).Result()
3922+
Expect(err).NotTo(HaveOccurred())
3923+
Expect(res).To(Equal([]redis.XStream{
3924+
{
3925+
Stream: "stream",
3926+
Messages: []redis.XMessage{
3927+
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
3928+
},
3929+
},
3930+
}))
3931+
3932+
err = client.XGroupCreate(ctx, "stream", "group2", "1-0").Err()
3933+
Expect(err).NotTo(HaveOccurred())
3934+
3935+
res, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
3936+
Group: "group2",
3937+
Consumer: "consumer1",
3938+
Streams: []string{"stream", ">"},
3939+
}).Result()
3940+
Expect(err).NotTo(HaveOccurred())
3941+
Expect(res).To(Equal([]redis.XStream{
3942+
{
3943+
Stream: "stream",
3944+
Messages: []redis.XMessage{
3945+
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
3946+
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
3947+
},
3948+
},
3949+
}))
3950+
})
3951+
3952+
AfterEach(func() {
3953+
n, err := client.XGroupDestroy(ctx, "stream", "group1").Result()
3954+
Expect(err).NotTo(HaveOccurred())
3955+
Expect(n).To(Equal(int64(1)))
3956+
n, err = client.XGroupDestroy(ctx, "stream", "group2").Result()
3957+
Expect(err).NotTo(HaveOccurred())
3958+
Expect(n).To(Equal(int64(1)))
3959+
})
3960+
3961+
It("should XINFO STREAM", func() {
3962+
res, err := client.XInfoStream(ctx, "stream").Result()
3963+
Expect(err).NotTo(HaveOccurred())
3964+
res.RadixTreeKeys = 0
3965+
res.RadixTreeNodes = 0
3966+
3967+
Expect(res).To(Equal(&redis.XInfoStream{
3968+
Length: 3,
3969+
RadixTreeKeys: 0,
3970+
RadixTreeNodes: 0,
3971+
Groups: 2,
3972+
LastGeneratedID: "3-0",
3973+
FirstEntry: redis.XMessage{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
3974+
LastEntry: redis.XMessage{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
3975+
}))
3976+
})
3977+
3978+
It("should XINFO GROUPS", func() {
3979+
res, err := client.XInfoGroups(ctx, "stream").Result()
3980+
Expect(err).NotTo(HaveOccurred())
3981+
Expect(res).To(Equal([]redis.XInfoGroup{
3982+
{Name: "group1", Consumers: 2, Pending: 3, LastDeliveredID: "3-0"},
3983+
{Name: "group2", Consumers: 1, Pending: 2, LastDeliveredID: "3-0"},
3984+
}))
3985+
})
3986+
3987+
It("should XINFO CONSUMERS", func() {
3988+
res, err := client.XInfoConsumers(ctx, "stream", "group1").Result()
3989+
Expect(err).NotTo(HaveOccurred())
3990+
for i := range res {
3991+
res[i].Idle = 0
3992+
}
3993+
Expect(res).To(Equal([]redis.XInfoConsumer{
3994+
{Name: "consumer1", Pending: 2, Idle: 0},
3995+
{Name: "consumer2", Pending: 1, Idle: 0},
3996+
}))
3997+
})
3998+
})
38943999
})
38954000

38964001
Describe("Geo add and radius search", func() {

0 commit comments

Comments
 (0)