Skip to content

Commit a8704c3

Browse files
authored
let XReadGroup skip empty message and process next message (#1243)
* let XReadGroup skip empty message and process next message
1 parent 8a0ab1a commit a8704c3

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

command.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1004,10 +1004,14 @@ func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
10041004
}
10051005

10061006
v, err := rd.ReadArrayReply(stringInterfaceMapParser)
1007-
if err != nil {
1007+
if err != nil && err != proto.Nil {
10081008
return nil, err
10091009
}
10101010

1011+
if v == nil || err == proto.Nil {
1012+
v = make(map[string]interface{})
1013+
}
1014+
10111015
msgs[i] = XMessage{
10121016
ID: id,
10131017
Values: v.(map[string]interface{}),

commands_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3623,6 +3623,27 @@ var _ = Describe("Commands", func() {
36233623
Expect(n).To(Equal(int64(1)))
36243624
})
36253625

3626+
It("should XReadGroup skip empty", func() {
3627+
n, err := client.XDel("stream", "2-0").Result()
3628+
Expect(err).NotTo(HaveOccurred())
3629+
Expect(n).To(Equal(int64(1)))
3630+
3631+
res, err := client.XReadGroup(&redis.XReadGroupArgs{
3632+
Group: "group",
3633+
Consumer: "consumer",
3634+
Streams: []string{"stream", "0"},
3635+
}).Result()
3636+
Expect(err).NotTo(HaveOccurred())
3637+
Expect(res).To(Equal([]redis.XStream{{
3638+
Stream: "stream",
3639+
Messages: []redis.XMessage{
3640+
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
3641+
{ID: "2-0", Values: map[string]interface{}{}},
3642+
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
3643+
}},
3644+
}))
3645+
})
3646+
36263647
It("should XGroupCreateMkStream", func() {
36273648
err := client.XGroupCreateMkStream("stream2", "group", "0").Err()
36283649
Expect(err).NotTo(HaveOccurred())

0 commit comments

Comments
 (0)