Skip to content

Commit 43ec146

Browse files
authored
xgroup/xadd/xtrim supports new options (#1787)
* support cmd option XGROUP CREATECONSUMER XTRIM MINID LIMIT XADD NOMKSTREAM MINID LIMIT Signed-off-by: monkey <[email protected]> * add XAddArgs.Approx doc Signed-off-by: monkey92t <[email protected]>
1 parent 14d82a2 commit 43ec146

File tree

2 files changed

+157
-16
lines changed

2 files changed

+157
-16
lines changed

commands.go

Lines changed: 103 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -226,15 +226,22 @@ type Cmdable interface {
226226
XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd
227227
XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd
228228
XGroupDestroy(ctx context.Context, stream, group string) *IntCmd
229+
XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
229230
XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd
230231
XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd
231232
XAck(ctx context.Context, stream, group string, ids ...string) *IntCmd
232233
XPending(ctx context.Context, stream, group string) *XPendingCmd
233234
XPendingExt(ctx context.Context, a *XPendingExtArgs) *XPendingExtCmd
234235
XClaim(ctx context.Context, a *XClaimArgs) *XMessageSliceCmd
235236
XClaimJustID(ctx context.Context, a *XClaimArgs) *StringSliceCmd
237+
238+
// TODO: XTrim and XTrimApprox remove in v9.
236239
XTrim(ctx context.Context, key string, maxLen int64) *IntCmd
237240
XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd
241+
XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
242+
XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
243+
XTrimMinID(ctx context.Context, key string, minID string) *IntCmd
244+
XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
238245
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
239246
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
240247
XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd
@@ -1621,22 +1628,50 @@ func (c cmdable) SUnionStore(ctx context.Context, destination string, keys ...st
16211628
// - XAddArgs.Values = map[string]interface{}{"key1": "value1", "key2": "value2"}
16221629
//
16231630
// Note that map will not preserve the order of key-value pairs.
1631+
// MaxLen/MaxLenApprox and MinID are in conflict, only one of them can be used.
16241632
type XAddArgs struct {
1625-
Stream string
1626-
MaxLen int64 // MAXLEN N
1633+
Stream string
1634+
NoMkStream bool
1635+
MaxLen int64 // MAXLEN N
1636+
1637+
// Deprecated: use MaxLen+Approx, remove in v9.
16271638
MaxLenApprox int64 // MAXLEN ~ N
1628-
ID string
1629-
Values interface{}
1639+
1640+
MinID string
1641+
// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
1642+
Approx bool
1643+
Limit int64
1644+
ID string
1645+
Values interface{}
16301646
}
16311647

1648+
// XAdd a.Limit has a bug, please confirm it and use it.
1649+
// issue: https://github.com/redis/redis/issues/9046
16321650
func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
1633-
args := make([]interface{}, 0, 8)
1634-
args = append(args, "xadd")
1635-
args = append(args, a.Stream)
1636-
if a.MaxLen > 0 {
1637-
args = append(args, "maxlen", a.MaxLen)
1638-
} else if a.MaxLenApprox > 0 {
1651+
args := make([]interface{}, 0, 11)
1652+
args = append(args, "xadd", a.Stream)
1653+
if a.NoMkStream {
1654+
args = append(args, "nomkstream")
1655+
}
1656+
switch {
1657+
case a.MaxLen > 0:
1658+
if a.Approx {
1659+
args = append(args, "maxlen", "~", a.MaxLen)
1660+
} else {
1661+
args = append(args, "maxlen", a.MaxLen)
1662+
}
1663+
case a.MaxLenApprox > 0:
1664+
// TODO remove in v9.
16391665
args = append(args, "maxlen", "~", a.MaxLenApprox)
1666+
case a.MinID != "":
1667+
if a.Approx {
1668+
args = append(args, "minid", "~", a.MinID)
1669+
} else {
1670+
args = append(args, "minid", a.MinID)
1671+
}
1672+
}
1673+
if a.Limit > 0 {
1674+
args = append(args, "limit", a.Limit)
16401675
}
16411676
if a.ID != "" {
16421677
args = append(args, a.ID)
@@ -1757,6 +1792,12 @@ func (c cmdable) XGroupDestroy(ctx context.Context, stream, group string) *IntCm
17571792
return cmd
17581793
}
17591794

1795+
func (c cmdable) XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
1796+
cmd := NewIntCmd(ctx, "xgroup", "createconsumer", stream, group, consumer)
1797+
_ = c(ctx, cmd)
1798+
return cmd
1799+
}
1800+
17601801
func (c cmdable) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
17611802
cmd := NewIntCmd(ctx, "xgroup", "delconsumer", stream, group, consumer)
17621803
_ = c(ctx, cmd)
@@ -1914,16 +1955,63 @@ func xClaimArgs(a *XClaimArgs) []interface{} {
19141955
return args
19151956
}
19161957

1917-
func (c cmdable) XTrim(ctx context.Context, key string, maxLen int64) *IntCmd {
1918-
cmd := NewIntCmd(ctx, "xtrim", key, "maxlen", maxLen)
1958+
// xTrim If approx is true, add the "~" parameter, otherwise it is the default "=" (redis default).
1959+
// example:
1960+
// XTRIM key MAXLEN/MINID threshold LIMIT limit.
1961+
// XTRIM key MAXLEN/MINID ~ threshold LIMIT limit.
1962+
// The redis-server version is lower than 6.2, please set limit to 0.
1963+
func (c cmdable) xTrim(
1964+
ctx context.Context, key, strategy string,
1965+
approx bool, threshold interface{}, limit int64,
1966+
) *IntCmd {
1967+
args := make([]interface{}, 0, 7)
1968+
args = append(args, "xtrim", key, strategy)
1969+
if approx {
1970+
args = append(args, "~")
1971+
}
1972+
args = append(args, threshold)
1973+
if limit > 0 {
1974+
args = append(args, "limit", limit)
1975+
}
1976+
cmd := NewIntCmd(ctx, args...)
19191977
_ = c(ctx, cmd)
19201978
return cmd
19211979
}
19221980

1981+
// Deprecated: use XTrimMaxLen, remove in v9.
1982+
func (c cmdable) XTrim(ctx context.Context, key string, maxLen int64) *IntCmd {
1983+
return c.xTrim(ctx, key, "maxlen", false, maxLen, 0)
1984+
}
1985+
1986+
// Deprecated: use XTrimMaxLenApprox, remove in v9.
19231987
func (c cmdable) XTrimApprox(ctx context.Context, key string, maxLen int64) *IntCmd {
1924-
cmd := NewIntCmd(ctx, "xtrim", key, "maxlen", "~", maxLen)
1925-
_ = c(ctx, cmd)
1926-
return cmd
1988+
return c.xTrim(ctx, key, "maxlen", true, maxLen, 0)
1989+
}
1990+
1991+
// XTrimMaxLen No `~` rules are used, `limit` cannot be used.
1992+
// cmd: XTRIM key MAXLEN maxLen
1993+
func (c cmdable) XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd {
1994+
return c.xTrim(ctx, key, "maxlen", false, maxLen, 0)
1995+
}
1996+
1997+
// XTrimMaxLenApprox LIMIT has a bug, please confirm it and use it.
1998+
// issue: https://github.com/redis/redis/issues/9046
1999+
// cmd: XTRIM key MAXLEN ~ maxLen LIMIT limit
2000+
func (c cmdable) XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd {
2001+
return c.xTrim(ctx, key, "maxlen", true, maxLen, limit)
2002+
}
2003+
2004+
// XTrimMinID No `~` rules are used, `limit` cannot be used.
2005+
// cmd: XTRIM key MINID minID
2006+
func (c cmdable) XTrimMinID(ctx context.Context, key string, minID string) *IntCmd {
2007+
return c.xTrim(ctx, key, "minid", false, minID, 0)
2008+
}
2009+
2010+
// XTrimMinIDApprox LIMIT has a bug, please confirm it and use it.
2011+
// issue: https://github.com/redis/redis/issues/9046
2012+
// cmd: XTRIM key MINID ~ minID LIMIT limit
2013+
func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd {
2014+
return c.xTrim(ctx, key, "minid", true, minID, limit)
19272015
}
19282016

19292017
func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd {

commands_test.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4104,18 +4104,47 @@ var _ = Describe("Commands", func() {
41044104
Expect(id).To(Equal("3-0"))
41054105
})
41064106

4107+
// TODO remove in v9.
41074108
It("should XTrim", func() {
41084109
n, err := client.XTrim(ctx, "stream", 0).Result()
41094110
Expect(err).NotTo(HaveOccurred())
41104111
Expect(n).To(Equal(int64(3)))
41114112
})
41124113

4114+
// TODO remove in v9.
41134115
It("should XTrimApprox", func() {
41144116
n, err := client.XTrimApprox(ctx, "stream", 0).Result()
41154117
Expect(err).NotTo(HaveOccurred())
41164118
Expect(n).To(Equal(int64(3)))
41174119
})
41184120

4121+
// TODO XTrimMaxLenApprox/XTrimMinIDApprox There is a bug in the limit parameter.
4122+
// TODO Don't test it for now.
4123+
// TODO link: https://github.com/redis/redis/issues/9046
4124+
It("should XTrimMaxLen", func() {
4125+
n, err := client.XTrimMaxLen(ctx, "stream", 0).Result()
4126+
Expect(err).NotTo(HaveOccurred())
4127+
Expect(n).To(Equal(int64(3)))
4128+
})
4129+
4130+
It("should XTrimMaxLenApprox", func() {
4131+
n, err := client.XTrimMaxLenApprox(ctx, "stream", 0, 0).Result()
4132+
Expect(err).NotTo(HaveOccurred())
4133+
Expect(n).To(Equal(int64(3)))
4134+
})
4135+
4136+
It("should XTrimMinID", func() {
4137+
n, err := client.XTrimMinID(ctx, "stream", "4-0").Result()
4138+
Expect(err).NotTo(HaveOccurred())
4139+
Expect(n).To(Equal(int64(3)))
4140+
})
4141+
4142+
It("should XTrimMinIDApprox", func() {
4143+
n, err := client.XTrimMinIDApprox(ctx, "stream", "4-0", 0).Result()
4144+
Expect(err).NotTo(HaveOccurred())
4145+
Expect(n).To(Equal(int64(3)))
4146+
})
4147+
41194148
It("should XAdd", func() {
41204149
id, err := client.XAdd(ctx, &redis.XAddArgs{
41214150
Stream: "stream",
@@ -4133,6 +4162,9 @@ var _ = Describe("Commands", func() {
41334162
}))
41344163
})
41354164

4165+
// TODO XAdd There is a bug in the limit parameter.
4166+
// TODO Don't test it for now.
4167+
// TODO link: https://github.com/redis/redis/issues/9046
41364168
It("should XAdd with MaxLen", func() {
41374169
id, err := client.XAdd(ctx, &redis.XAddArgs{
41384170
Stream: "stream",
@@ -4148,6 +4180,21 @@ var _ = Describe("Commands", func() {
41484180
}))
41494181
})
41504182

4183+
It("should XAdd with MinID", func() {
4184+
id, err := client.XAdd(ctx, &redis.XAddArgs{
4185+
Stream: "stream",
4186+
MinID: "5-0",
4187+
ID: "4-0",
4188+
Values: map[string]interface{}{"quatro": "quatre"},
4189+
}).Result()
4190+
Expect(err).NotTo(HaveOccurred())
4191+
Expect(id).To(Equal("4-0"))
4192+
4193+
vals, err := client.XRange(ctx, "stream", "-", "+").Result()
4194+
Expect(err).NotTo(HaveOccurred())
4195+
Expect(vals).To(HaveLen(0))
4196+
})
4197+
41514198
It("should XDel", func() {
41524199
n, err := client.XDel(ctx, "stream", "1-0", "2-0", "3-0").Result()
41534200
Expect(err).NotTo(HaveOccurred())
@@ -4380,8 +4427,14 @@ var _ = Describe("Commands", func() {
43804427
infoExt, err = client.XPendingExt(ctx, args).Result()
43814428
Expect(err).NotTo(HaveOccurred())
43824429
Expect(infoExt).To(HaveLen(0))
4430+
})
4431+
4432+
It("should XGroup Create Delete Consumer", func() {
4433+
n, err := client.XGroupCreateConsumer(ctx, "stream", "group", "c1").Result()
4434+
Expect(err).NotTo(HaveOccurred())
4435+
Expect(n).To(Equal(int64(1)))
43834436

4384-
n, err := client.XGroupDelConsumer(ctx, "stream", "group", "consumer").Result()
4437+
n, err = client.XGroupDelConsumer(ctx, "stream", "group", "consumer").Result()
43854438
Expect(err).NotTo(HaveOccurred())
43864439
Expect(n).To(Equal(int64(3)))
43874440
})

0 commit comments

Comments
 (0)