Skip to content

Commit 8d99de9

Browse files
authored
GO: Lazy connect implementation (valkey-io#4374)
* GO: Lazy connect implementation Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --------- Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
1 parent b22632d commit 8d99de9

File tree

5 files changed

+266
-0
lines changed

5 files changed

+266
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#### Fixes
66

77
* Java: Add lazy connection support to Java module ([#4350](https://github.com/valkey-io/valkey-glide/pull/4370))
8+
* Go: Add lazy connection support ([#4374](https://github.com/valkey-io/valkey-glide/pull/4374))
89

910
## 2.0 (2025-06-18)
1011

go/config/config.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ type baseClientConfiguration struct {
101101
clientName string
102102
clientAZ string
103103
reconnectStrategy *BackoffStrategy
104+
lazyConnect bool
104105
}
105106

106107
func (config *baseClientConfiguration) toProtobuf() (*protobuf.ConnectionRequest, error) {
@@ -147,6 +148,10 @@ func (config *baseClientConfiguration) toProtobuf() (*protobuf.ConnectionRequest
147148
request.ConnectionRetryStrategy = config.reconnectStrategy.toProtobuf()
148149
}
149150

151+
if config.lazyConnect {
152+
request.LazyConnect = config.lazyConnect
153+
}
154+
150155
return &request, nil
151156
}
152157

@@ -269,6 +274,14 @@ func (config *ClientConfiguration) WithUseTLS(useTLS bool) *ClientConfiguration
269274
return config
270275
}
271276

277+
// WithLazyConnect configures whether the client should establish connections lazily. When set to true,
278+
// the client will only establish connections when needed for the first operation, rather than
279+
// immediately upon client creation.
280+
func (config *ClientConfiguration) WithLazyConnect(lazyConnect bool) *ClientConfiguration {
281+
config.lazyConnect = lazyConnect
282+
return config
283+
}
284+
272285
// WithCredentials sets the credentials for the authentication process. If none are set, the client will not authenticate
273286
// itself with the server.
274287
func (config *ClientConfiguration) WithCredentials(credentials *ServerCredentials) *ClientConfiguration {
@@ -410,6 +423,14 @@ func (config *ClusterClientConfiguration) WithUseTLS(useTLS bool) *ClusterClient
410423
return config
411424
}
412425

426+
// WithLazyConnect configures whether the client should establish connections lazily. When set to true,
427+
// the client will only establish connections when needed for the first operation, rather than
428+
// immediately upon client creation.
429+
func (config *ClusterClientConfiguration) WithLazyConnect(lazyConnect bool) *ClusterClientConfiguration {
430+
config.lazyConnect = lazyConnect
431+
return config
432+
}
433+
413434
// WithCredentials sets the credentials for the authentication process. If none are set, the client will not authenticate
414435
// itself with the server.
415436
func (config *ClusterClientConfiguration) WithCredentials(

go/config/config_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,3 +296,47 @@ func TestConfig_InvalidRequestAndConnectionTimeouts(t *testing.T) {
296296
_, err8 := config8.ToProtobuf()
297297
assert.EqualError(t, err8, "setting connection timeout returned an error: invalid duration was specified")
298298
}
299+
300+
func TestConfig_LazyConnect(t *testing.T) {
301+
// Test for ClientConfiguration
302+
clientConfig := NewClientConfiguration().
303+
WithLazyConnect(true)
304+
305+
clientResult, err := clientConfig.ToProtobuf()
306+
if err != nil {
307+
t.Fatalf("Failed to convert client config to protobuf: %v", err)
308+
}
309+
310+
assert.True(t, clientResult.LazyConnect)
311+
312+
// Test for ClusterClientConfiguration
313+
clusterConfig := NewClusterClientConfiguration().
314+
WithLazyConnect(true)
315+
316+
clusterResult, err := clusterConfig.ToProtobuf()
317+
if err != nil {
318+
t.Fatalf("Failed to convert cluster config to protobuf: %v", err)
319+
}
320+
321+
assert.True(t, clusterResult.LazyConnect)
322+
323+
// Test default value (false) for ClientConfiguration
324+
defaultClientConfig := NewClientConfiguration()
325+
326+
defaultClientResult, err := defaultClientConfig.ToProtobuf()
327+
if err != nil {
328+
t.Fatalf("Failed to convert default client config to protobuf: %v", err)
329+
}
330+
331+
assert.False(t, defaultClientResult.LazyConnect)
332+
333+
// Test default value (false) for ClusterClientConfiguration
334+
defaultClusterConfig := NewClusterClientConfiguration()
335+
336+
defaultClusterResult, err := defaultClusterConfig.ToProtobuf()
337+
if err != nil {
338+
t.Fatalf("Failed to convert default cluster config to protobuf: %v", err)
339+
}
340+
341+
assert.False(t, defaultClusterResult.LazyConnect)
342+
}

go/integTest/connection_test.go

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package integTest
44

55
import (
66
"context"
7+
"fmt"
78
"strings"
89
"sync"
910
"time"
@@ -14,6 +15,126 @@ import (
1415
"github.com/valkey-io/valkey-glide/go/v2/internal/interfaces"
1516
)
1617

18+
func startDedicatedValkeyServer(suite *GlideTestSuite, clusterMode bool) (string, error) {
19+
// Build command arguments
20+
args := []string{}
21+
args = append(args, "start")
22+
if clusterMode {
23+
args = append(args, "--cluster-mode")
24+
}
25+
26+
args = append(args, fmt.Sprintf("-r %d", 0))
27+
28+
// Execute cluster manager script
29+
output := runClusterManager(suite, args, false)
30+
31+
return output, nil
32+
}
33+
34+
func stopDedicatedValkeyServer(suite *GlideTestSuite, clusterFolder string) {
35+
args := []string{}
36+
args = append(args, "stop", "--cluster-folder", clusterFolder)
37+
38+
runClusterManager(suite, args, false)
39+
}
40+
41+
func createDedicatedClient(
42+
addresses []config.NodeAddress,
43+
clusterMode bool,
44+
lazyConnect bool,
45+
) (interfaces.BaseClientCommands, error) {
46+
if clusterMode {
47+
cfg := config.NewClusterClientConfiguration()
48+
for _, addr := range addresses {
49+
cfg.WithAddress(&addr)
50+
}
51+
52+
cfg.WithRequestTimeout(3 * time.Second)
53+
advCfg := config.NewAdvancedClusterClientConfiguration()
54+
advCfg.WithConnectionTimeout(3 * time.Second)
55+
cfg.WithAdvancedConfiguration(advCfg)
56+
cfg.WithLazyConnect(lazyConnect)
57+
58+
return glide.NewClusterClient(cfg)
59+
}
60+
61+
cfg := config.NewClientConfiguration()
62+
for _, addr := range addresses {
63+
cfg.WithAddress(&addr)
64+
}
65+
66+
cfg.WithRequestTimeout(3 * time.Second)
67+
advCfg := config.NewAdvancedClientConfiguration()
68+
advCfg.WithConnectionTimeout(3 * time.Second)
69+
cfg.WithAdvancedConfiguration(advCfg)
70+
cfg.WithLazyConnect(lazyConnect)
71+
72+
return glide.NewClient(cfg)
73+
}
74+
75+
// getClientListOutputCount parses CLIENT LIST output and returns the number of clients
76+
func getClientListOutputCount(output interface{}) int {
77+
if output == nil {
78+
return 0
79+
}
80+
81+
text := output.(string)
82+
83+
if text = strings.TrimSpace(text); text == "" {
84+
return 0
85+
}
86+
87+
return len(strings.Split(text, "\n"))
88+
}
89+
90+
// getClientCount returns the number of connected clients
91+
func getClientCount(ctx context.Context, client interfaces.BaseClientCommands) (int, error) {
92+
if clusterClient, ok := client.(interfaces.GlideClusterClientCommands); ok {
93+
// For cluster client, execute CLIENT LIST on all nodes
94+
result, err := clusterClient.CustomCommandWithRoute(ctx, []string{"CLIENT", "LIST"}, config.AllNodes)
95+
if err != nil {
96+
return 0, err
97+
}
98+
99+
// Result will be a map with node addresses as keys and CLIENT LIST output as values
100+
totalCount := 0
101+
for _, nodeOutput := range result.MultiValue() {
102+
totalCount += getClientListOutputCount(nodeOutput)
103+
}
104+
return totalCount, nil
105+
}
106+
107+
// For standalone client, execute CLIENT LIST directly
108+
glideClient := client.(interfaces.GlideClientCommands)
109+
result, err := glideClient.CustomCommand(ctx, []string{"CLIENT", "LIST"})
110+
if err != nil {
111+
return 0, err
112+
}
113+
return getClientListOutputCount(result), nil
114+
}
115+
116+
// getExpectedNewConnections returns the expected number of new connections when a lazy client is initialized
117+
func getExpectedNewConnections(ctx context.Context, client interfaces.BaseClientCommands) (int, error) {
118+
if clusterClient, ok := client.(interfaces.GlideClusterClientCommands); ok {
119+
// For cluster, get node count and multiply by 2 (2 connections per node)
120+
result, err := clusterClient.CustomCommand(ctx, []string{"CLUSTER", "NODES"})
121+
if err != nil {
122+
return 0, err
123+
}
124+
125+
nodesInfo := result.SingleValue().(string)
126+
127+
if nodesInfo = strings.TrimSpace(nodesInfo); nodesInfo == "" {
128+
return 0, nil
129+
}
130+
131+
return len(strings.Split(nodesInfo, "\n")) * 2, nil
132+
}
133+
134+
// For standalone, always expect 1 new connection
135+
return 1, nil
136+
}
137+
17138
func (suite *GlideTestSuite) TestStandaloneConnect() {
18139
config := config.NewClientConfiguration().
19140
WithAddress(&suite.standaloneHosts[0])
@@ -156,3 +277,60 @@ func (suite *GlideTestSuite) TestConnectionTimeout() {
156277
}
157278
})
158279
}
280+
281+
func (suite *GlideTestSuite) TestLazyConnectionEstablishesOnFirstCommand() {
282+
// Run test for both standalone and cluster modes
283+
suite.runWithTimeoutClients(func(client interfaces.BaseClientCommands) {
284+
ctx := context.Background()
285+
_, isCluster := client.(interfaces.GlideClusterClientCommands)
286+
287+
// Create a monitoring client (eagerly connected)
288+
output, err := startDedicatedValkeyServer(suite, isCluster)
289+
suite.NoError(err)
290+
clusterFolder := extractClusterFolder(suite, output)
291+
addresses := extractAddresses(suite, output)
292+
defer stopDedicatedValkeyServer(suite, clusterFolder)
293+
monitoringClient, err := createDedicatedClient(addresses, isCluster, false)
294+
suite.NoError(err)
295+
defer monitoringClient.Close()
296+
297+
// Get initial client count
298+
clientsBeforeLazyInit, err := getClientCount(ctx, monitoringClient)
299+
suite.NoError(err)
300+
301+
// Create the "lazy" client
302+
lazyClient, err := createDedicatedClient(addresses, isCluster, true)
303+
suite.NoError(err)
304+
defer lazyClient.Close()
305+
306+
// Check count (should not change)
307+
clientsAfterLazyInit, err := getClientCount(ctx, monitoringClient)
308+
suite.NoError(err)
309+
suite.Equal(clientsBeforeLazyInit, clientsAfterLazyInit,
310+
"Lazy client should not connect before the first command")
311+
312+
// Send the first command using the lazy client
313+
var result interface{}
314+
if isCluster {
315+
clusterClient := lazyClient.(interfaces.GlideClusterClientCommands)
316+
result, err = clusterClient.Ping(ctx)
317+
} else {
318+
glideClient := lazyClient.(interfaces.GlideClientCommands)
319+
result, err = glideClient.Ping(ctx)
320+
}
321+
suite.NoError(err)
322+
323+
// Assert PING success for both modes
324+
suite.Equal("PONG", result)
325+
326+
// Check client count after the first command
327+
clientsAfterFirstCommand, err := getClientCount(ctx, monitoringClient)
328+
suite.NoError(err)
329+
330+
expectedNewConnections, err := getExpectedNewConnections(ctx, monitoringClient)
331+
suite.NoError(err)
332+
333+
suite.Equal(clientsBeforeLazyInit+expectedNewConnections, clientsAfterFirstCommand,
334+
"Lazy client should establish expected number of new connections after the first command")
335+
})
336+
}

go/integTest/glide_test_suite_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,28 @@ func parseHosts(suite *GlideTestSuite, addresses string) []config.NodeAddress {
146146
return result
147147
}
148148

149+
func extractClusterFolder(suite *GlideTestSuite, output string) string {
150+
lines := strings.Split(output, "\n")
151+
foundFolder := false
152+
clusterFolder := ""
153+
154+
for _, line := range lines {
155+
if strings.Contains(line, "CLUSTER_FOLDER=") {
156+
parts := strings.SplitN(line, "CLUSTER_FOLDER=", 2)
157+
if len(parts) != 2 {
158+
suite.T().Fatalf("invalid CLUSTER_FOLDER line format: %s", line)
159+
}
160+
clusterFolder = strings.TrimSpace(parts[1])
161+
foundFolder = true
162+
}
163+
}
164+
165+
if !foundFolder {
166+
suite.T().Fatalf("missing required output fields")
167+
}
168+
return clusterFolder
169+
}
170+
149171
func extractAddresses(suite *GlideTestSuite, output string) []config.NodeAddress {
150172
for _, line := range strings.Split(output, "\n") {
151173
if !strings.HasPrefix(line, "CLUSTER_NODES=") {

0 commit comments

Comments
 (0)