@@ -29,6 +29,8 @@ import (
2929 "github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
3030 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
3131 "google.golang.org/grpc"
32+ "google.golang.org/grpc/backoff"
33+ "google.golang.org/grpc/credentials/insecure"
3234 "k8s.io/klog/v2"
3335)
3436
@@ -73,21 +75,21 @@ func SetMaxGRPCLogLength(characterCount int) {
7375//
7476// For other connections, the default behavior from gRPC is used and
7577// loss of connection is not detected reliably.
76- func Connect (address string , metricsManager metrics.CSIMetricsManager , options ... Option ) (* grpc.ClientConn , error ) {
78+ func Connect (ctx context. Context , address string , metricsManager metrics.CSIMetricsManager , options ... Option ) (* grpc.ClientConn , error ) {
7779 // Prepend default options
7880 options = append ([]Option {WithTimeout (time .Second * 30 )}, options ... )
7981 if metricsManager != nil {
8082 options = append ([]Option {WithMetrics (metricsManager )}, options ... )
8183 }
82- return connect (address , options )
84+ return connect (ctx , address , options )
8385}
8486
8587// ConnectWithoutMetrics behaves exactly like Connect except no metrics are recorded.
8688// This function is deprecated, prefer using Connect with `nil` as the metricsManager.
87- func ConnectWithoutMetrics (address string , options ... Option ) (* grpc.ClientConn , error ) {
89+ func ConnectWithoutMetrics (ctx context. Context , address string , options ... Option ) (* grpc.ClientConn , error ) {
8890 // Prepend default options
8991 options = append ([]Option {WithTimeout (time .Second * 30 )}, options ... )
90- return connect (address , options )
92+ return connect (ctx , address , options )
9193}
9294
9395// Option is the type of all optional parameters for Connect.
@@ -97,27 +99,33 @@ type Option func(o *options)
9799// connection got lost. If that callback returns true, the connection
98100// is reestablished. Otherwise the connection is left as it is and
99101// all future gRPC calls using it will fail with status.Unavailable.
100- func OnConnectionLoss (reconnect func () bool ) Option {
102+ func OnConnectionLoss (reconnect func (context. Context ) bool ) Option {
101103 return func (o * options ) {
102104 o .reconnect = reconnect
103105 }
104106}
105107
106108// ExitOnConnectionLoss returns a callback for OnConnectionLoss() that writes
107109// an error to /dev/termination-log and exits.
108- func ExitOnConnectionLoss () func () bool {
109- return func () bool {
110+ func ExitOnConnectionLoss () func (context. Context ) bool {
111+ return func (ctx context. Context ) bool {
110112 terminationMsg := "Lost connection to CSI driver, exiting"
111113 if err := os .WriteFile (terminationLogPath , []byte (terminationMsg ), 0644 ); err != nil {
112- klog .Errorf ( "%s: %s " , terminationLogPath , err )
114+ klog .FromContext ( ctx ). Error ( err , "Failed to write a message to the termination logfile " , " terminationLogPath" , terminationLogPath )
113115 }
114- klog .Exit (terminationMsg )
116+ klog .FromContext (ctx ).Error (nil , terminationMsg )
117+ klog .FlushAndExit (klog .ExitFlushTimeout , 1 )
115118 // Not reached.
116119 return false
117120 }
118121}
119122
120123// WithTimeout adds a configurable timeout on the gRPC calls.
124+ // Note that this timeout also prevents all attempts to reconnect
125+ // because it uses context.WithTimeout internally.
126+ //
127+ // For more details, see https://github.com/grpc/grpc-go/issues/133
128+ // and https://github.com/kubernetes-csi/csi-lib-utils/pull/149#discussion_r1574707477
121129func WithTimeout (timeout time.Duration ) Option {
122130 return func (o * options ) {
123131 o .timeout = timeout
@@ -139,30 +147,36 @@ func WithOtelTracing() Option {
139147}
140148
141149type options struct {
142- reconnect func () bool
150+ reconnect func (context. Context ) bool
143151 timeout time.Duration
144152 metricsManager metrics.CSIMetricsManager
145153 enableOtelTracing bool
146154}
147155
148156// connect is the internal implementation of Connect. It has more options to enable testing.
149157func connect (
158+ ctx context.Context ,
150159 address string ,
151160 connectOptions []Option ) (* grpc.ClientConn , error ) {
161+ logger := klog .FromContext (ctx )
152162 var o options
153163 for _ , option := range connectOptions {
154164 option (& o )
155165 }
156166
167+ bc := backoff .DefaultConfig
168+ bc .MaxDelay = time .Second
157169 dialOptions := []grpc.DialOption {
158- grpc .WithInsecure (), // Don't use TLS, it's usually local Unix domain socket in a container.
159- grpc .WithBackoffMaxDelay ( time . Second ), // Retry every second after failure.
170+ grpc .WithTransportCredentials ( insecure . NewCredentials ()), // Don't use TLS, it's usually local Unix domain socket in a container.
171+ grpc .WithConnectParams (grpc. ConnectParams { Backoff : bc } ), // Retry every second after failure.
160172 grpc .WithBlock (), // Block until connection succeeds.
161173 grpc .WithIdleTimeout (time .Duration (0 )), // Never close connection because of inactivity.
162174 }
163175
164176 if o .timeout > 0 {
165- dialOptions = append (dialOptions , grpc .WithTimeout (o .timeout ))
177+ var cancel context.CancelFunc
178+ ctx , cancel = context .WithTimeout (ctx , o .timeout )
179+ defer cancel ()
166180 }
167181
168182 interceptors := []grpc.UnaryClientInterceptor {LogGRPC }
@@ -186,20 +200,25 @@ func connect(
186200 lostConnection := false
187201 reconnect := true
188202
189- dialOptions = append (dialOptions , grpc .WithDialer (func (addr string , timeout time.Duration ) (net.Conn , error ) {
203+ dialOptions = append (dialOptions , grpc .WithContextDialer (func (ctx context.Context , addr string ) (net.Conn , error ) {
204+ logger := klog .FromContext (ctx )
190205 if haveConnected && ! lostConnection {
191206 // We have detected a loss of connection for the first time. Decide what to do...
192207 // Record this once. TODO (?): log at regular time intervals.
193- klog . Errorf ( "Lost connection to %s. " , address )
208+ logger . Error ( nil , "Lost connection" , "address " , address )
194209 // Inform caller and let it decide? Default is to reconnect.
195210 if o .reconnect != nil {
196- reconnect = o .reconnect ()
211+ reconnect = o .reconnect (ctx )
197212 }
198213 lostConnection = true
199214 }
200215 if ! reconnect {
201216 return nil , errors .New ("connection lost, reconnecting disabled" )
202217 }
218+ var timeout time.Duration
219+ if deadline , ok := ctx .Deadline (); ok {
220+ timeout = time .Until (deadline )
221+ }
203222 conn , err := net .DialTimeout ("unix" , address [len (unixPrefix ):], timeout )
204223 if err == nil {
205224 // Connection reestablished.
@@ -212,14 +231,14 @@ func connect(
212231 return nil , errors .New ("OnConnectionLoss callback only supported for unix:// addresses" )
213232 }
214233
215- klog .V (5 ).Infof ("Connecting to %s " , address )
234+ logger .V (5 ).Info ("Connecting" , "address " , address )
216235
217236 // Connect in background.
218237 var conn * grpc.ClientConn
219238 var err error
220239 ready := make (chan bool )
221240 go func () {
222- conn , err = grpc .Dial ( address , dialOptions ... )
241+ conn , err = grpc .DialContext ( ctx , address , dialOptions ... )
223242 close (ready )
224243 }()
225244
@@ -231,7 +250,7 @@ func connect(
231250 for {
232251 select {
233252 case <- ticker .C :
234- klog . Warningf ("Still connecting to %s " , address )
253+ logger . Info ("Still connecting" , "address " , address )
235254
236255 case <- ready :
237256 return conn , err
@@ -241,15 +260,14 @@ func connect(
241260
242261// LogGRPC is a gRPC unary interceptor for logging of CSI messages at level 5. It removes any secrets from the message.
243262func LogGRPC (ctx context.Context , method string , req , reply interface {}, cc * grpc.ClientConn , invoker grpc.UnaryInvoker , opts ... grpc.CallOption ) error {
244- klog . V ( 5 ). Infof ( "GRPC call: %s" , method )
245- klog .V (5 ).Infof ("GRPC request: %s " , protosanitizer .StripSecrets (req ))
263+ logger := klog . FromContext ( ctx )
264+ logger .V (5 ).Info ("GRPC call" , "method" , method , "request " , protosanitizer .StripSecrets (req ))
246265 err := invoker (ctx , method , req , reply , cc , opts ... )
247266 cappedStr := protosanitizer .StripSecrets (reply ).String ()
248267 if maxLogChar > 0 && len (cappedStr ) > maxLogChar {
249268 cappedStr = cappedStr [:maxLogChar ] + fmt .Sprintf (" [response body too large, log capped to %d chars]" , maxLogChar )
250269 }
251- klog .V (5 ).Infof ("GRPC response: %s" , cappedStr )
252- klog .V (5 ).Infof ("GRPC error: %v" , err )
270+ logger .V (5 ).Info ("GRPC response" , "response" , cappedStr , "err" , err )
253271 return err
254272}
255273
@@ -286,14 +304,14 @@ func (cmm ExtendedCSIMetricsManager) RecordMetricsClientInterceptor(
286304 if additionalInfo != nil {
287305 additionalInfoVal , ok := additionalInfo .(AdditionalInfo )
288306 if ! ok {
289- klog .Errorf ( "Failed to record migrated status, cannot convert additional info %v " , additionalInfo )
307+ klog .FromContext ( ctx ). Error ( nil , "Failed to record migrated status, cannot convert additional info" , "additionalInfo " , additionalInfo )
290308 return err
291309 }
292310 migrated = additionalInfoVal .Migrated
293311 }
294312 cmmv , metricsErr := cmm .WithLabelValues (map [string ]string {metrics .LabelMigrated : migrated })
295313 if metricsErr != nil {
296- klog .Errorf ( "Failed to record migrated status, error: %v" , metricsErr )
314+ klog .FromContext ( ctx ). Error ( metricsErr , "Failed to record migrated status" )
297315 } else {
298316 cmmBase = cmmv
299317 }
0 commit comments