@@ -1512,7 +1512,7 @@ type XInfoConsumer struct {
1512
1512
Idle int64
1513
1513
}
1514
1514
1515
- var _ Cmder = (* XInfoGroupsCmd )(nil )
1515
+ var _ Cmder = (* XInfoConsumersCmd )(nil )
1516
1516
1517
1517
func NewXInfoConsumersCmd (ctx context.Context , stream string , group string ) * XInfoConsumersCmd {
1518
1518
return & XInfoConsumersCmd {
@@ -1784,6 +1784,299 @@ func xStreamInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
1784
1784
1785
1785
//------------------------------------------------------------------------------
1786
1786
1787
+ type XInfoStreamFullCmd struct {
1788
+ baseCmd
1789
+ val * XInfoStreamFull
1790
+ }
1791
+
1792
+ type XInfoStreamFull struct {
1793
+ Length int64
1794
+ RadixTreeKeys int64
1795
+ RadixTreeNodes int64
1796
+ LastGeneratedID string
1797
+ Entries []XMessage
1798
+ Groups []XInfoStreamGroup
1799
+ }
1800
+
1801
+ type XInfoStreamGroup struct {
1802
+ Name string
1803
+ LastDeliveredID string
1804
+ PelCount int64
1805
+ Pending []XInfoStreamGroupPending
1806
+ Consumers []XInfoStreamConsumer
1807
+ }
1808
+
1809
+ type XInfoStreamGroupPending struct {
1810
+ ID string
1811
+ Consumer string
1812
+ DeliveryTime time.Time
1813
+ DeliveryCount int64
1814
+ }
1815
+
1816
+ type XInfoStreamConsumer struct {
1817
+ Name string
1818
+ SeenTime time.Time
1819
+ PelCount int64
1820
+ Pending []XInfoStreamConsumerPending
1821
+ }
1822
+
1823
+ type XInfoStreamConsumerPending struct {
1824
+ ID string
1825
+ DeliveryTime time.Time
1826
+ DeliveryCount int64
1827
+ }
1828
+
1829
+ var _ Cmder = (* XInfoStreamFullCmd )(nil )
1830
+
1831
+ func NewXInfoStreamFullCmd (ctx context.Context , args ... interface {}) * XInfoStreamFullCmd {
1832
+ return & XInfoStreamFullCmd {
1833
+ baseCmd : baseCmd {
1834
+ ctx : ctx ,
1835
+ args : args ,
1836
+ },
1837
+ }
1838
+ }
1839
+
1840
+ func (cmd * XInfoStreamFullCmd ) Val () * XInfoStreamFull {
1841
+ return cmd .val
1842
+ }
1843
+
1844
+ func (cmd * XInfoStreamFullCmd ) Result () (* XInfoStreamFull , error ) {
1845
+ return cmd .val , cmd .err
1846
+ }
1847
+
1848
+ func (cmd * XInfoStreamFullCmd ) String () string {
1849
+ return cmdString (cmd , cmd .val )
1850
+ }
1851
+
1852
+ func (cmd * XInfoStreamFullCmd ) readReply (rd * proto.Reader ) error {
1853
+ n , err := rd .ReadArrayLen ()
1854
+ if err != nil {
1855
+ return err
1856
+ }
1857
+ if n != 12 {
1858
+ return fmt .Errorf ("redis: got %d elements in XINFO STREAM FULL reply," +
1859
+ "wanted 12" , n )
1860
+ }
1861
+
1862
+ cmd .val = & XInfoStreamFull {}
1863
+
1864
+ for i := 0 ; i < 6 ; i ++ {
1865
+ key , err := rd .ReadString ()
1866
+ if err != nil {
1867
+ return err
1868
+ }
1869
+
1870
+ switch key {
1871
+ case "length" :
1872
+ cmd .val .Length , err = rd .ReadIntReply ()
1873
+ case "radix-tree-keys" :
1874
+ cmd .val .RadixTreeKeys , err = rd .ReadIntReply ()
1875
+ case "radix-tree-nodes" :
1876
+ cmd .val .RadixTreeNodes , err = rd .ReadIntReply ()
1877
+ case "last-generated-id" :
1878
+ cmd .val .LastGeneratedID , err = rd .ReadString ()
1879
+ case "entries" :
1880
+ cmd .val .Entries , err = readXMessageSlice (rd )
1881
+ case "groups" :
1882
+ groups , err := rd .ReadReply (readStreamGroups )
1883
+ if err != nil {
1884
+ return err
1885
+ }
1886
+ cmd .val .Groups = groups .([]XInfoStreamGroup )
1887
+ default :
1888
+ return fmt .Errorf ("redis: unexpected content %s " +
1889
+ "in XINFO STREAM reply" , key )
1890
+ }
1891
+ if err != nil {
1892
+ return err
1893
+ }
1894
+ }
1895
+ return nil
1896
+ }
1897
+
1898
+ func readStreamGroups (rd * proto.Reader , n int64 ) (interface {}, error ) {
1899
+ groups := make ([]XInfoStreamGroup , 0 , n )
1900
+ for i := int64 (0 ); i < n ; i ++ {
1901
+ nn , err := rd .ReadArrayLen ()
1902
+ if err != nil {
1903
+ return nil , err
1904
+ }
1905
+ if nn != 10 {
1906
+ return nil , fmt .Errorf ("redis: got %d elements in XINFO STREAM FULL reply," +
1907
+ "wanted 10" , nn )
1908
+ }
1909
+ key , err := rd .ReadString ()
1910
+ if err != nil {
1911
+ return nil , err
1912
+ }
1913
+
1914
+ group := XInfoStreamGroup {}
1915
+
1916
+ switch key {
1917
+ case "name" :
1918
+ group .Name , err = rd .ReadString ()
1919
+ case "last-delivered-id" :
1920
+ group .LastDeliveredID , err = rd .ReadString ()
1921
+ case "pel-count" :
1922
+ group .PelCount , err = rd .ReadIntReply ()
1923
+ case "pending" :
1924
+ group .Pending , err = readXInfoStreamGroupPending (rd )
1925
+ case "consumers" :
1926
+ group .Consumers , err = readXInfoStreamConsumers (rd )
1927
+ default :
1928
+ return nil , fmt .Errorf ("redis: unexpected content %s " +
1929
+ "in XINFO STREAM reply" , key )
1930
+ }
1931
+
1932
+ if err != nil {
1933
+ return nil , err
1934
+ }
1935
+
1936
+ groups = append (groups , group )
1937
+ }
1938
+
1939
+ return groups , nil
1940
+ }
1941
+
1942
+ func readXInfoStreamGroupPending (rd * proto.Reader ) ([]XInfoStreamGroupPending , error ) {
1943
+ n , err := rd .ReadArrayLen ()
1944
+ if err != nil {
1945
+ return nil , err
1946
+ }
1947
+
1948
+ pending := make ([]XInfoStreamGroupPending , 0 , n )
1949
+
1950
+ for i := 0 ; i < n ; i ++ {
1951
+ nn , err := rd .ReadArrayLen ()
1952
+ if err != nil {
1953
+ return nil , err
1954
+ }
1955
+ if nn != 4 {
1956
+ return nil , fmt .Errorf ("redis: got %d elements in XINFO STREAM FULL reply," +
1957
+ "wanted 4" , nn )
1958
+ }
1959
+
1960
+ p := XInfoStreamGroupPending {}
1961
+
1962
+ p .ID , err = rd .ReadString ()
1963
+ if err != nil {
1964
+ return nil , err
1965
+ }
1966
+
1967
+ p .Consumer , err = rd .ReadString ()
1968
+ if err != nil {
1969
+ return nil , err
1970
+ }
1971
+
1972
+ delivery , err := rd .ReadIntReply ()
1973
+ if err != nil {
1974
+ return nil , err
1975
+ }
1976
+ p .DeliveryTime = time .Unix (delivery / 1000 , delivery % 1000 * int64 (time .Millisecond ))
1977
+
1978
+ p .DeliveryCount , err = rd .ReadIntReply ()
1979
+ if err != nil {
1980
+ return nil , err
1981
+ }
1982
+
1983
+ pending = append (pending , p )
1984
+ }
1985
+
1986
+ return pending , nil
1987
+ }
1988
+
1989
+ func readXInfoStreamConsumers (rd * proto.Reader ) ([]XInfoStreamConsumer , error ) {
1990
+ n , err := rd .ReadArrayLen ()
1991
+ if err != nil {
1992
+ return nil , err
1993
+ }
1994
+
1995
+ consumers := make ([]XInfoStreamConsumer , 0 , n )
1996
+
1997
+ for i := 0 ; i < n ; i ++ {
1998
+ nn , err := rd .ReadArrayLen ()
1999
+ if err != nil {
2000
+ return nil , err
2001
+ }
2002
+ if nn != 8 {
2003
+ return nil , fmt .Errorf ("redis: got %d elements in XINFO STREAM FULL reply," +
2004
+ "wanted 8" , nn )
2005
+ }
2006
+
2007
+ cKey , err := rd .ReadString ()
2008
+ if err != nil {
2009
+ return nil , err
2010
+ }
2011
+
2012
+ c := XInfoStreamConsumer {}
2013
+
2014
+ switch cKey {
2015
+ case "name" :
2016
+ c .Name , err = rd .ReadString ()
2017
+ case "seen-time" :
2018
+ seen , err := rd .ReadIntReply ()
2019
+ if err != nil {
2020
+ return nil , err
2021
+ }
2022
+ c .SeenTime = time .Unix (seen / 1000 , seen % 1000 * int64 (time .Millisecond ))
2023
+ case "pel-count" :
2024
+ c .PelCount , err = rd .ReadIntReply ()
2025
+ case "pending" :
2026
+ pendingNumber , err := rd .ReadArrayLen ()
2027
+ if err != nil {
2028
+ return nil , err
2029
+ }
2030
+
2031
+ c .Pending = make ([]XInfoStreamConsumerPending , 0 , pendingNumber )
2032
+
2033
+ for f := 0 ; f < pendingNumber ; f ++ {
2034
+ nn , err := rd .ReadArrayLen ()
2035
+ if err != nil {
2036
+ return nil , err
2037
+ }
2038
+ if nn != 3 {
2039
+ return nil , fmt .Errorf ("redis: got %d elements in XINFO STREAM reply," +
2040
+ "wanted 3" , nn )
2041
+ }
2042
+
2043
+ p := XInfoStreamConsumerPending {}
2044
+
2045
+ p .ID , err = rd .ReadString ()
2046
+ if err != nil {
2047
+ return nil , err
2048
+ }
2049
+
2050
+ delivery , err := rd .ReadIntReply ()
2051
+ if err != nil {
2052
+ return nil , err
2053
+ }
2054
+ p .DeliveryTime = time .Unix (delivery / 1000 , delivery % 1000 * int64 (time .Millisecond ))
2055
+
2056
+ p .DeliveryCount , err = rd .ReadIntReply ()
2057
+ if err != nil {
2058
+ return nil , err
2059
+ }
2060
+
2061
+ c .Pending = append (c .Pending , p )
2062
+ }
2063
+ default :
2064
+ return nil , fmt .Errorf ("redis: unexpected content %s " +
2065
+ "in XINFO STREAM reply" , cKey )
2066
+ }
2067
+
2068
+ if err != nil {
2069
+ return nil , err
2070
+ }
2071
+
2072
+ consumers = append (consumers , c )
2073
+ }
2074
+
2075
+ return consumers , nil
2076
+ }
2077
+
2078
+ //------------------------------------------------------------------------------
2079
+
1787
2080
type ZSliceCmd struct {
1788
2081
baseCmd
1789
2082
0 commit comments