@@ -24,6 +24,9 @@ import (
2424 "strings"
2525 "time"
2626
27+ "google.golang.org/grpc/codes"
28+ "google.golang.org/grpc/status"
29+
2730 "github.com/container-storage-interface/spec/lib/go/csi"
2831 "github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
2932 "google.golang.org/grpc"
@@ -33,6 +36,9 @@ import (
3336const (
3437 // Interval of logging connection errors
3538 connectionLoggingInterval = 10 * time .Second
39+
40+ // Interval of trying to call Probe() until it succeeds
41+ probeInterval = 1 * time .Second
3642)
3743
3844// Connect opens insecure gRPC connection to a CSI driver. Address must be either absolute path to UNIX domain socket
@@ -163,6 +169,7 @@ func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp
163169 return err
164170}
165171
172+ // GetDriverName returns name of CSI driver.
166173func GetDriverName (ctx context.Context , conn * grpc.ClientConn ) (string , error ) {
167174 client := csi .NewIdentityClient (conn )
168175
@@ -177,3 +184,111 @@ func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) {
177184 }
178185 return name , nil
179186}
187+
188+ // PluginCapabilitySet is set of CSI plugin capabilities. Only supported capabilities are in the map.
189+ type PluginCapabilitySet map [csi.PluginCapability_Service_Type ]bool
190+
191+ // GetPluginCapabilities returns set of supported capabilities of CSI driver.
192+ func GetPluginCapabilities (ctx context.Context , conn * grpc.ClientConn ) (PluginCapabilitySet , error ) {
193+ client := csi .NewIdentityClient (conn )
194+ req := csi.GetPluginCapabilitiesRequest {}
195+ rsp , err := client .GetPluginCapabilities (ctx , & req )
196+ if err != nil {
197+ return nil , err
198+ }
199+ caps := PluginCapabilitySet {}
200+ for _ , cap := range rsp .GetCapabilities () {
201+ if cap == nil {
202+ continue
203+ }
204+ srv := cap .GetService ()
205+ if srv == nil {
206+ continue
207+ }
208+ t := srv .GetType ()
209+ caps [t ] = true
210+ }
211+ return caps , nil
212+ }
213+
214+ // ControllerCapabilitySet is set of CSI controller capabilities. Only supported capabilities are in the map.
215+ type ControllerCapabilitySet map [csi.ControllerServiceCapability_RPC_Type ]bool
216+
217+ // GetControllerCapabilities returns set of supported controller capabilities of CSI driver.
218+ func GetControllerCapabilities (ctx context.Context , conn * grpc.ClientConn ) (ControllerCapabilitySet , error ) {
219+ client := csi .NewControllerClient (conn )
220+ req := csi.ControllerGetCapabilitiesRequest {}
221+ rsp , err := client .ControllerGetCapabilities (ctx , & req )
222+ if err != nil {
223+ return nil , err
224+ }
225+
226+ caps := ControllerCapabilitySet {}
227+ for _ , cap := range rsp .GetCapabilities () {
228+ if cap == nil {
229+ continue
230+ }
231+ rpc := cap .GetRpc ()
232+ if rpc == nil {
233+ continue
234+ }
235+ t := rpc .GetType ()
236+ caps [t ] = true
237+ }
238+ return caps , nil
239+ }
240+
241+ // ProbeForever calls Probe() of a CSI driver and waits until the driver becomes ready.
242+ // Any error other than timeout is returned.
243+ func ProbeForever (conn * grpc.ClientConn , singleProbeTimeout time.Duration ) error {
244+ for {
245+ klog .Info ("Probing CSI driver for readiness" )
246+ ready , err := probeOnce (conn , singleProbeTimeout )
247+ if err != nil {
248+ st , ok := status .FromError (err )
249+ if ! ok {
250+ // This is not gRPC error. The probe must have failed before gRPC
251+ // method was called, otherwise we would get gRPC error.
252+ return fmt .Errorf ("CSI driver probe failed: %s" , err )
253+ }
254+ if st .Code () != codes .DeadlineExceeded {
255+ return fmt .Errorf ("CSI driver probe failed: %s" , err )
256+ }
257+ // Timeout -> driver is not ready. Fall through to sleep() below.
258+ klog .Warning ("CSI driver probe timed out" )
259+ } else {
260+ if ready {
261+ return nil
262+ }
263+ klog .Warning ("CSI driver is not ready" )
264+ }
265+ // Timeout was returned or driver is not ready.
266+ time .Sleep (probeInterval )
267+ }
268+ }
269+
270+ // probeOnce is a helper to simplify defer cancel()
271+ func probeOnce (conn * grpc.ClientConn , timeout time.Duration ) (bool , error ) {
272+ ctx , cancel := context .WithTimeout (context .Background (), timeout )
273+ defer cancel ()
274+ return Probe (ctx , conn )
275+ }
276+
277+ // Probe calls driver Probe() just once and returns its result without any processing.
278+ func Probe (ctx context.Context , conn * grpc.ClientConn ) (ready bool , err error ) {
279+ client := csi .NewIdentityClient (conn )
280+
281+ req := csi.ProbeRequest {}
282+ rsp , err := client .Probe (ctx , & req )
283+
284+ if err != nil {
285+ return false , err
286+ }
287+
288+ r := rsp .GetReady ()
289+ if r == nil {
290+ // "If not present, the caller SHALL assume that the plugin is in a ready state"
291+ return true , nil
292+ }
293+ return r .GetValue (), nil
294+ }
0 commit comments