Skip to content

Commit ca8e8ad

Browse files
Go: XGROUP CREATE.
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
1 parent 9995c23 commit ca8e8ad

File tree

4 files changed

+148
-0
lines changed

4 files changed

+148
-0
lines changed

go/api/base_client.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1995,3 +1995,64 @@ func (client *baseClient) XPendingWithOptions(
19951995
}
19961996
return handleXPendingDetailResponse(result)
19971997
}
1998+
1999+
// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
2000+
//
2001+
// See [valkey.io] for details.
2002+
//
2003+
// Parameters:
2004+
//
2005+
// key - The key of the stream.
2006+
// group - The newly created consumer group name.
2007+
// id - Stream entry ID that specifies the last delivered entry in the stream from the new
2008+
// group’s perspective. The special ID `"$"` can be used to specify the last entry in the stream.
2009+
//
2010+
// Return value:
2011+
//
2012+
// `"OK"`.
2013+
//
2014+
// Example:
2015+
//
2016+
// client.XGroupCreate("mystream", "mygroup", "0-0")
2017+
//
2018+
// [valkey.io]: https://valkey.io/commands/xgroup-create/
2019+
func (client *baseClient) XGroupCreate(key string, group string, id string) (string, error) {
2020+
return client.XGroupCreateWithOptions(key, group, id, options.NewXGroupCreateOptions())
2021+
}
2022+
2023+
// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
2024+
//
2025+
// See [valkey.io] for details.
2026+
//
2027+
// Parameters:
2028+
//
2029+
// key - The key of the stream.
2030+
// group - The newly created consumer group name.
2031+
// id - Stream entry ID that specifies the last delivered entry in the stream from the new
2032+
// group's perspective. The special ID `"$"` can be used to specify the last entry in the stream.
2033+
// opts - The options for the command. See [options.XGroupCreateOptions] for details.
2034+
//
2035+
// Return value:
2036+
//
2037+
// `"OK"`.
2038+
//
2039+
// Example:
2040+
//
2041+
// opts := options.NewXGroupCreateOptions().SetMakeStream()
2042+
// client.XGroupCreateWithOptions("mystream", "mygroup", "0-0", opts)
2043+
//
2044+
// [valkey.io]: https://valkey.io/commands/xgroup-create/
2045+
func (client *baseClient) XGroupCreateWithOptions(
2046+
key string,
2047+
group string,
2048+
id string,
2049+
opts *options.XGroupCreateOptions,
2050+
) (string, error) {
2051+
optionArgs, _ := opts.ToArgs()
2052+
args := append([]string{key, group, id}, optionArgs...)
2053+
result, err := client.executeCommand(C.XGroupCreate, args)
2054+
if err != nil {
2055+
return defaultStringResponse, err
2056+
}
2057+
return handleStringResponse(result)
2058+
}

go/api/options/stream_options.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,3 +203,43 @@ func (xpo *XPendingOptions) ToArgs() ([]string, error) {
203203

204204
return args, nil
205205
}
206+
207+
// Optional arguments for `XGroupCreate` in [StreamCommands]
208+
type XGroupCreateOptions struct {
209+
mkStream bool
210+
entriesRead int64
211+
}
212+
213+
// Create new empty `XGroupCreateOptions`
214+
func NewXGroupCreateOptions() *XGroupCreateOptions {
215+
return &XGroupCreateOptions{false, -1}
216+
}
217+
218+
// Once set and if the stream doesn't exist, creates a new stream with a length of `0`.
219+
func (xgco *XGroupCreateOptions) SetMakeStream() *XGroupCreateOptions {
220+
xgco.mkStream = true
221+
return xgco
222+
}
223+
224+
// A value representing the number of stream entries already read by the group.
225+
//
226+
// Since Valkey version 7.0.0.
227+
func (xgco *XGroupCreateOptions) SetEntriesRead(entriesRead int64) *XGroupCreateOptions {
228+
xgco.entriesRead = entriesRead
229+
return xgco
230+
}
231+
232+
func (xgco *XGroupCreateOptions) ToArgs() ([]string, error) {
233+
var args []string
234+
235+
// if minIdleTime is set, we need to add an `IDLE` argument along with the minIdleTime
236+
if xgco.mkStream {
237+
args = append(args, "MKSTREAM")
238+
}
239+
240+
if xgco.entriesRead > -1 {
241+
args = append(args, "ENTRIESREAD", utils.IntToString(xgco.entriesRead))
242+
}
243+
244+
return args, nil
245+
}

go/api/stream_commands.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,8 @@ type StreamCommands interface {
111111
XPending(key string, group string) (XPendingSummary, error)
112112

113113
XPendingWithOptions(key string, group string, options *options.XPendingOptions) ([]XPendingDetail, error)
114+
115+
XGroupCreate(key string, group string, id string) (string, error)
116+
117+
XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error)
114118
}

go/integTest/shared_commands_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5716,3 +5716,46 @@ func (suite *GlideTestSuite) TestXPendingFailures() {
57165716
}
57175717
})
57185718
}
5719+
5720+
// TODO add XGroupDestroy tests there
5721+
func (suite *GlideTestSuite) TestXGroupCreate_XGroupDestroy() {
5722+
suite.runWithDefaultClients(func(client api.BaseClient) {
5723+
key := uuid.NewString()
5724+
group1 := uuid.NewString()
5725+
group2 := uuid.NewString()
5726+
id := "0-1"
5727+
5728+
// Stream not created results in error
5729+
_, err := client.XGroupCreate(key, group1, id)
5730+
assert.Error(suite.T(), err)
5731+
assert.IsType(suite.T(), &api.RequestError{}, err)
5732+
5733+
// Stream with option to create creates stream & Group
5734+
opts := options.NewXGroupCreateOptions().SetMakeStream()
5735+
suite.verifyOK(client.XGroupCreateWithOptions(key, group1, id, opts))
5736+
5737+
// ...and again results in BUSYGROUP error, because group names must be unique
5738+
_, err = client.XGroupCreate(key, group1, id)
5739+
assert.ErrorContains(suite.T(), err, "BUSYGROUP")
5740+
assert.IsType(suite.T(), &api.RequestError{}, err)
5741+
5742+
// TODO add XGroupDestroy tests there
5743+
5744+
// ENTRIESREAD option was added in valkey 7.0.0
5745+
opts = options.NewXGroupCreateOptions().SetEntriesRead(100)
5746+
if suite.serverVersion >= "7.0.0" {
5747+
suite.verifyOK(client.XGroupCreateWithOptions(key, group2, id, opts))
5748+
} else {
5749+
_, err = client.XGroupCreateWithOptions(key, group2, id, opts)
5750+
assert.Error(suite.T(), err)
5751+
assert.IsType(suite.T(), &api.RequestError{}, err)
5752+
}
5753+
5754+
// key is not a stream
5755+
key = uuid.NewString()
5756+
suite.verifyOK(client.Set(key, id))
5757+
_, err = client.XGroupCreate(key, group1, id)
5758+
assert.Error(suite.T(), err)
5759+
assert.IsType(suite.T(), &api.RequestError{}, err)
5760+
})
5761+
}

0 commit comments

Comments
 (0)