Skip to content

Commit 822739f

Browse files
authored
Merge pull request kubernetes#68633 from verult/nil-kubeclient
Delaying kubeclient and csi client injection into CSI plugin
2 parents fe72bd2 + a8e282e commit 822739f

File tree

5 files changed

+83
-45
lines changed

5 files changed

+83
-45
lines changed

pkg/volume/csi/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ go_library(
2727
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
2828
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
2929
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
30-
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
3130
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions:go_default_library",
3231
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1:go_default_library",
3332
"//staging/src/k8s.io/csi-api/pkg/client/listers/csi/v1alpha1:go_default_library",

pkg/volume/csi/csi_plugin.go

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ import (
3636
"k8s.io/apimachinery/pkg/util/wait"
3737
utilfeature "k8s.io/apiserver/pkg/util/feature"
3838
clientset "k8s.io/client-go/kubernetes"
39-
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
4039
csiapiinformer "k8s.io/csi-api/pkg/client/informers/externalversions"
4140
csiinformer "k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1"
4241
csilister "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1"
@@ -161,31 +160,22 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
161160
func (p *csiPlugin) Init(host volume.VolumeHost) error {
162161
p.host = host
163162

164-
kubeClient := host.GetKubeClient()
165-
if kubeClient == nil {
166-
return fmt.Errorf("error getting kube client")
167-
}
168-
169-
var csiClient csiclientset.Interface
170-
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) ||
171-
utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
172-
csiClient = host.GetCSIClient()
163+
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
164+
csiClient := host.GetCSIClient()
173165
if csiClient == nil {
174-
return fmt.Errorf("error getting CSI client")
166+
glog.Warning("The client for CSI Custom Resources is not available, skipping informer initialization")
167+
} else {
168+
// Start informer for CSIDrivers.
169+
factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
170+
p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
171+
p.csiDriverLister = p.csiDriverInformer.Lister()
172+
go factory.Start(wait.NeverStop)
175173
}
176174
}
177175

178-
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
179-
// Start informer for CSIDrivers.
180-
factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
181-
p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
182-
p.csiDriverLister = p.csiDriverInformer.Lister()
183-
go factory.Start(wait.NeverStop)
184-
}
185-
186176
// Initializing csiDrivers map and label management channels
187177
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
188-
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), kubeClient, csiClient)
178+
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host)
189179

190180
return nil
191181
}

pkg/volume/csi/nodeinfomanager/BUILD

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
visibility = ["//visibility:public"],
88
deps = [
99
"//pkg/features:go_default_library",
10+
"//pkg/volume:go_default_library",
1011
"//pkg/volume/util:go_default_library",
1112
"//staging/src/k8s.io/api/core/v1:go_default_library",
1213
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
@@ -15,10 +16,8 @@ go_library(
1516
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
1617
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
1718
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
18-
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
1919
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
2020
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
21-
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
2221
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
2322
"//vendor/github.com/golang/glog:go_default_library",
2423
],
@@ -45,6 +44,7 @@ go_test(
4544
deps = [
4645
"//pkg/apis/core/helper:go_default_library",
4746
"//pkg/features:go_default_library",
47+
"//pkg/volume/testing:go_default_library",
4848
"//staging/src/k8s.io/api/core/v1:go_default_library",
4949
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
5050
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -53,6 +53,7 @@ go_test(
5353
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
5454
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
5555
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
56+
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
5657
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
5758
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned/fake:go_default_library",
5859
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",

pkg/volume/csi/nodeinfomanager/nodeinfomanager.go

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,10 @@ import (
3131
"k8s.io/apimachinery/pkg/types"
3232
"k8s.io/apimachinery/pkg/util/sets"
3333
utilfeature "k8s.io/apiserver/pkg/util/feature"
34-
"k8s.io/client-go/kubernetes"
3534
"k8s.io/client-go/util/retry"
3635
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
37-
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
3836
"k8s.io/kubernetes/pkg/features"
37+
"k8s.io/kubernetes/pkg/volume"
3938
"k8s.io/kubernetes/pkg/volume/util"
4039
)
4140

@@ -49,9 +48,8 @@ var nodeKind = v1.SchemeGroupVersion.WithKind("Node")
4948
// nodeInfoManager contains necessary common dependencies to update node info on both
5049
// the Node and CSINodeInfo objects.
5150
type nodeInfoManager struct {
52-
nodeName types.NodeName
53-
k8s kubernetes.Interface
54-
csiKubeClient csiclientset.Interface
51+
nodeName types.NodeName
52+
volumeHost volume.VolumeHost
5553
}
5654

5755
// If no updates is needed, the function must return the same Node object as the input.
@@ -73,12 +71,10 @@ type Interface interface {
7371
// NewNodeInfoManager initializes nodeInfoManager
7472
func NewNodeInfoManager(
7573
nodeName types.NodeName,
76-
kubeClient kubernetes.Interface,
77-
csiKubeClient csiclientset.Interface) Interface {
74+
volumeHost volume.VolumeHost) Interface {
7875
return &nodeInfoManager{
79-
nodeName: nodeName,
80-
k8s: kubeClient,
81-
csiKubeClient: csiKubeClient,
76+
nodeName: nodeName,
77+
volumeHost: volumeHost,
8278
}
8379
}
8480

@@ -143,7 +139,12 @@ func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error {
143139
// existing changes are not overwritten. RetryOnConflict uses
144140
// exponential backoff to avoid exhausting the apiserver.
145141

146-
nodeClient := nim.k8s.CoreV1().Nodes()
142+
kubeClient := nim.volumeHost.GetKubeClient()
143+
if kubeClient == nil {
144+
return fmt.Errorf("error getting kube client")
145+
}
146+
147+
nodeClient := kubeClient.CoreV1().Nodes()
147148
node, err := nodeClient.Get(string(nim.nodeName), metav1.GetOptions{})
148149
if err != nil {
149150
return err // do not wrap error
@@ -315,12 +316,13 @@ func (nim *nodeInfoManager) updateCSINodeInfo(
315316
driverNodeID string,
316317
topology *csipb.Topology) error {
317318

318-
if nim.csiKubeClient == nil {
319-
return fmt.Errorf("CSI client cannot be nil")
319+
csiKubeClient := nim.volumeHost.GetCSIClient()
320+
if csiKubeClient == nil {
321+
return fmt.Errorf("error getting CSI client")
320322
}
321323

322324
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
323-
nodeInfo, err := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{})
325+
nodeInfo, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{})
324326
if nodeInfo == nil || errors.IsNotFound(err) {
325327
return nim.createNodeInfoObject(driverName, driverNodeID, topology)
326328
}
@@ -341,14 +343,24 @@ func (nim *nodeInfoManager) createNodeInfoObject(
341343
driverNodeID string,
342344
topology *csipb.Topology) error {
343345

346+
kubeClient := nim.volumeHost.GetKubeClient()
347+
if kubeClient == nil {
348+
return fmt.Errorf("error getting kube client")
349+
}
350+
351+
csiKubeClient := nim.volumeHost.GetCSIClient()
352+
if csiKubeClient == nil {
353+
return fmt.Errorf("error getting CSI client")
354+
}
355+
344356
var topologyKeys []string
345357
if topology != nil {
346358
for k := range topology.Segments {
347359
topologyKeys = append(topologyKeys, k)
348360
}
349361
}
350362

351-
node, err := nim.k8s.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{})
363+
node, err := kubeClient.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{})
352364
if err != nil {
353365
return err // do not wrap error
354366
}
@@ -374,7 +386,7 @@ func (nim *nodeInfoManager) createNodeInfoObject(
374386
},
375387
}
376388

377-
_, err = nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Create(nodeInfo)
389+
_, err = csiKubeClient.CsiV1alpha1().CSINodeInfos().Create(nodeInfo)
378390
return err // do not wrap error
379391
}
380392

@@ -384,6 +396,11 @@ func (nim *nodeInfoManager) updateNodeInfoObject(
384396
driverNodeID string,
385397
topology *csipb.Topology) error {
386398

399+
csiKubeClient := nim.volumeHost.GetCSIClient()
400+
if csiKubeClient == nil {
401+
return fmt.Errorf("error getting CSI client")
402+
}
403+
387404
topologyKeys := make(sets.String)
388405
if topology != nil {
389406
for k := range topology.Segments {
@@ -416,14 +433,19 @@ func (nim *nodeInfoManager) updateNodeInfoObject(
416433
newDriverInfos = append(newDriverInfos, driverInfo)
417434
nodeInfo.CSIDrivers = newDriverInfos
418435

419-
_, err := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo)
436+
_, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo)
420437
return err // do not wrap error
421438
}
422439

423440
func (nim *nodeInfoManager) removeCSINodeInfo(csiDriverName string) error {
424441
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
425442

426-
nodeInfoClient := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos()
443+
csiKubeClient := nim.volumeHost.GetCSIClient()
444+
if csiKubeClient == nil {
445+
return fmt.Errorf("error getting CSI client")
446+
}
447+
448+
nodeInfoClient := csiKubeClient.CsiV1alpha1().CSINodeInfos()
427449
nodeInfo, err := nodeInfoClient.Get(string(nim.nodeName), metav1.GetOptions{})
428450
if nodeInfo == nil || errors.IsNotFound(err) {
429451
// do nothing

pkg/volume/csi/nodeinfomanager/nodeinfomanager_test.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ import (
2727
utilfeature "k8s.io/apiserver/pkg/util/feature"
2828
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
2929
"k8s.io/client-go/kubernetes/fake"
30+
utiltesting "k8s.io/client-go/util/testing"
3031
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
3132
csifake "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
3233
"k8s.io/kubernetes/pkg/apis/core/helper"
3334
"k8s.io/kubernetes/pkg/features"
35+
volumetest "k8s.io/kubernetes/pkg/volume/testing"
3436
"testing"
3537
)
3638

@@ -530,10 +532,23 @@ func TestAddNodeInfoExistingAnnotation(t *testing.T) {
530532
nodeName := tc.existingNode.Name
531533
client := fake.NewSimpleClientset(tc.existingNode)
532534
csiClient := csifake.NewSimpleClientset()
533-
nim := NewNodeInfoManager(types.NodeName(nodeName), client, csiClient)
535+
536+
tmpDir, err := utiltesting.MkTmpdir("nodeinfomanager-test")
537+
if err != nil {
538+
t.Fatalf("can't create temp dir: %v", err)
539+
}
540+
host := volumetest.NewFakeVolumeHostWithCSINodeName(
541+
tmpDir,
542+
client,
543+
csiClient,
544+
nil,
545+
nodeName,
546+
)
547+
548+
nim := NewNodeInfoManager(types.NodeName(nodeName), host)
534549

535550
// Act
536-
err := nim.AddNodeInfo(driverName, nodeID, 0 /* maxVolumeLimit */, nil) // TODO test maxVolumeLimit
551+
err = nim.AddNodeInfo(driverName, nodeID, 0 /* maxVolumeLimit */, nil) // TODO test maxVolumeLimit
537552
if err != nil {
538553
t.Errorf("expected no error from AddNodeInfo call but got: %v", err)
539554
continue
@@ -573,10 +588,21 @@ func test(t *testing.T, addNodeInfo bool, csiNodeInfoEnabled bool, testcases []t
573588
} else {
574589
csiClient = csifake.NewSimpleClientset(tc.existingNodeInfo)
575590
}
576-
nim := NewNodeInfoManager(types.NodeName(nodeName), client, csiClient)
591+
592+
tmpDir, err := utiltesting.MkTmpdir("nodeinfomanager-test")
593+
if err != nil {
594+
t.Fatalf("can't create temp dir: %v", err)
595+
}
596+
host := volumetest.NewFakeVolumeHostWithCSINodeName(
597+
tmpDir,
598+
client,
599+
csiClient,
600+
nil,
601+
nodeName,
602+
)
603+
nim := NewNodeInfoManager(types.NodeName(nodeName), host)
577604

578605
//// Act
579-
var err error
580606
if addNodeInfo {
581607
err = nim.AddNodeInfo(tc.driverName, tc.inputNodeID, 0 /* maxVolumeLimit */, tc.inputTopology) // TODO test maxVolumeLimit
582608
} else {

0 commit comments

Comments
 (0)