Skip to content

Commit a8d4b16

Browse files
authored
Go: Add command XRange and XRevRange (valkey-io#2989)
* Go: Add command XRange and XRevRange Signed-off-by: TJ Zhang <[email protected]>
1 parent 448689c commit a8d4b16

File tree

7 files changed

+480
-8
lines changed

7 files changed

+480
-8
lines changed

go/api/base_client.go

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6614,3 +6614,205 @@ func (client *baseClient) CopyWithOptions(
66146614
}
66156615
return handleBoolResponse(result)
66166616
}
6617+
6618+
// Returns stream entries matching a given range of IDs.
6619+
//
6620+
// See [valkey.io] for details.
6621+
//
6622+
// Parameters:
6623+
//
6624+
// key - The key of the stream.
6625+
// start - The start position.
6626+
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
6627+
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
6628+
// end - The end position.
6629+
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
6630+
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
6631+
//
6632+
// Return value:
6633+
//
6634+
// A `map` of key to stream entry data, where entry data is an array of
6635+
// pairings with format `[[field, entry], [field, entry], ...]`. Returns `nil` if `count` is non-positive.
6636+
//
6637+
// Example:
6638+
//
6639+
// // Retrieve all stream entries
6640+
// res, err := client.XRange(
6641+
// "key",
6642+
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
6643+
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
6644+
// )
6645+
// fmt.Println(res) // map[key:[["field1", "entry1"], ["field2", "entry2"]]]
6646+
//
6647+
// // Retrieve exactly one stream entry by id
6648+
// res, err := client.XRange(
6649+
// "key",
6650+
// options.NewStreamBoundary(streamId, true),
6651+
// options.NewStreamBoundary(streamId, true),
6652+
// )
6653+
// fmt.Println(res) // map[key:[["field1", "entry1"]]
6654+
//
6655+
// [valkey.io]: https://valkey.io/commands/xrange/
6656+
func (client *baseClient) XRange(
6657+
key string,
6658+
start options.StreamBoundary,
6659+
end options.StreamBoundary,
6660+
) (map[string][][]string, error) {
6661+
return client.XRangeWithOptions(key, start, end, nil)
6662+
}
6663+
6664+
// Returns stream entries matching a given range of IDs.
6665+
//
6666+
// See [valkey.io] for details.
6667+
//
6668+
// Parameters:
6669+
//
6670+
// key - The key of the stream.
6671+
// start - The start position.
6672+
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
6673+
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
6674+
// end - The end position.
6675+
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
6676+
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
6677+
// opts - Stream range options.
6678+
//
6679+
// Return value:
6680+
//
6681+
// A `map` of key to stream entry data, where entry data is an array of
6682+
// pairings with format `[[field, entry], [field, entry], ...]`. Returns `nil` if `count` is non-positive.
6683+
//
6684+
// Example:
6685+
//
6686+
// // Retrieve all stream entries
6687+
// res, err := client.XRangeWithOptions(
6688+
// "key",
6689+
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
6690+
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
6691+
// options.NewStreamRangeOptions().SetCount(10),
6692+
// )
6693+
// fmt.Println(res) // map[key:[["field1", "entry1"], ["field2", "entry2"]]]
6694+
//
6695+
// // Retrieve exactly one stream entry by id
6696+
// res, err := client.XRangeWithOptions(
6697+
// "key",
6698+
// options.NewStreamBoundary(streamId, true),
6699+
// options.NewStreamBoundary(streamId, true),
6700+
// options.NewStreamRangeOptions().SetCount(1),
6701+
// )
6702+
// fmt.Println(res) // map[key:[["field1", "entry1"]]
6703+
//
6704+
// [valkey.io]: https://valkey.io/commands/xrange/
6705+
func (client *baseClient) XRangeWithOptions(
6706+
key string,
6707+
start options.StreamBoundary,
6708+
end options.StreamBoundary,
6709+
opts *options.StreamRangeOptions,
6710+
) (map[string][][]string, error) {
6711+
args := []string{key, string(start), string(end)}
6712+
if opts != nil {
6713+
optionArgs, err := opts.ToArgs()
6714+
if err != nil {
6715+
return nil, err
6716+
}
6717+
args = append(args, optionArgs...)
6718+
}
6719+
result, err := client.executeCommand(C.XRange, args)
6720+
if err != nil {
6721+
return nil, err
6722+
}
6723+
return handleMapOfArrayOfStringArrayOrNilResponse(result)
6724+
}
6725+
6726+
// Returns stream entries matching a given range of IDs in reverse order.
6727+
// Equivalent to `XRange` but returns entries in reverse order.
6728+
//
6729+
// See [valkey.io] for details.
6730+
//
6731+
// Parameters:
6732+
//
6733+
// key - The key of the stream.
6734+
// start - The start position.
6735+
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
6736+
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
6737+
// end - The end position.
6738+
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
6739+
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
6740+
//
6741+
// Return value:
6742+
//
6743+
// A `map` of key to stream entry data, where entry data is an array of
6744+
// pairings with format `[[field, entry], [field, entry], ...]`.
6745+
//
6746+
// Example:
6747+
//
6748+
// // Retrieve all stream entries
6749+
// res, err := client.XRevRange(
6750+
// "key",
6751+
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
6752+
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
6753+
// )
6754+
// fmt.Println(res) // map[key:[["field2", "entry2"], ["field1", "entry1"]]]
6755+
//
6756+
// [valkey.io]: https://valkey.io/commands/xrevrange/
6757+
func (client *baseClient) XRevRange(
6758+
key string,
6759+
start options.StreamBoundary,
6760+
end options.StreamBoundary,
6761+
) (map[string][][]string, error) {
6762+
return client.XRevRangeWithOptions(key, start, end, nil)
6763+
}
6764+
6765+
// Returns stream entries matching a given range of IDs in reverse order.
6766+
// Equivalent to `XRange` but returns entries in reverse order.
6767+
//
6768+
// See [valkey.io] for details.
6769+
//
6770+
// Parameters:
6771+
//
6772+
// key - The key of the stream.
6773+
// start - The start position.
6774+
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
6775+
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
6776+
// end - The end position.
6777+
// Use `options.NewStreamBoundary()` to specify a stream entry ID and its inclusive/exclusive status.
6778+
// Use `options.NewInfiniteStreamBoundary()` to specify an infinite stream boundary.
6779+
// opts - Stream range options.
6780+
//
6781+
// Return value:
6782+
//
6783+
// A `map` of key to stream entry data, where entry data is an array of
6784+
// pairings with format `[[field, entry], [field, entry], ...]`.
6785+
// Returns `nil` if `count` is non-positive.
6786+
//
6787+
// Example:
6788+
//
6789+
// // Retrieve all stream entries
6790+
// res, err := client.XRevRangeWithOptions(
6791+
// "key",
6792+
// options.NewInfiniteStreamBoundary(options.PositiveInfinity),
6793+
// options.NewInfiniteStreamBoundary(options.NegativeInfinity),
6794+
// options.NewStreamRangeOptions().SetCount(10),
6795+
// )
6796+
// fmt.Println(res) // map[key:[["field2", "entry2"], ["field1", "entry1"]]]
6797+
//
6798+
// [valkey.io]: https://valkey.io/commands/xrevrange/
6799+
func (client *baseClient) XRevRangeWithOptions(
6800+
key string,
6801+
start options.StreamBoundary,
6802+
end options.StreamBoundary,
6803+
opts *options.StreamRangeOptions,
6804+
) (map[string][][]string, error) {
6805+
args := []string{key, string(start), string(end)}
6806+
if opts != nil {
6807+
optionArgs, err := opts.ToArgs()
6808+
if err != nil {
6809+
return nil, err
6810+
}
6811+
args = append(args, optionArgs...)
6812+
}
6813+
result, err := client.executeCommand(C.XRevRange, args)
6814+
if err != nil {
6815+
return nil, err
6816+
}
6817+
return handleMapOfArrayOfStringArrayOrNilResponse(result)
6818+
}

go/api/options/constants.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,12 @@ const (
1010
NoScores string = "NOSCORES" // Valkey API keyword for the no scores option for zscan command.
1111
WithValues string = "WITHVALUES" // Valkey API keyword to query hash values along their names in `HRANDFIELD`.
1212
)
13+
14+
type InfBoundary string
15+
16+
const (
17+
// The highest bound in the sorted set
18+
PositiveInfinity InfBoundary = "+"
19+
// The lowest bound in the sorted set
20+
NegativeInfinity InfBoundary = "-"
21+
)

go/api/options/stream_options.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,3 +398,45 @@ func (sco *StreamClaimOptions) ToArgs() ([]string, error) {
398398

399399
return optionArgs, nil
400400
}
401+
402+
type StreamBoundary string
403+
404+
// Create a new stream boundary.
405+
func NewStreamBoundary(streamId string, isInclusive bool) StreamBoundary {
406+
if !isInclusive {
407+
return StreamBoundary("(" + streamId)
408+
}
409+
return StreamBoundary(streamId)
410+
}
411+
412+
// Create a new stream boundary defined by an infinity.
413+
func NewInfiniteStreamBoundary(bound InfBoundary) StreamBoundary {
414+
return StreamBoundary(string(bound))
415+
}
416+
417+
// Optional arguments for `XRange` and `XRevRange` in [StreamCommands]
418+
type StreamRangeOptions struct {
419+
count int64
420+
countIsSet bool
421+
}
422+
423+
func NewStreamRangeOptions() *StreamRangeOptions {
424+
return &StreamRangeOptions{}
425+
}
426+
427+
// Set the count.
428+
func (sro *StreamRangeOptions) SetCount(count int64) *StreamRangeOptions {
429+
sro.count = count
430+
sro.countIsSet = true
431+
return sro
432+
}
433+
434+
func (sro *StreamRangeOptions) ToArgs() ([]string, error) {
435+
var args []string
436+
437+
if sro.countIsSet {
438+
args = append(args, "COUNT", utils.IntToString(sro.count))
439+
}
440+
441+
return args, nil
442+
}

go/api/options/zrange_options.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,10 @@ type RangeByLex struct {
3939
}
4040

4141
type (
42-
InfBoundary string
4342
scoreBoundary string
4443
lexBoundary string
4544
)
4645

47-
const (
48-
// The highest bound in the sorted set
49-
PositiveInfinity InfBoundary = "+"
50-
// The lowest bound in the sorted set
51-
NegativeInfinity InfBoundary = "-"
52-
)
53-
5446
// Create a new inclusive score boundary.
5547
func NewInclusiveScoreBoundary(bound float64) scoreBoundary {
5648
return scoreBoundary(utils.FloatToString(bound))

go/api/response_handlers.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,14 @@ func handleMapOfArrayOfStringArrayResponse(response *C.struct_CommandResponse) (
711711
return claimedEntries, nil
712712
}
713713

714+
func handleMapOfArrayOfStringArrayOrNilResponse(response *C.struct_CommandResponse) (map[string][][]string, error) {
715+
if response.response_type == uint32(C.Null) {
716+
return nil, nil
717+
}
718+
719+
return handleMapOfArrayOfStringArrayResponse(response)
720+
}
721+
714722
func handleXAutoClaimResponse(response *C.struct_CommandResponse) (XAutoClaimResponse, error) {
715723
defer C.free_command_response(response)
716724
var null XAutoClaimResponse // default response

go/api/stream_commands.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,22 @@ type StreamCommands interface {
108108
ids []string,
109109
options *options.StreamClaimOptions,
110110
) ([]string, error)
111+
112+
XRange(key string, start options.StreamBoundary, end options.StreamBoundary) (map[string][][]string, error)
113+
114+
XRangeWithOptions(
115+
key string,
116+
start options.StreamBoundary,
117+
end options.StreamBoundary,
118+
options *options.StreamRangeOptions,
119+
) (map[string][][]string, error)
120+
121+
XRevRange(key string, start options.StreamBoundary, end options.StreamBoundary) (map[string][][]string, error)
122+
123+
XRevRangeWithOptions(
124+
key string,
125+
start options.StreamBoundary,
126+
end options.StreamBoundary,
127+
options *options.StreamRangeOptions,
128+
) (map[string][][]string, error)
111129
}

0 commit comments

Comments
 (0)