Skip to content

Commit b4d8781

Browse files
committed
connection: refactor Connect to use Option more
Extend the existing `Option` functionality for connect to be more flexible. Configuring how `connect` behaves relies on new functions like `withMetrics` or `withTimeout` that need to be passed as options at the caller level. This will make it easier to add new configuration options without breaking the api.
1 parent 0f69380 commit b4d8781

File tree

2 files changed

+35
-16
lines changed

2 files changed

+35
-16
lines changed

connection/connection.go

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func SetMaxGRPCLogLength(characterCount int) {
5656
//
5757
// The function tries to connect for 30 seconds, and returns an error if no connection has been established at that point.
5858
// The function automatically disables TLS and adds interceptor for logging of all gRPC messages at level 5.
59+
// If the metricsManager is 'nil', no metrics will be recorded on the gRPC calls.
5960
//
6061
// For a connection to a Unix Domain socket, the behavior after
6162
// loosing the connection is configurable. The default is to
@@ -70,12 +71,11 @@ func SetMaxGRPCLogLength(characterCount int) {
7071
// For other connections, the default behavior from gRPC is used and
7172
// loss of connection is not detected reliably.
7273
func Connect(address string, metricsManager metrics.CSIMetricsManager, options ...Option) (*grpc.ClientConn, error) {
73-
return connect(address, metricsManager, []grpc.DialOption{grpc.WithTimeout(time.Second * 30)}, options)
74-
}
75-
76-
// ConnectWithoutMetrics behaves exactly like Connect except no metrics are recorded.
77-
func ConnectWithoutMetrics(address string, options ...Option) (*grpc.ClientConn, error) {
78-
return connect(address, nil, []grpc.DialOption{grpc.WithTimeout(time.Second * 30)}, options)
74+
options = append(options, withTimeout(time.Second*30))
75+
if metricsManager != nil {
76+
options = append(options, withMetrics(metricsManager))
77+
}
78+
return connect(address, options)
7979
}
8080

8181
// Option is the type of all optional parameters for Connect.
@@ -105,29 +105,48 @@ func ExitOnConnectionLoss() func() bool {
105105
}
106106
}
107107

108+
// withTimeout adds a configurable timeout on the gRPC calls.
109+
func withTimeout(timeout time.Duration) Option {
110+
return func(o *options) {
111+
o.timeout = timeout
112+
}
113+
}
114+
115+
// withMetrics enables the recording of metrics on the gRPC calls with the provided CSIMetricsManager.
116+
func withMetrics(metricsManager metrics.CSIMetricsManager) Option {
117+
return func(o *options) {
118+
o.metricsManager = metricsManager
119+
}
120+
}
121+
108122
type options struct {
109-
reconnect func() bool
123+
reconnect func() bool
124+
timeout time.Duration
125+
metricsManager metrics.CSIMetricsManager
110126
}
111127

112128
// connect is the internal implementation of Connect. It has more options to enable testing.
113129
func connect(
114130
address string,
115-
metricsManager metrics.CSIMetricsManager,
116-
dialOptions []grpc.DialOption, connectOptions []Option) (*grpc.ClientConn, error) {
131+
connectOptions []Option) (*grpc.ClientConn, error) {
117132
var o options
118133
for _, option := range connectOptions {
119134
option(&o)
120135
}
121136

122-
dialOptions = append(dialOptions,
137+
dialOptions := []grpc.DialOption{
123138
grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container.
124139
grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure.
125140
grpc.WithBlock(), // Block until connection succeeds.
126-
)
141+
}
142+
143+
if o.timeout > 0 {
144+
dialOptions = append(dialOptions, grpc.WithTimeout(o.timeout))
145+
}
127146

128147
interceptors := []grpc.UnaryClientInterceptor{LogGRPC}
129-
if metricsManager != nil {
130-
interceptors = append(interceptors, ExtendedCSIMetricsManager{metricsManager}.RecordMetricsClientInterceptor)
148+
if o.metricsManager != nil {
149+
interceptors = append(interceptors, ExtendedCSIMetricsManager{o.metricsManager}.RecordMetricsClientInterceptor)
131150
}
132151
dialOptions = append(dialOptions, grpc.WithChainUnaryInterceptor(interceptors...))
133152

connection/connection_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func TestConnectWithoutMetrics(t *testing.T) {
138138
addr, stopServer := startServer(t, tmp, nil, nil, nil)
139139
defer stopServer()
140140

141-
conn, err := ConnectWithoutMetrics("unix:///" + addr)
141+
conn, err := Connect("unix:///"+addr, nil)
142142
if assert.NoError(t, err, "connect with unix:/// prefix") &&
143143
assert.NotNil(t, conn, "got a connection") {
144144
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")
@@ -191,13 +191,13 @@ func TestWaitForServer(t *testing.T) {
191191
}
192192
}
193193

194-
func TestTimout(t *testing.T) {
194+
func TestTimeout(t *testing.T) {
195195
tmp := tmpDir(t)
196196
defer os.RemoveAll(tmp)
197197

198198
startTime := time.Now()
199199
timeout := 5 * time.Second
200-
conn, err := connect(path.Join(tmp, "no-such.sock"), metrics.NewCSIMetricsManager("fake.csi.driver.io"), []grpc.DialOption{grpc.WithTimeout(timeout)}, nil)
200+
conn, err := connect(path.Join(tmp, "no-such.sock"), []Option{withTimeout(timeout)})
201201
endTime := time.Now()
202202
if assert.Error(t, err, "connection should fail") {
203203
assert.InEpsilon(t, timeout, endTime.Sub(startTime), 1, "connection timeout")

0 commit comments

Comments
 (0)