Skip to content

Commit 3b2dd4f

Browse files
authored
Merge pull request #1503 from icholy/xinfo_stream
Add XINFO STREAM
2 parents 2b9cfd3 + a2b0227 commit 3b2dd4f

File tree

2 files changed

+127
-25
lines changed

2 files changed

+127
-25
lines changed

command.go

Lines changed: 120 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,34 +1081,43 @@ func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error {
10811081
return nil
10821082
}
10831083

1084-
// xMessageSliceParser implements proto.MultiBulkParse.
1085-
func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
1086-
msgs := make([]XMessage, n)
1087-
for i := 0; i < len(msgs); i++ {
1088-
i := i
1089-
_, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
1090-
id, err := rd.ReadString()
1091-
if err != nil {
1092-
return nil, err
1093-
}
1084+
func readXMessage(rd *proto.Reader) (XMessage, error) {
1085+
n, err := rd.ReadArrayLen()
1086+
if err != nil {
1087+
return XMessage{}, err
1088+
}
1089+
if n != 2 {
1090+
return XMessage{}, fmt.Errorf("got %d, wanted 2", n)
1091+
}
10941092

1095-
var values map[string]interface{}
1093+
id, err := rd.ReadString()
1094+
if err != nil {
1095+
return XMessage{}, err
1096+
}
10961097

1097-
v, err := rd.ReadArrayReply(stringInterfaceMapParser)
1098-
if err != nil {
1099-
if err != proto.Nil {
1100-
return nil, err
1101-
}
1102-
} else {
1103-
values = v.(map[string]interface{})
1104-
}
1098+
var values map[string]interface{}
11051099

1106-
msgs[i] = XMessage{
1107-
ID: id,
1108-
Values: values,
1109-
}
1110-
return nil, nil
1111-
})
1100+
v, err := rd.ReadArrayReply(stringInterfaceMapParser)
1101+
if err != nil {
1102+
if err != proto.Nil {
1103+
return XMessage{}, err
1104+
}
1105+
} else {
1106+
values = v.(map[string]interface{})
1107+
}
1108+
1109+
return XMessage{
1110+
ID: id,
1111+
Values: values,
1112+
}, nil
1113+
}
1114+
1115+
// xMessageSliceParser implements proto.MultiBulkParse.
1116+
func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
1117+
msgs := make([]XMessage, n)
1118+
for i := int64(0); i < n; i++ {
1119+
var err error
1120+
msgs[i], err = readXMessage(rd)
11121121
if err != nil {
11131122
return nil, err
11141123
}
@@ -1493,6 +1502,92 @@ func readXGroupInfo(rd *proto.Reader) (XInfoGroup, error) {
14931502

14941503
//------------------------------------------------------------------------------
14951504

1505+
type XInfoStreamCmd struct {
1506+
baseCmd
1507+
val *XInfoStream
1508+
}
1509+
1510+
type XInfoStream struct {
1511+
Length int64
1512+
RadixTreeKeys int64
1513+
RadixTreeNodes int64
1514+
Groups int64
1515+
LastGeneratedID string
1516+
FirstEntry XMessage
1517+
LastEntry XMessage
1518+
}
1519+
1520+
var _ Cmder = (*XInfoStreamCmd)(nil)
1521+
1522+
func NewXInfoStreamCmd(ctx context.Context, stream string) *XInfoStreamCmd {
1523+
return &XInfoStreamCmd{
1524+
baseCmd: baseCmd{
1525+
ctx: ctx,
1526+
args: []interface{}{"xinfo", "stream", stream},
1527+
},
1528+
}
1529+
}
1530+
1531+
func (cmd *XInfoStreamCmd) Val() *XInfoStream {
1532+
return cmd.val
1533+
}
1534+
1535+
func (cmd *XInfoStreamCmd) Result() (*XInfoStream, error) {
1536+
return cmd.val, cmd.err
1537+
}
1538+
1539+
func (cmd *XInfoStreamCmd) String() string {
1540+
return cmdString(cmd, cmd.val)
1541+
}
1542+
1543+
func (cmd *XInfoStreamCmd) readReply(rd *proto.Reader) error {
1544+
v, err := rd.ReadReply(xStreamInfoParser)
1545+
if err != nil {
1546+
return err
1547+
}
1548+
cmd.val = v.(*XInfoStream)
1549+
return nil
1550+
}
1551+
1552+
func xStreamInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
1553+
if n != 14 {
1554+
return nil, fmt.Errorf("redis: got %d elements in XINFO STREAM reply,"+
1555+
"wanted 14", n)
1556+
}
1557+
var info XInfoStream
1558+
for i := 0; i < 7; i++ {
1559+
key, err := rd.ReadString()
1560+
if err != nil {
1561+
return nil, err
1562+
}
1563+
switch key {
1564+
case "length":
1565+
info.Length, err = rd.ReadIntReply()
1566+
case "radix-tree-keys":
1567+
info.RadixTreeKeys, err = rd.ReadIntReply()
1568+
case "radix-tree-nodes":
1569+
info.RadixTreeNodes, err = rd.ReadIntReply()
1570+
case "groups":
1571+
info.Groups, err = rd.ReadIntReply()
1572+
case "last-generated-id":
1573+
info.LastGeneratedID, err = rd.ReadString()
1574+
case "first-entry":
1575+
info.FirstEntry, err = readXMessage(rd)
1576+
case "last-entry":
1577+
info.LastEntry, err = readXMessage(rd)
1578+
default:
1579+
return nil, fmt.Errorf("redis: unexpected content %s "+
1580+
"in XINFO STREAM reply", key)
1581+
}
1582+
if err != nil {
1583+
return nil, err
1584+
}
1585+
}
1586+
return &info, nil
1587+
}
1588+
1589+
//------------------------------------------------------------------------------
1590+
14961591
type ZSliceCmd struct {
14971592
baseCmd
14981593

commands.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ type Cmdable interface {
219219
XTrim(ctx context.Context, key string, maxLen int64) *IntCmd
220220
XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd
221221
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
222+
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
222223

223224
BZPopMax(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd
224225
BZPopMin(ctx context.Context, timeout time.Duration, keys ...string) *ZWithKeyCmd
@@ -1699,6 +1700,12 @@ func (c cmdable) XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd {
16991700
return cmd
17001701
}
17011702

1703+
func (c cmdable) XInfoStream(ctx context.Context, key string) *XInfoStreamCmd {
1704+
cmd := NewXInfoStreamCmd(ctx, key)
1705+
_ = c(ctx, cmd)
1706+
return cmd
1707+
}
1708+
17021709
//------------------------------------------------------------------------------
17031710

17041711
// Z represents sorted set member.

0 commit comments

Comments
 (0)