Skip to content

Commit ac37c7f

Browse files
authored
Go: pubsub shardchannels (valkey-io#3695)
* Add PubSubHandler interface and GetQueue method to BaseClient; enhance PubSubMessageQueue with signal channel support; refactor integration tests to utilize new client creation methods and improve message handling. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Refactor PubSub integration tests to consolidate message receipt patterns into a single comprehensive test function, improving maintainability and readability. This change introduces a parameterized approach to test various client types and message reading methods. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Refactor and enhance PubSub integration tests by consolidating message verification logic and introducing parameterized tests for various client types and message reading methods. This update improves test maintainability and readability while ensuring comprehensive coverage of PubSub functionality. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Add PubSubChannels and PubSubChannelsWithPattern methods to baseClient for enhanced channel management. Update PubSubCommands interface to include new methods. Introduce example utility functions for standalone and cluster clients with subscription capabilities, ensuring a clean state before tests. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Enhance client management by adding strong reference checks in close_client function. Update Go module dependencies to include new libraries for improved functionality. Refactor baseClient methods for PubSubChannels and PubSubChannelsWithPattern, ensuring proper error handling and client closure in tests. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * fix formatting Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Add PubSubNumPat method to baseClient for counting unique subscribed patterns. Update PubSubCommands interface and implement integration tests for standalone and cluster clients. Add example usage for documentation. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Add PubSub NumSub Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Refactor PubSubNumSub method in baseClient to simplify response handling and improve readability. Update related tests for clarity and consistency in naming conventions. Add new integration tests for PubSubNumSub functionality across standalone and cluster clients. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * fix merge errors Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Add PubSubChannels and PubSubChannelsWithPattern methods to baseClient for enhanced channel management. Update PubSubCommands interface to include new methods. Introduce example utility functions for standalone and cluster clients with subscription capabilities, ensuring a clean state before tests. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * fix formatting Signed-off-by: jbrinkman <joe.brinkman@improving.com> * Add PubSubNumPat method to baseClient for counting unique subscribed patterns. Update PubSubCommands interface and implement integration tests for standalone and cluster clients. Add example usage for documentation. Signed-off-by: jbrinkman <joe.brinkman@improving.com> * sharded publish Signed-off-by: jbrinkman <joe.brinkman@improving.com> * implement sharded mode support in pubsub tests for subscriptions and publishing Signed-off-by: jbrinkman <joe.brinkman@improving.com> * add pubsub shardchannels command Signed-off-by: jbrinkman <joe.brinkman@improving.com> * add guard for server version in integration test Signed-off-by: jbrinkman <joe.brinkman@improving.com> * fix merge errors Signed-off-by: jbrinkman <joe.brinkman@improving.com> --------- Signed-off-by: jbrinkman <joe.brinkman@improving.com>
1 parent f193f1e commit ac37c7f

File tree

5 files changed

+145
-5
lines changed

5 files changed

+145
-5
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@
5353
* Go: Add `PubSubChannels` ([#3665](https://github.com/valkey-io/valkey-glide/pull/3665))
5454
* Go: Add `PubSubNumPat` ([#3666](https://github.com/valkey-io/valkey-glide/pull/3666))
5555
* Go: Add `PubSubNumSub` ([#3667](https://github.com/valkey-io/valkey-glide/pull/3667))
56-
* Go: Add `Sharded Publish` ([#3667](https://github.com/valkey-io/valkey-glide/pull/3667))
56+
* Go: Add `Sharded Publish` ([#3692](https://github.com/valkey-io/valkey-glide/pull/3692))
57+
* Go: Add `PubSub ShardChannels` ([#](https://github.com/valkey-io/valkey-glide/pull/))
5758
* Go: Add `Config Rewrite` ([#3156](https://github.com/valkey-io/valkey-glide/pull/3156))
5859
* Go: Add `Random Key` ([#3358](https://github.com/valkey-io/valkey-glide/pull/3358))
5960
* Go: Add Function Load, Function Flush, FCall and FCall_RO ([#3474](https://github.com/valkey-io/valkey-glide/pull/3474))

go/api/glide_cluster_client.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1659,3 +1659,51 @@ func (client *GlideClusterClient) Publish(channel string, message string, sharde
16591659

16601660
return handleIntResponse(result)
16611661
}
1662+
1663+
// Returns a list of all sharded channels.
1664+
//
1665+
// Since:
1666+
//
1667+
// Valkey 7.0 and above.
1668+
//
1669+
// See [valkey.io] for details.
1670+
//
1671+
// Return value:
1672+
//
1673+
// A list of shard channels.
1674+
//
1675+
// [valkey.io]: https://valkey.io/commands/pubsub-shard-channels
1676+
func (client *GlideClusterClient) PubSubShardChannels() ([]string, error) {
1677+
result, err := client.executeCommand(C.PubSubShardChannels, []string{})
1678+
if err != nil {
1679+
return nil, err
1680+
}
1681+
1682+
return handleStringArrayResponse(result)
1683+
}
1684+
1685+
// Returns a list of all sharded channels that match the given pattern.
1686+
//
1687+
// Since:
1688+
//
1689+
// Valkey 7.0 and above.
1690+
//
1691+
// See [valkey.io] for details.
1692+
//
1693+
// Parameters:
1694+
//
1695+
// pattern - A glob-style pattern to match active shard channels.
1696+
//
1697+
// Return value:
1698+
//
1699+
// A list of shard channels that match the given pattern.
1700+
//
1701+
// [valkey.io]: https://valkey.io/commands/pubsub-shard-channels-with-pattern
1702+
func (client *GlideClusterClient) PubSubShardChannelsWithPattern(pattern string) ([]string, error) {
1703+
result, err := client.executeCommand(C.PubSubShardChannels, []string{pattern})
1704+
if err != nil {
1705+
return nil, err
1706+
}
1707+
1708+
return handleStringArrayResponse(result)
1709+
}

go/api/pubsub_commands.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ type PubSubStandaloneCommands interface {
2323
type PubSubClusterCommands interface {
2424
// Publish publishes a message to a channel. Returns the number of clients that received the message.
2525
Publish(channel string, message string, sharded bool) (int64, error)
26+
PubSubShardChannels() ([]string, error)
27+
PubSubShardChannelsWithPattern(pattern string) ([]string, error)
2628
}
2729

2830
type PubSubHandler interface {

go/api/pubsub_commands_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,48 @@ func ExampleGlideClusterClient_PubSubChannelsWithPattern() {
155155
// Output: [news.sports news.weather]
156156
}
157157

158+
func ExampleGlideClusterClient_PubSubShardChannels() {
159+
var publisher *GlideClusterClient = getExampleGlideClusterClient() // example helper function
160+
defer closeAllClients()
161+
162+
// Create subscribers with subscriptions
163+
getExampleGlideClusterClientWithSubscription(ShardedClusterChannelMode, "channel1")
164+
165+
// Allow subscriptions to establish
166+
time.Sleep(100 * time.Millisecond)
167+
168+
result, err := publisher.PubSubShardChannels()
169+
if err != nil {
170+
fmt.Println("Glide example failed with an error: ", err)
171+
}
172+
fmt.Println(result)
173+
174+
// Output: [channel1]
175+
}
176+
177+
func ExampleGlideClusterClient_PubSubShardChannelsWithPattern() {
178+
var publisher *GlideClusterClient = getExampleGlideClusterClient() // example helper function
179+
defer closeAllClients()
180+
181+
// Create subscribers with subscriptions to different channels
182+
getExampleGlideClusterClientWithSubscription(ShardedClusterChannelMode, "news.sports")
183+
getExampleGlideClusterClientWithSubscription(ShardedClusterChannelMode, "news.weather")
184+
getExampleGlideClusterClientWithSubscription(ShardedClusterChannelMode, "events.local")
185+
// Allow subscriptions to establish
186+
time.Sleep(100 * time.Millisecond)
187+
188+
// Get channels matching the "news.*" pattern
189+
result, err := publisher.PubSubShardChannelsWithPattern("news.*")
190+
if err != nil {
191+
fmt.Println("Glide example failed with an error: ", err)
192+
}
193+
194+
sort.Strings(result)
195+
fmt.Println(result)
196+
197+
// Output: [news.sports news.weather]
198+
}
199+
158200
func ExampleGlideClient_PubSubNumPat() {
159201
var publisher *GlideClient = getExampleGlideClient() // example helper function
160202
defer closeAllClients()

go/integTest/pubsub_commands_test.go

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,57 +19,91 @@ func (suite *GlideTestSuite) TestPubSub_Commands_Channels() {
1919
channelNames []string
2020
pattern string
2121
expectedNames []string
22+
sharded bool
2223
}{
2324
{
2425
name: "Standalone Empty Pattern",
2526
clientType: GlideClient,
2627
channelNames: []string{"news.sports", "news.weather", "events.local"},
2728
pattern: "",
2829
expectedNames: []string{"news.sports", "news.weather", "events.local"},
30+
sharded: false,
2931
},
3032
{
3133
name: "Standalone Exact Match",
3234
clientType: GlideClient,
3335
channelNames: []string{"news.sports", "news.weather", "events.local"},
3436
pattern: "news.sports",
3537
expectedNames: []string{"news.sports"},
38+
sharded: false,
3639
},
3740
{
3841
name: "Standalone Glob Pattern",
3942
clientType: GlideClient,
4043
channelNames: []string{"news.sports", "news.weather", "events.local"},
4144
pattern: "news.*",
4245
expectedNames: []string{"news.sports", "news.weather"},
46+
sharded: false,
4347
},
4448
{
4549
name: "Cluster Empty Pattern",
4650
clientType: GlideClusterClient,
4751
channelNames: []string{"cluster.news.sports", "cluster.news.weather", "cluster.events.local"},
4852
pattern: "",
4953
expectedNames: []string{"cluster.news.sports", "cluster.news.weather", "cluster.events.local"},
54+
sharded: false,
5055
},
5156
{
5257
name: "Cluster Exact Match",
5358
clientType: GlideClusterClient,
5459
channelNames: []string{"cluster.news.sports", "cluster.news.weather", "cluster.events.local"},
5560
pattern: "cluster.news.sports",
5661
expectedNames: []string{"cluster.news.sports"},
62+
sharded: false,
5763
},
5864
{
5965
name: "Cluster Glob Pattern",
6066
clientType: GlideClusterClient,
6167
channelNames: []string{"cluster.news.sports", "cluster.news.weather", "cluster.events.local"},
6268
pattern: "cluster.news.*",
6369
expectedNames: []string{"cluster.news.sports", "cluster.news.weather"},
70+
sharded: false,
71+
},
72+
{
73+
name: "Cluster Sharded Empty Pattern",
74+
clientType: GlideClusterClient,
75+
channelNames: []string{"cluster.shard.news.sports", "cluster.shard.news.weather", "cluster.shard.events.local"},
76+
pattern: "",
77+
expectedNames: []string{"cluster.shard.news.sports", "cluster.shard.news.weather", "cluster.shard.events.local"},
78+
sharded: true,
79+
},
80+
{
81+
name: "Cluster Sharded Exact Match",
82+
clientType: GlideClusterClient,
83+
channelNames: []string{"cluster.shard.news.sports", "cluster.shard.news.weather", "cluster.shard.events.local"},
84+
pattern: "cluster.shard.news.sports",
85+
expectedNames: []string{"cluster.shard.news.sports"},
86+
sharded: true,
87+
},
88+
{
89+
name: "Cluster Sharded Glob Pattern",
90+
clientType: GlideClusterClient,
91+
channelNames: []string{"cluster.shard.news.sports", "cluster.shard.news.weather", "cluster.shard.events.local"},
92+
pattern: "cluster.shard.news.*",
93+
expectedNames: []string{"cluster.shard.news.sports", "cluster.shard.news.weather"},
94+
sharded: true,
6495
},
6596
}
6697

6798
for _, tt := range tests {
6899
suite.T().Run(tt.name, func(t *testing.T) {
100+
if tt.sharded {
101+
suite.SkipIfServerVersionLowerThanBy("7.0.0", t)
102+
}
69103
// Create channel definitions for all channels
70104
channels := make([]ChannelDefn, len(tt.channelNames))
71105
for i, channelName := range tt.channelNames {
72-
channels[i] = ChannelDefn{Channel: channelName, Mode: 0} // ExactMode
106+
channels[i] = ChannelDefn{Channel: channelName, Mode: getChannelMode(tt.sharded)}
73107
}
74108

75109
// Create a client with subscriptions
@@ -82,10 +116,23 @@ func (suite *GlideTestSuite) TestPubSub_Commands_Channels() {
82116
// Get active channels
83117
var activeChannels []string
84118
var err error
85-
if tt.pattern == "" {
86-
activeChannels, err = receiver.PubSubChannels()
119+
if tt.sharded {
120+
// For sharded channels, we need to use the cluster-specific methods
121+
clusterClient, ok := receiver.(*api.GlideClusterClient)
122+
if !ok {
123+
t.Fatal("Expected GlideClusterClient for sharded channels")
124+
}
125+
if tt.pattern == "" {
126+
activeChannels, err = clusterClient.PubSubShardChannels()
127+
} else {
128+
activeChannels, err = clusterClient.PubSubShardChannelsWithPattern(tt.pattern)
129+
}
87130
} else {
88-
activeChannels, err = receiver.PubSubChannelsWithPattern(tt.pattern)
131+
if tt.pattern == "" {
132+
activeChannels, err = receiver.PubSubChannels()
133+
} else {
134+
activeChannels, err = receiver.PubSubChannelsWithPattern(tt.pattern)
135+
}
89136
}
90137
assert.NoError(t, err)
91138

0 commit comments

Comments
 (0)