Skip to content

Commit 2cd229f

Browse files
Go: XINFO CONSUMERS. (valkey-io#3120)
* Go: `XINFO CONSUMERS`. Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
1 parent bac33e5 commit 2cd229f

File tree

7 files changed

+251
-0
lines changed

7 files changed

+251
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* Go: Add ZRangeStore ([3105](https://github.com/valkey-io/valkey-glide/pull/3105))
55
* Go: Add `ZUNION` ([#3119](https://github.com/valkey-io/valkey-glide/pull/3119))
66
* Go: Add `ZUNIONSTORE` ([#3136](https://github.com/valkey-io/valkey-glide/pull/3136))
7+
* Go: Add `XINFO CONSUMERS` ([#3120](https://github.com/valkey-io/valkey-glide/pull/3120))
78
* Go: Add `XINFO GROUPS` ([#3106](https://github.com/valkey-io/valkey-glide/pull/3106))
89
* Go: Add `ZInterCard` ([#3078](https://github.com/valkey-io/valkey-glide/issues/3078))
910

go/api/base_client.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5813,6 +5813,30 @@ func (client *baseClient) XInfoStreamFullWithOptions(
58135813
return handleStringToAnyMapResponse(result)
58145814
}
58155815

5816+
// Returns the list of all consumers and their attributes for the given consumer group of the
5817+
// stream stored at `key`.
5818+
//
5819+
// See [valkey.io] for details.
5820+
//
5821+
// Parameters:
5822+
//
5823+
// key - The key of the stream.
5824+
// group - The consumer group name.
5825+
//
5826+
// Return value:
5827+
//
5828+
// An array of [api.XInfoConsumerInfo], where each element contains the attributes
5829+
// of a consumer for the given consumer group of the stream at `key`.
5830+
//
5831+
// [valkey.io]: https://valkey.io/commands/xinfo-consumers/
5832+
func (client *baseClient) XInfoConsumers(key string, group string) ([]XInfoConsumerInfo, error) {
5833+
response, err := client.executeCommand(C.XInfoConsumers, []string{key, group})
5834+
if err != nil {
5835+
return nil, err
5836+
}
5837+
return handleXInfoConsumersResponse(response)
5838+
}
5839+
58165840
// Returns the list of all consumer groups and their attributes for the stream stored at `key`.
58175841
//
58185842
// See [valkey.io] for details.

go/api/response_handlers.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,6 +1178,49 @@ func handleXPendingDetailResponse(response *C.struct_CommandResponse) ([]XPendin
11781178
return pendingDetails, nil
11791179
}
11801180

1181+
func handleXInfoConsumersResponse(response *C.struct_CommandResponse) ([]XInfoConsumerInfo, error) {
1182+
defer C.free_command_response(response)
1183+
1184+
typeErr := checkResponseType(response, C.Array, false)
1185+
if typeErr != nil {
1186+
return nil, typeErr
1187+
}
1188+
arrData, err := parseArray(response)
1189+
if err != nil {
1190+
return nil, err
1191+
}
1192+
converted, err := arrayConverter[map[string]interface{}]{
1193+
nil,
1194+
false,
1195+
}.convert(arrData)
1196+
if err != nil {
1197+
return nil, err
1198+
}
1199+
arr, ok := converted.([]map[string]interface{})
1200+
if !ok {
1201+
return nil, &errors.RequestError{Msg: fmt.Sprintf("unexpected type: %T", converted)}
1202+
}
1203+
1204+
result := make([]XInfoConsumerInfo, 0, len(arr))
1205+
1206+
for _, group := range arr {
1207+
info := XInfoConsumerInfo{
1208+
Name: group["name"].(string),
1209+
Pending: group["pending"].(int64),
1210+
Idle: group["idle"].(int64),
1211+
}
1212+
switch inactive := group["inactive"].(type) {
1213+
case int64:
1214+
info.Inactive = CreateInt64Result(inactive)
1215+
default:
1216+
info.Inactive = CreateNilInt64Result()
1217+
}
1218+
result = append(result, info)
1219+
}
1220+
1221+
return result, nil
1222+
}
1223+
11811224
func handleXInfoGroupsResponse(response *C.struct_CommandResponse) ([]XInfoGroupInfo, error) {
11821225
defer C.free_command_response(response)
11831226

go/api/response_types.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,21 @@ func CreateNilXPendingSummary() XPendingSummary {
244244
return XPendingSummary{0, CreateNilStringResult(), CreateNilStringResult(), make([]ConsumerPendingMessage, 0)}
245245
}
246246

247+
// XInfoConsumerInfo represents a group information returned by `XInfoConsumers` command.
248+
type XInfoConsumerInfo struct {
249+
// The consumer's name.
250+
Name string
251+
// The number of entries in the PEL: pending messages for the consumer, which are messages that were delivered but are yet
252+
// to be acknowledged.
253+
Pending int64
254+
// The number of milliseconds that have passed since the consumer's last attempted interaction (Examples: XREADGROUP,
255+
// XCLAIM, XAUTOCLAIM).
256+
Idle int64
257+
// The number of milliseconds that have passed since the consumer's last successful interaction (Examples: XREADGROUP that
258+
// actually read some entries into the PEL, XCLAIM/XAUTOCLAIM that actually claimed some entries).
259+
Inactive Result[int64]
260+
}
261+
247262
// XInfoGroupInfo represents a group information returned by `XInfoGroups` command.
248263
type XInfoGroupInfo struct {
249264
// The consumer group's name.

go/api/stream_commands.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ type StreamCommands interface {
113113

114114
XInfoStreamFullWithOptions(key string, options *options.XInfoStreamOptions) (map[string]any, error)
115115

116+
XInfoConsumers(key string, group string) ([]XInfoConsumerInfo, error)
117+
116118
XInfoGroups(key string) ([]XInfoGroupInfo, error)
117119

118120
XRange(key string, start options.StreamBoundary, end options.StreamBoundary) ([]XRangeResponse, error)

go/api/stream_commands_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"regexp"
99
"strings"
10+
"time"
1011

1112
"github.com/google/uuid"
1213
"github.com/valkey-io/valkey-glide/go/api/options"
@@ -1778,6 +1779,71 @@ func ExampleGlideClusterClient_XInfoStreamFullWithOptions() {
17781779
// }
17791780
}
17801781

1782+
func ExampleGlideClient_XInfoConsumers() {
1783+
var client *GlideClient = getExampleGlideClient() // example helper function
1784+
key := uuid.NewString()
1785+
group := "myGroup"
1786+
1787+
// create an empty stream with a group
1788+
client.XGroupCreateWithOptions(key, group, "0-0", *options.NewXGroupCreateOptions().SetMakeStream())
1789+
// add couple of entries
1790+
client.XAddWithOptions(key, [][]string{{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}}, *options.NewXAddOptions().SetId("0-1"))
1791+
client.XAddWithOptions(key, [][]string{{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}}, *options.NewXAddOptions().SetId("0-2"))
1792+
// read them
1793+
client.XReadGroup(group, "myConsumer", map[string]string{key: ">"})
1794+
// get the info
1795+
time.Sleep(100 * time.Millisecond)
1796+
response, err := client.XInfoConsumers(key, group)
1797+
if err != nil {
1798+
fmt.Println("Glide example failed with an error: ", err)
1799+
}
1800+
1801+
// Expanded:
1802+
fmt.Printf("Consumer name: %s\n", response[0].Name)
1803+
fmt.Printf("PEL count: %d\n", response[0].Pending)
1804+
// exact values of `Idle` and `Inactive` depend on timing
1805+
fmt.Printf("Idle > 0: %t\n", response[0].Idle > 0)
1806+
fmt.Printf("Inactive > 0: %t\n", response[0].Inactive.Value() > 0) // Added in version 7.0.0
1807+
// Output:
1808+
// Consumer name: myConsumer
1809+
// PEL count: 2
1810+
// Idle > 0: true
1811+
// Inactive > 0: true
1812+
}
1813+
1814+
func ExampleGlideClusterClient_XInfoConsumers() {
1815+
var client *GlideClusterClient = getExampleGlideClusterClient() // example helper function
1816+
key := uuid.NewString()
1817+
group := "myGroup"
1818+
consumer := "myConsumer"
1819+
1820+
// create an empty stream with a group
1821+
client.XGroupCreateWithOptions(key, group, "0-0", *options.NewXGroupCreateOptions().SetMakeStream())
1822+
// add couple of entries
1823+
client.XAddWithOptions(key, [][]string{{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}}, *options.NewXAddOptions().SetId("0-1"))
1824+
client.XAddWithOptions(key, [][]string{{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}}, *options.NewXAddOptions().SetId("0-2"))
1825+
// read them
1826+
client.XReadGroupWithOptions(group, consumer, map[string]string{key: ">"}, *options.NewXReadGroupOptions().SetCount(1))
1827+
// get the info
1828+
time.Sleep(100 * time.Millisecond)
1829+
response, err := client.XInfoConsumers(key, group)
1830+
if err != nil {
1831+
fmt.Println("Glide example failed with an error: ", err)
1832+
}
1833+
1834+
// Expanded:
1835+
fmt.Printf("Consumer name: %s\n", response[0].Name)
1836+
fmt.Printf("PEL count: %d\n", response[0].Pending)
1837+
// exact values of `Idle` and `Inactive` depend on timing
1838+
fmt.Printf("Idle > 0: %t\n", response[0].Idle > 0)
1839+
fmt.Printf("Inactive > 0: %t\n", response[0].Inactive.Value() > 0) // Added in version 7.0.0
1840+
// Output:
1841+
// Consumer name: myConsumer
1842+
// PEL count: 1
1843+
// Idle > 0: true
1844+
// Inactive > 0: true
1845+
}
1846+
17811847
func ExampleGlideClient_XInfoGroups() {
17821848
var client *GlideClient = getExampleGlideClient() // example helper function
17831849
key := uuid.NewString()

go/integTest/shared_commands_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7124,6 +7124,106 @@ func (suite *GlideTestSuite) TestXInfoStream() {
71247124
})
71257125
}
71267126

7127+
func (suite *GlideTestSuite) TestXInfoConsumers() {
7128+
suite.runWithDefaultClients(func(client api.BaseClient) {
7129+
key := uuid.NewString()
7130+
group := uuid.NewString()
7131+
consumer1 := uuid.NewString()
7132+
consumer2 := uuid.NewString()
7133+
7134+
xadd, err := client.XAddWithOptions(
7135+
key,
7136+
[][]string{{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}},
7137+
*options.NewXAddOptions().SetId("0-1"),
7138+
)
7139+
assert.NoError(suite.T(), err)
7140+
assert.Equal(suite.T(), "0-1", xadd.Value())
7141+
xadd, err = client.XAddWithOptions(
7142+
key,
7143+
[][]string{{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}},
7144+
*options.NewXAddOptions().SetId("0-2"),
7145+
)
7146+
assert.NoError(suite.T(), err)
7147+
assert.Equal(suite.T(), "0-2", xadd.Value())
7148+
xadd, err = client.XAddWithOptions(key, [][]string{{"e3_f1", "e3_v1"}}, *options.NewXAddOptions().SetId("0-3"))
7149+
assert.NoError(suite.T(), err)
7150+
assert.Equal(suite.T(), "0-3", xadd.Value())
7151+
7152+
suite.verifyOK(client.XGroupCreate(key, group, "0-0"))
7153+
7154+
xReadGroup, err := client.XReadGroupWithOptions(
7155+
group,
7156+
consumer1,
7157+
map[string]string{key: ">"},
7158+
*options.NewXReadGroupOptions().SetCount(1),
7159+
)
7160+
assert.NoError(suite.T(), err)
7161+
expectedResult := map[string]map[string][][]string{
7162+
key: {
7163+
"0-1": {{"e1_f1", "e1_v1"}, {"e1_f2", "e1_v2"}},
7164+
},
7165+
}
7166+
assert.Equal(suite.T(), expectedResult, xReadGroup)
7167+
7168+
// Sleep to ensure the idle time value and inactive time value returned by xinfo_consumers is > 0
7169+
time.Sleep(2000 * time.Millisecond)
7170+
info, err := client.XInfoConsumers(key, group)
7171+
assert.NoError(suite.T(), err)
7172+
assert.Len(suite.T(), info, 1)
7173+
assert.Equal(suite.T(), consumer1, info[0].Name)
7174+
assert.Equal(suite.T(), int64(1), info[0].Pending)
7175+
assert.Greater(suite.T(), info[0].Idle, int64(0))
7176+
if suite.serverVersion > "7.2.0" {
7177+
assert.False(suite.T(), info[0].Inactive.IsNil())
7178+
assert.Greater(suite.T(), info[0].Inactive.Value(), int64(0))
7179+
} else {
7180+
assert.True(suite.T(), info[0].Inactive.IsNil())
7181+
}
7182+
7183+
respBool, err := client.XGroupCreateConsumer(key, group, consumer2)
7184+
assert.NoError(suite.T(), err)
7185+
assert.True(suite.T(), respBool)
7186+
7187+
xReadGroup, err = client.XReadGroup(group, consumer2, map[string]string{key: ">"})
7188+
assert.NoError(suite.T(), err)
7189+
expectedResult = map[string]map[string][][]string{
7190+
key: {
7191+
"0-2": {{"e2_f1", "e2_v1"}, {"e2_f2", "e2_v2"}},
7192+
"0-3": {{"e3_f1", "e3_v1"}},
7193+
},
7194+
}
7195+
assert.Equal(suite.T(), expectedResult, xReadGroup)
7196+
7197+
// Verify that xinfo_consumers contains info for 2 consumers now
7198+
info, err = client.XInfoConsumers(key, group)
7199+
assert.NoError(suite.T(), err)
7200+
assert.Len(suite.T(), info, 2)
7201+
7202+
// Passing a non-existing key raises an error
7203+
key = uuid.NewString()
7204+
_, err = client.XInfoConsumers(key, "_")
7205+
assert.IsType(suite.T(), &errors.RequestError{}, err)
7206+
7207+
// key exists, but it is not a stream
7208+
suite.verifyOK(client.Set(key, key))
7209+
_, err = client.XInfoConsumers(key, "_")
7210+
assert.IsType(suite.T(), &errors.RequestError{}, err)
7211+
7212+
// Passing a non-existing group raises an error
7213+
key = uuid.NewString()
7214+
_, err = client.XAdd(key, [][]string{{"a", "b"}})
7215+
assert.NoError(suite.T(), err)
7216+
_, err = client.XInfoConsumers(key, "_")
7217+
assert.IsType(suite.T(), &errors.RequestError{}, err)
7218+
7219+
// no consumers yet
7220+
suite.verifyOK(client.XGroupCreate(key, group, "0-0"))
7221+
info, err = client.XInfoConsumers(key, group)
7222+
assert.NoError(suite.T(), err)
7223+
assert.Empty(suite.T(), info)
7224+
})
7225+
}
7226+
71277227
func (suite *GlideTestSuite) TestXInfoGroups() {
71287228
suite.runWithDefaultClients(func(client api.BaseClient) {
71297229
key := uuid.NewString()

0 commit comments

Comments
 (0)