Skip to content

Commit 8477c48

Browse files
authored
Merge pull request kubernetes#74652 from cofyc/fix72500
Delay CSI client initialization
2 parents 6ec5a7d + fa926ed commit 8477c48

File tree

5 files changed

+77
-30
lines changed

5 files changed

+77
-30
lines changed

pkg/volume/csi/csi_block.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ import (
3737
)
3838

3939
type csiBlockMapper struct {
40+
csiClientGetter
4041
k8s kubernetes.Interface
41-
csiClient csiClient
4242
plugin *csiPlugin
4343
driverName csiDriverName
4444
specName string
@@ -247,14 +247,20 @@ func (m *csiBlockMapper) SetUpDevice() (string, error) {
247247
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
248248
defer cancel()
249249

250+
csiClient, err := m.csiClientGetter.Get()
251+
if err != nil {
252+
klog.Error(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
253+
return "", err
254+
}
255+
250256
// Call NodeStageVolume
251-
stagingPath, err := m.stageVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment)
257+
stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
252258
if err != nil {
253259
return "", err
254260
}
255261

256262
// Call NodePublishVolume
257-
publishPath, err := m.publishVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment, stagingPath)
263+
publishPath, err := m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment, stagingPath)
258264
if err != nil {
259265
return "", err
260266
}
@@ -326,6 +332,12 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
326332
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
327333
defer cancel()
328334

335+
csiClient, err := m.csiClientGetter.Get()
336+
if err != nil {
337+
klog.Error(log("blockMapper.TearDownDevice failed to get CSI client: %v", err))
338+
return err
339+
}
340+
329341
// Call NodeUnpublishVolume
330342
publishPath := m.getPublishPath()
331343
if _, err := os.Stat(publishPath); err != nil {
@@ -335,7 +347,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
335347
return err
336348
}
337349
} else {
338-
err := m.unpublishVolumeForBlock(ctx, m.csiClient, publishPath)
350+
err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
339351
if err != nil {
340352
return err
341353
}
@@ -350,7 +362,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
350362
return err
351363
}
352364
} else {
353-
err := m.unstageVolumeForBlock(ctx, m.csiClient, stagingPath)
365+
err := m.unstageVolumeForBlock(ctx, csiClient, stagingPath)
354366
if err != nil {
355367
return err
356368
}

pkg/volume/csi/csi_client.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"io"
2424
"net"
2525
"strings"
26+
"sync"
2627
"time"
2728

2829
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
@@ -807,3 +808,36 @@ func versionRequiresV0Client(version *utilversion.Version) bool {
807808

808809
return false
809810
}
811+
812+
// CSI client getter with cache.
813+
// This provides a method to initialize CSI client with driver name and caches
814+
// it for later use. When CSI clients have not been discovered yet (e.g.
815+
// on kubelet restart), client initialization will fail. Users of CSI client (e.g.
816+
// mounter manager and block mapper) can use this to delay CSI client
817+
// initialization until needed.
818+
type csiClientGetter struct {
819+
sync.RWMutex
820+
csiClient csiClient
821+
driverName csiDriverName
822+
}
823+
824+
func (c *csiClientGetter) Get() (csiClient, error) {
825+
c.RLock()
826+
if c.csiClient != nil {
827+
c.RUnlock()
828+
return c.csiClient, nil
829+
}
830+
c.RUnlock()
831+
c.Lock()
832+
defer c.Unlock()
833+
// Double-checking locking criterion.
834+
if c.csiClient != nil {
835+
return c.csiClient, nil
836+
}
837+
csi, err := newCsiDriverClient(c.driverName)
838+
if err != nil {
839+
return nil, err
840+
}
841+
c.csiClient = csi
842+
return c.csiClient, nil
843+
}

pkg/volume/csi/csi_mounter.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ var (
5656
)
5757

5858
type csiMountMgr struct {
59-
csiClient csiClient
59+
csiClientGetter
6060
k8s kubernetes.Interface
6161
plugin *csiPlugin
6262
driverName csiDriverName
@@ -111,7 +111,11 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
111111
return nil
112112
}
113113

114-
csi := c.csiClient
114+
csi, err := c.csiClientGetter.Get()
115+
if err != nil {
116+
klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err))
117+
return err
118+
}
115119
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
116120
defer cancel()
117121

@@ -343,7 +347,11 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
343347
klog.V(4).Infof(log("Unmounter.TearDown(%s)", dir))
344348

345349
volID := c.volumeID
346-
csi := c.csiClient
350+
csi, err := c.csiClientGetter.Get()
351+
if err != nil {
352+
klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err))
353+
return err
354+
}
347355

348356
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
349357
defer cancel()

pkg/volume/csi/csi_plugin.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -383,11 +383,6 @@ func (p *csiPlugin) NewMounter(
383383
return nil, errors.New("failed to get a Kubernetes client")
384384
}
385385

386-
csi, err := newCsiDriverClient(csiDriverName(driverName))
387-
if err != nil {
388-
return nil, err
389-
}
390-
391386
mounter := &csiMountMgr{
392387
plugin: p,
393388
k8s: k8s,
@@ -398,9 +393,9 @@ func (p *csiPlugin) NewMounter(
398393
driverMode: driverMode,
399394
volumeID: volumeHandle,
400395
specVolumeID: spec.Name(),
401-
csiClient: csi,
402396
readOnly: readOnly,
403397
}
398+
mounter.csiClientGetter.driverName = csiDriverName(driverName)
404399

405400
// Save volume info in pod dir
406401
dir := mounter.GetPath()
@@ -458,10 +453,7 @@ func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmo
458453
}
459454
unmounter.driverName = csiDriverName(data[volDataKey.driverName])
460455
unmounter.volumeID = data[volDataKey.volHandle]
461-
unmounter.csiClient, err = newCsiDriverClient(unmounter.driverName)
462-
if err != nil {
463-
return nil, err
464-
}
456+
unmounter.csiClientGetter.driverName = unmounter.driverName
465457

466458
return unmounter, nil
467459
}
@@ -638,10 +630,6 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
638630
}
639631

640632
klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
641-
client, err := newCsiDriverClient(csiDriverName(pvSource.Driver))
642-
if err != nil {
643-
return nil, err
644-
}
645633

646634
k8s := p.host.GetKubeClient()
647635
if k8s == nil {
@@ -650,7 +638,6 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
650638
}
651639

652640
mapper := &csiBlockMapper{
653-
csiClient: client,
654641
k8s: k8s,
655642
plugin: p,
656643
volumeID: pvSource.VolumeHandle,
@@ -660,6 +647,7 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
660647
specName: spec.Name(),
661648
podUID: podRef.UID,
662649
}
650+
mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver)
663651

664652
// Save volume info in pod dir
665653
dataDir := getVolumeDeviceDataDir(spec.Name(), p.host)
@@ -714,7 +702,7 @@ func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (vo
714702
}
715703
unmapper.driverName = csiDriverName(data[volDataKey.driverName])
716704
unmapper.volumeID = data[volDataKey.volHandle]
717-
unmapper.csiClient, err = newCsiDriverClient(unmapper.driverName)
705+
unmapper.csiClientGetter.driverName = unmapper.driverName
718706
if err != nil {
719707
return nil, err
720708
}

pkg/volume/csi/csi_plugin_test.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,8 @@ func TestPluginNewMounter(t *testing.T) {
614614
if string(csiMounter.podUID) != string(test.podUID) {
615615
t.Error("mounter podUID not set")
616616
}
617-
if csiMounter.csiClient == nil {
617+
csiClient, err := csiMounter.csiClientGetter.Get()
618+
if csiClient == nil {
618619
t.Error("mounter csiClient is nil")
619620
}
620621
if csiMounter.driverMode != test.driverMode {
@@ -732,7 +733,8 @@ func TestPluginNewMounterWithInline(t *testing.T) {
732733
if string(csiMounter.podUID) != string(test.podUID) {
733734
t.Error("mounter podUID not set")
734735
}
735-
if csiMounter.csiClient == nil {
736+
csiClient, err := csiMounter.csiClientGetter.Get()
737+
if csiClient == nil {
736738
t.Error("mounter csiClient is nil")
737739
}
738740
if csiMounter.driverMode != test.driverMode {
@@ -815,8 +817,9 @@ func TestPluginNewUnmounter(t *testing.T) {
815817
t.Error("podUID not set")
816818
}
817819

818-
if csiUnmounter.csiClient == nil {
819-
t.Error("unmounter csiClient is nil")
820+
csiClient, err := csiUnmounter.csiClientGetter.Get()
821+
if csiClient == nil {
822+
t.Error("mounter csiClient is nil")
820823
}
821824
}
822825

@@ -932,7 +935,8 @@ func TestPluginNewBlockMapper(t *testing.T) {
932935
if csiMapper.podUID == types.UID("") {
933936
t.Error("CSI block mapper missing pod.UID")
934937
}
935-
if csiMapper.csiClient == nil {
938+
csiClient, err := csiMapper.csiClientGetter.Get()
939+
if csiClient == nil {
936940
t.Error("mapper csiClient is nil")
937941
}
938942

@@ -994,7 +998,8 @@ func TestPluginNewUnmapper(t *testing.T) {
994998
t.Error("specName not set")
995999
}
9961000

997-
if csiUnmapper.csiClient == nil {
1001+
csiClient, err := csiUnmapper.csiClientGetter.Get()
1002+
if csiClient == nil {
9981003
t.Error("unmapper csiClient is nil")
9991004
}
10001005

0 commit comments

Comments
 (0)