Skip to content

Commit fe9bc12

Browse files
monkey92tparvez0vmihailenco
authored
sync master to v9 (#1760)
* Added missing idle args in XPendingExtArgs (#1750) Added missing idle args in XPendingExtArgs * fix #1754 (#1756) * Replace go-pg with bun * fix #1755 Signed-off-by: monkey <[email protected]> * fix read data Signed-off-by: monkey <[email protected]> * fix #1758 (#1759) fix #1758 Co-authored-by: Parvez <[email protected]> Co-authored-by: Vladimir Mihailenco <[email protected]>
1 parent d42071c commit fe9bc12

File tree

5 files changed

+481
-7
lines changed

5 files changed

+481
-7
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ name: Go
22

33
on:
44
push:
5-
branches: [master]
5+
branches: [master, v9]
66
pull_request:
77
branches: [master, v9]
88

.github/workflows/golangci-lint.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ on:
77
branches:
88
- master
99
- main
10+
- v9
1011
pull_request:
1112

1213
jobs:

command.go

Lines changed: 278 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1520,7 +1520,7 @@ type XInfoConsumer struct {
15201520
Idle int64
15211521
}
15221522

1523-
var _ Cmder = (*XInfoGroupsCmd)(nil)
1523+
var _ Cmder = (*XInfoConsumersCmd)(nil)
15241524

15251525
func NewXInfoConsumersCmd(ctx context.Context, stream string, group string) *XInfoConsumersCmd {
15261526
return &XInfoConsumersCmd{
@@ -1722,8 +1722,14 @@ func (cmd *XInfoStreamCmd) readReply(rd *proto.Reader) error {
17221722
cmd.val.LastGeneratedID, err = rd.ReadString()
17231723
case "first-entry":
17241724
cmd.val.FirstEntry, err = readXMessage(rd)
1725+
if err == Nil {
1726+
err = nil
1727+
}
17251728
case "last-entry":
17261729
cmd.val.LastEntry, err = readXMessage(rd)
1730+
if err == Nil {
1731+
err = nil
1732+
}
17271733
default:
17281734
return fmt.Errorf("redis: unexpected content %s "+
17291735
"in XINFO STREAM reply", key)
@@ -1737,6 +1743,277 @@ func (cmd *XInfoStreamCmd) readReply(rd *proto.Reader) error {
17371743

17381744
//------------------------------------------------------------------------------
17391745

1746+
type XInfoStreamFullCmd struct {
1747+
baseCmd
1748+
val *XInfoStreamFull
1749+
}
1750+
1751+
type XInfoStreamFull struct {
1752+
Length int64
1753+
RadixTreeKeys int64
1754+
RadixTreeNodes int64
1755+
LastGeneratedID string
1756+
Entries []XMessage
1757+
Groups []XInfoStreamGroup
1758+
}
1759+
1760+
type XInfoStreamGroup struct {
1761+
Name string
1762+
LastDeliveredID string
1763+
PelCount int64
1764+
Pending []XInfoStreamGroupPending
1765+
Consumers []XInfoStreamConsumer
1766+
}
1767+
1768+
type XInfoStreamGroupPending struct {
1769+
ID string
1770+
Consumer string
1771+
DeliveryTime time.Time
1772+
DeliveryCount int64
1773+
}
1774+
1775+
type XInfoStreamConsumer struct {
1776+
Name string
1777+
SeenTime time.Time
1778+
PelCount int64
1779+
Pending []XInfoStreamConsumerPending
1780+
}
1781+
1782+
type XInfoStreamConsumerPending struct {
1783+
ID string
1784+
DeliveryTime time.Time
1785+
DeliveryCount int64
1786+
}
1787+
1788+
var _ Cmder = (*XInfoStreamFullCmd)(nil)
1789+
1790+
func NewXInfoStreamFullCmd(ctx context.Context, args ...interface{}) *XInfoStreamFullCmd {
1791+
return &XInfoStreamFullCmd{
1792+
baseCmd: baseCmd{
1793+
ctx: ctx,
1794+
args: args,
1795+
},
1796+
}
1797+
}
1798+
1799+
func (cmd *XInfoStreamFullCmd) Val() *XInfoStreamFull {
1800+
return cmd.val
1801+
}
1802+
1803+
func (cmd *XInfoStreamFullCmd) Result() (*XInfoStreamFull, error) {
1804+
return cmd.val, cmd.err
1805+
}
1806+
1807+
func (cmd *XInfoStreamFullCmd) String() string {
1808+
return cmdString(cmd, cmd.val)
1809+
}
1810+
1811+
func (cmd *XInfoStreamFullCmd) readReply(rd *proto.Reader) error {
1812+
if err := rd.ReadFixedMapLen(6); err != nil {
1813+
return err
1814+
}
1815+
1816+
cmd.val = &XInfoStreamFull{}
1817+
1818+
for i := 0; i < 6; i++ {
1819+
key, err := rd.ReadString()
1820+
if err != nil {
1821+
return err
1822+
}
1823+
1824+
switch key {
1825+
case "length":
1826+
cmd.val.Length, err = rd.ReadInt()
1827+
case "radix-tree-keys":
1828+
cmd.val.RadixTreeKeys, err = rd.ReadInt()
1829+
case "radix-tree-nodes":
1830+
cmd.val.RadixTreeNodes, err = rd.ReadInt()
1831+
case "last-generated-id":
1832+
cmd.val.LastGeneratedID, err = rd.ReadString()
1833+
case "entries":
1834+
cmd.val.Entries, err = readXMessageSlice(rd)
1835+
case "groups":
1836+
cmd.val.Groups, err = readStreamGroups(rd)
1837+
default:
1838+
return fmt.Errorf("redis: unexpected content %s "+
1839+
"in XINFO STREAM FULL reply", key)
1840+
}
1841+
if err != nil {
1842+
return err
1843+
}
1844+
}
1845+
return nil
1846+
}
1847+
1848+
func readStreamGroups(rd *proto.Reader) ([]XInfoStreamGroup, error) {
1849+
n, err := rd.ReadArrayLen()
1850+
if err != nil {
1851+
return nil, err
1852+
}
1853+
groups := make([]XInfoStreamGroup, 0, n)
1854+
for i := 0; i < n; i++ {
1855+
if err = rd.ReadFixedMapLen(5); err != nil {
1856+
return nil, err
1857+
}
1858+
1859+
group := XInfoStreamGroup{}
1860+
1861+
for f := 0; f < 5; f++ {
1862+
key, err := rd.ReadString()
1863+
if err != nil {
1864+
return nil, err
1865+
}
1866+
1867+
switch key {
1868+
case "name":
1869+
group.Name, err = rd.ReadString()
1870+
case "last-delivered-id":
1871+
group.LastDeliveredID, err = rd.ReadString()
1872+
case "pel-count":
1873+
group.PelCount, err = rd.ReadInt()
1874+
case "pending":
1875+
group.Pending, err = readXInfoStreamGroupPending(rd)
1876+
case "consumers":
1877+
group.Consumers, err = readXInfoStreamConsumers(rd)
1878+
default:
1879+
return nil, fmt.Errorf("redis: unexpected content %s "+
1880+
"in XINFO STREAM FULL reply", key)
1881+
}
1882+
1883+
if err != nil {
1884+
return nil, err
1885+
}
1886+
}
1887+
1888+
groups = append(groups, group)
1889+
}
1890+
1891+
return groups, nil
1892+
}
1893+
1894+
func readXInfoStreamGroupPending(rd *proto.Reader) ([]XInfoStreamGroupPending, error) {
1895+
n, err := rd.ReadArrayLen()
1896+
if err != nil {
1897+
return nil, err
1898+
}
1899+
1900+
pending := make([]XInfoStreamGroupPending, 0, n)
1901+
1902+
for i := 0; i < n; i++ {
1903+
if err = rd.ReadFixedArrayLen(4); err != nil {
1904+
return nil, err
1905+
}
1906+
1907+
p := XInfoStreamGroupPending{}
1908+
1909+
p.ID, err = rd.ReadString()
1910+
if err != nil {
1911+
return nil, err
1912+
}
1913+
1914+
p.Consumer, err = rd.ReadString()
1915+
if err != nil {
1916+
return nil, err
1917+
}
1918+
1919+
delivery, err := rd.ReadInt()
1920+
if err != nil {
1921+
return nil, err
1922+
}
1923+
p.DeliveryTime = time.Unix(delivery/1000, delivery%1000*int64(time.Millisecond))
1924+
1925+
p.DeliveryCount, err = rd.ReadInt()
1926+
if err != nil {
1927+
return nil, err
1928+
}
1929+
1930+
pending = append(pending, p)
1931+
}
1932+
1933+
return pending, nil
1934+
}
1935+
1936+
func readXInfoStreamConsumers(rd *proto.Reader) ([]XInfoStreamConsumer, error) {
1937+
n, err := rd.ReadArrayLen()
1938+
if err != nil {
1939+
return nil, err
1940+
}
1941+
1942+
consumers := make([]XInfoStreamConsumer, 0, n)
1943+
1944+
for i := 0; i < n; i++ {
1945+
if err = rd.ReadFixedMapLen(4); err != nil {
1946+
return nil, err
1947+
}
1948+
1949+
c := XInfoStreamConsumer{}
1950+
1951+
for f := 0; f < 4; f++ {
1952+
cKey, err := rd.ReadString()
1953+
if err != nil {
1954+
return nil, err
1955+
}
1956+
1957+
switch cKey {
1958+
case "name":
1959+
c.Name, err = rd.ReadString()
1960+
case "seen-time":
1961+
seen, err := rd.ReadInt()
1962+
if err != nil {
1963+
return nil, err
1964+
}
1965+
c.SeenTime = time.Unix(seen/1000, seen%1000*int64(time.Millisecond))
1966+
case "pel-count":
1967+
c.PelCount, err = rd.ReadInt()
1968+
case "pending":
1969+
pendingNumber, err := rd.ReadArrayLen()
1970+
if err != nil {
1971+
return nil, err
1972+
}
1973+
1974+
c.Pending = make([]XInfoStreamConsumerPending, 0, pendingNumber)
1975+
1976+
for pn := 0; pn < pendingNumber; pn++ {
1977+
if err = rd.ReadFixedArrayLen(3); err != nil {
1978+
return nil, err
1979+
}
1980+
1981+
p := XInfoStreamConsumerPending{}
1982+
1983+
p.ID, err = rd.ReadString()
1984+
if err != nil {
1985+
return nil, err
1986+
}
1987+
1988+
delivery, err := rd.ReadInt()
1989+
if err != nil {
1990+
return nil, err
1991+
}
1992+
p.DeliveryTime = time.Unix(delivery/1000, delivery%1000*int64(time.Millisecond))
1993+
1994+
p.DeliveryCount, err = rd.ReadInt()
1995+
if err != nil {
1996+
return nil, err
1997+
}
1998+
1999+
c.Pending = append(c.Pending, p)
2000+
}
2001+
default:
2002+
return nil, fmt.Errorf("redis: unexpected content %s "+
2003+
"in XINFO STREAM FULL reply", cKey)
2004+
}
2005+
if err != nil {
2006+
return nil, err
2007+
}
2008+
}
2009+
consumers = append(consumers, c)
2010+
}
2011+
2012+
return consumers, nil
2013+
}
2014+
2015+
//------------------------------------------------------------------------------
2016+
17402017
type ZSliceCmd struct {
17412018
baseCmd
17422019

commands.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ type Cmdable interface {
180180
LInsertAfter(ctx context.Context, key string, pivot, value interface{}) *IntCmd
181181
LLen(ctx context.Context, key string) *IntCmd
182182
LPop(ctx context.Context, key string) *StringCmd
183+
LPopCount(ctx context.Context, key string, count int) *StringSliceCmd
183184
LPos(ctx context.Context, key string, value string, args LPosArgs) *IntCmd
184185
LPosCount(ctx context.Context, key string, value string, count int64, args LPosArgs) *IntSliceCmd
185186
LPush(ctx context.Context, key string, values ...interface{}) *IntCmd
@@ -1336,6 +1337,12 @@ func (c cmdable) LPop(ctx context.Context, key string) *StringCmd {
13361337
return cmd
13371338
}
13381339

1340+
func (c cmdable) LPopCount(ctx context.Context, key string, count int) *StringSliceCmd {
1341+
cmd := NewStringSliceCmd(ctx, "lpop", key, count)
1342+
_ = c(ctx, cmd)
1343+
return cmd
1344+
}
1345+
13391346
type LPosArgs struct {
13401347
Rank, MaxLen int64
13411348
}
@@ -1833,15 +1840,20 @@ func (c cmdable) XPending(ctx context.Context, stream, group string) *XPendingCm
18331840
type XPendingExtArgs struct {
18341841
Stream string
18351842
Group string
1843+
Idle time.Duration
18361844
Start string
18371845
End string
18381846
Count int64
18391847
Consumer string
18401848
}
18411849

18421850
func (c cmdable) XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd {
1843-
args := make([]interface{}, 0, 7)
1844-
args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count)
1851+
args := make([]interface{}, 0, 9)
1852+
args = append(args, "xpending", a.Stream, a.Group)
1853+
if a.Idle != 0 {
1854+
args = append(args, "idle", formatMs(ctx, a.Idle))
1855+
}
1856+
args = append(args, a.Start, a.End, a.Count)
18451857
if a.Consumer != "" {
18461858
args = append(args, a.Consumer)
18471859
}
@@ -1916,6 +1928,19 @@ func (c cmdable) XInfoStream(ctx context.Context, key string) *XInfoStreamCmd {
19161928
return cmd
19171929
}
19181930

1931+
// XInfoStreamFull XINFO STREAM FULL [COUNT count]
1932+
// redis-server >= 6.0.
1933+
func (c cmdable) XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd {
1934+
args := make([]interface{}, 0, 6)
1935+
args = append(args, "xinfo", "stream", key, "full")
1936+
if count > 0 {
1937+
args = append(args, "count", count)
1938+
}
1939+
cmd := NewXInfoStreamFullCmd(ctx, args...)
1940+
_ = c(ctx, cmd)
1941+
return cmd
1942+
}
1943+
19191944
//------------------------------------------------------------------------------
19201945

19211946
// Z represents sorted set member.

0 commit comments

Comments
 (0)