Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2189,6 +2189,8 @@ func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
// to the group's consumers, or a NULL(Nil) when that number can't be determined.
if err != nil && err != Nil {
return err
} else if err == Nil {
group.Lag = -1
}
default:
return fmt.Errorf("redis: unexpected key %q in XINFO GROUPS reply", key)
Expand Down
30 changes: 30 additions & 0 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6772,6 +6772,36 @@ var _ = Describe("Commands", func() {
}))
})

It("should return -1 for nil lag in XINFO GROUPS", func() {
_, err := client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-1", Values: []string{"foo", "1"}}).Result()
Expect(err).NotTo(HaveOccurred())

client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-2", Values: []string{"foo", "2"}})
Expect(err).NotTo(HaveOccurred())
client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-3", Values: []string{"foo", "3"}})
Expect(err).NotTo(HaveOccurred())

err = client.XGroupCreate(ctx, "s", "g", "0").Err()
Expect(err).NotTo(HaveOccurred())
err = client.XReadGroup(ctx, &redis.XReadGroupArgs{Group: "g", Consumer: "c", Streams: []string{"s", ">"}, Count: 1, Block: -1, NoAck: false}).Err()
Expect(err).NotTo(HaveOccurred())

client.XDel(ctx, "s", "0-2")

res, err := client.XInfoGroups(ctx, "s").Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]redis.XInfoGroup{
{
Name: "g",
Consumers: 1,
Pending: 1,
LastDeliveredID: "0-1",
EntriesRead: 1,
Lag: -1, // nil lag from Redis is reported as -1
},
}))
})

It("should XINFO CONSUMERS", func() {
res, err := client.XInfoConsumers(ctx, "stream", "group1").Result()
Expect(err).NotTo(HaveOccurred())
Expand Down
Loading