@@ -18,6 +18,8 @@ package connection
1818
1919import (
2020 "context"
21+ "errors"
22+ "net"
2123 "strings"
2224 "time"
2325
@@ -34,19 +36,94 @@ const (
3436// Connect opens insecure gRPC connection to a CSI driver. Address must be either absolute path to UNIX domain socket
3537// file or have format '<protocol>://', following gRPC name resolution mechanism at
3638// https://github.com/grpc/grpc/blob/master/doc/naming.md.
39+ //
3740// The function tries to connect indefinitely every second until it connects. The function automatically disables TLS
3841// and adds interceptor for logging of all gRPC messages at level 5.
39- func Connect (address string ) (* grpc.ClientConn , error ) {
40- dialOptions := []grpc.DialOption {
42+ //
43+ // For a connection to a Unix Domain socket, the behavior after
44+ // losing the connection is configurable. The default is to
45+ // log the connection loss and reestablish a connection. Applications
46+ // which need to know about a connection loss can be notified by
47+ // passing a callback with OnConnectionLoss and in that callback
48+ // can decide what to do:
49+ // - exit the application with os.Exit
50+ // - invalidate cached information
51+ // - disable the reconnect, which will cause all gRPC method calls to fail with status.Unavailable
52+ //
53+ // For other connections, the default behavior from gRPC is used and
54+ // loss of connection is not detected reliably.
55+ func Connect (address string , options ... Option ) (* grpc.ClientConn , error ) {
56+ return connect (address , []grpc.DialOption {}, options )
57+ }
58+
59+ // Option is the type of all optional parameters for Connect.
60+ type Option func (o * options )
61+
62+ // OnConnectionLoss registers a callback that will be invoked when the
63+ // connection got lost. If that callback returns true, the connection
64+ // is restablished. Otherwise the connection is left as it is and
65+ // all future gRPC calls using it will fail with status.Unavailable.
66+ func OnConnectionLoss (reconnect func () bool ) Option {
67+ return func (o * options ) {
68+ o .reconnect = reconnect
69+ }
70+ }
71+
// options collects the optional settings that Option functions fill in.
type options struct {
	// reconnect is the callback registered through OnConnectionLoss;
	// it stays nil when no callback was registered.
	reconnect func() bool
}
75+
76+ // connect is the internal implementation of Connect. It has more options to enable testing.
77+ func connect (address string , dialOptions []grpc.DialOption , connectOptions []Option ) (* grpc.ClientConn , error ) {
78+ var o options
79+ for _ , option := range connectOptions {
80+ option (& o )
81+ }
82+
83+ dialOptions = append (dialOptions ,
4184 grpc .WithInsecure (), // Don't use TLS, it's usually local Unix domain socket in a container.
4285 grpc .WithBackoffMaxDelay (time .Second ), // Retry every second after failure.
4386 grpc .WithBlock (), // Block until connection succeeds.
4487 grpc .WithUnaryInterceptor (LogGRPC ), // Log all messages.
45- }
88+ )
89+ unixPrefix := "unix://"
4690 if strings .HasPrefix (address , "/" ) {
4791 // It looks like filesystem path.
48- address = "unix://" + address
92+ address = unixPrefix + address
93+ }
94+
95+ if strings .HasPrefix (address , unixPrefix ) {
96+ // state variables for the custom dialer
97+ haveConnected := false
98+ lostConnection := false
99+ reconnect := true
100+
101+ dialOptions = append (dialOptions , grpc .WithDialer (func (addr string , timeout time.Duration ) (net.Conn , error ) {
102+ if haveConnected && ! lostConnection {
103+ // We have detected a loss of connection for the first time. Decide what to do...
104+ // Record this once. TODO (?): log at regular time intervals.
105+ glog .Errorf ("Lost connection to %s." , address )
106+ // Inform caller and let it decide? Default is to reconnect.
107+ if o .reconnect != nil {
108+ reconnect = o .reconnect ()
109+ }
110+ lostConnection = true
111+ }
112+ if ! reconnect {
113+ return nil , errors .New ("connection lost, reconnecting disabled" )
114+ }
115+ conn , err := net .DialTimeout ("unix" , address [len (unixPrefix ):], timeout )
116+ if err == nil {
117+ // Connection restablished.
118+ haveConnected = true
119+ lostConnection = false
120+ }
121+ return conn , err
122+ }))
123+ } else if o .reconnect != nil {
124+ return nil , errors .New ("OnConnectionLoss callback only supported for unix:// addresses" )
49125 }
126+
50127 glog .Infof ("Connecting to %s" , address )
51128
52129 // Connect in background.
0 commit comments