Skip to content

Commit 36877da

Browse files
authored
Merge pull request kubernetes#68426 from verult/csi-informer-spam
Consolidated CSIDriver logic under CSIDriverRegistry flag
2 parents c0953ed + 4ca39ef commit 36877da

File tree

13 files changed

+61
-57
lines changed

13 files changed

+61
-57
lines changed

pkg/features/kube_features.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -391,16 +391,6 @@ const (
391391
//
392392
// Allow TTL controller to clean up Pods and Jobs after they finish.
393393
TTLAfterFinished utilfeature.Feature = "TTLAfterFinished"
394-
395-
// owner: @jsafrane
396-
// Kubernetes skips attaching CSI volumes that don't require attachment.
397-
//
398-
CSISkipAttach utilfeature.Feature = "CSISkipAttach"
399-
400-
// owner: @jsafrane
401-
//
402-
// Kubelet sends pod information in NodePublish CSI call when a CSI driver wants so.
403-
CSIPodInfo utilfeature.Feature = "CSIPodInfo"
404394
)
405395

406396
func init() {
@@ -467,8 +457,6 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
467457
VolumeSnapshotDataSource: {Default: false, PreRelease: utilfeature.Alpha},
468458
ProcMountType: {Default: false, PreRelease: utilfeature.Alpha},
469459
TTLAfterFinished: {Default: false, PreRelease: utilfeature.Alpha},
470-
CSISkipAttach: {Default: false, PreRelease: utilfeature.Alpha},
471-
CSIPodInfo: {Default: false, PreRelease: utilfeature.Alpha},
472460

473461
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
474462
// unintentionally on either side:

pkg/volume/csi/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ go_library(
2727
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
2828
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
2929
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
30+
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
3031
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions:go_default_library",
3132
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1:go_default_library",
3233
"//staging/src/k8s.io/csi-api/pkg/client/listers/csi/v1alpha1:go_default_library",
@@ -59,6 +60,7 @@ go_test(
5960
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
6061
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
6162
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
63+
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
6264
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
6365
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
6466
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",

pkg/volume/csi/csi_attacher_test.go

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,16 @@ import (
3030
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
3131
"k8s.io/apimachinery/pkg/runtime"
3232
"k8s.io/apimachinery/pkg/types"
33+
"k8s.io/apimachinery/pkg/util/wait"
3334
"k8s.io/apimachinery/pkg/watch"
3435
utilfeature "k8s.io/apiserver/pkg/util/feature"
36+
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
3537
clientset "k8s.io/client-go/kubernetes"
3638
fakeclient "k8s.io/client-go/kubernetes/fake"
3739
core "k8s.io/client-go/testing"
3840
utiltesting "k8s.io/client-go/util/testing"
3941
fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
42+
"k8s.io/kubernetes/pkg/features"
4043
"k8s.io/kubernetes/pkg/volume"
4144
volumetest "k8s.io/kubernetes/pkg/volume/testing"
4245
)
@@ -202,14 +205,7 @@ func TestAttacherAttach(t *testing.T) {
202205
}
203206

204207
func TestAttacherWithCSIDriver(t *testing.T) {
205-
originalFeatures := utilfeature.DefaultFeatureGate.DeepCopy()
206-
defer func() {
207-
utilfeature.DefaultFeatureGate = originalFeatures
208-
}()
209-
err := utilfeature.DefaultFeatureGate.Set("CSISkipAttach=true")
210-
if err != nil {
211-
t.Fatalf("Failed to set CSISkipAttach=true: %s", err)
212-
}
208+
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, true)()
213209

214210
tests := []struct {
215211
name string
@@ -277,14 +273,7 @@ func TestAttacherWithCSIDriver(t *testing.T) {
277273
}
278274

279275
func TestAttacherWaitForVolumeAttachmentWithCSIDriver(t *testing.T) {
280-
originalFeatures := utilfeature.DefaultFeatureGate.DeepCopy()
281-
defer func() {
282-
utilfeature.DefaultFeatureGate = originalFeatures
283-
}()
284-
err := utilfeature.DefaultFeatureGate.Set("CSISkipAttach=true")
285-
if err != nil {
286-
t.Fatalf("Failed to set CSISkipAttach=true: %s", err)
287-
}
276+
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, true)()
288277

289278
// In order to detect if the volume plugin would skip WaitForAttach for non-attachable drivers,
290279
// we do not instantiate any VolumeAttachment. So if the plugin does not skip attach, WaitForVolumeAttachment
@@ -940,11 +929,11 @@ func newTestWatchPlugin(t *testing.T, csiClient *fakecsi.Clientset) (*csiPlugin,
940929
t.Fatalf("cannot assert plugin to be type csiPlugin")
941930
}
942931

943-
for {
932+
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
944933
// Wait until the informer in CSI volume plugin has all CSIDrivers.
945-
if csiPlug.csiDriverInformer.Informer().HasSynced() {
946-
break
947-
}
934+
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
935+
return csiPlug.csiDriverInformer.Informer().HasSynced(), nil
936+
})
948937
}
949938

950939
return csiPlug, fakeWatcher, tmpDir, fakeClient

pkg/volume/csi/csi_mounter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
238238
}
239239

240240
func (c *csiMountMgr) podAttributes() (map[string]string, error) {
241-
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIPodInfo) {
241+
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
242242
return nil, nil
243243
}
244244
if c.plugin.csiDriverLister == nil {

pkg/volume/csi/csi_mounter_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
storage "k8s.io/api/storage/v1beta1"
3333
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
3434
"k8s.io/apimachinery/pkg/types"
35+
"k8s.io/apimachinery/pkg/util/wait"
3536
utilfeature "k8s.io/apiserver/pkg/util/feature"
3637
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
3738
fakeclient "k8s.io/client-go/kubernetes/fake"
@@ -95,7 +96,7 @@ func TestMounterGetPath(t *testing.T) {
9596
}
9697

9798
func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
98-
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIPodInfo, podInfoEnabled)()
99+
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIDriverRegistry, podInfoEnabled)()
99100
tests := []struct {
100101
name string
101102
driver string
@@ -154,12 +155,13 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
154155
plug, tmpDir := newTestPlugin(t, fakeClient, fakeCSIClient)
155156
defer os.RemoveAll(tmpDir)
156157

157-
for {
158+
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
158159
// Wait until the informer in CSI volume plugin has all CSIDrivers.
159-
if plug.csiDriverInformer.Informer().HasSynced() {
160-
break
161-
}
160+
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
161+
return plug.csiDriverInformer.Informer().HasSynced(), nil
162+
})
162163
}
164+
163165
pv := makeTestPV("test-pv", 10, test.driver, testVol)
164166
pv.Spec.CSI.VolumeAttributes = test.attributes
165167
pvName := pv.GetName()

pkg/volume/csi/csi_plugin.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"k8s.io/apimachinery/pkg/util/wait"
3737
utilfeature "k8s.io/apiserver/pkg/util/feature"
3838
clientset "k8s.io/client-go/kubernetes"
39+
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
3940
csiapiinformer "k8s.io/csi-api/pkg/client/informers/externalversions"
4041
csiinformer "k8s.io/csi-api/pkg/client/informers/externalversions/csi/v1alpha1"
4142
csilister "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1"
@@ -160,19 +161,32 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
160161
func (p *csiPlugin) Init(host volume.VolumeHost) error {
161162
p.host = host
162163

163-
// Initializing csiDrivers map and label management channels
164-
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
165-
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host.GetKubeClient(), host.GetCSIClient())
164+
kubeClient := host.GetKubeClient()
165+
if kubeClient == nil {
166+
return fmt.Errorf("error getting kube client")
167+
}
166168

167-
csiClient := host.GetCSIClient()
168-
if csiClient != nil {
169+
var csiClient csiclientset.Interface
170+
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) ||
171+
utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
172+
csiClient = host.GetCSIClient()
173+
if csiClient == nil {
174+
return fmt.Errorf("error getting CSI client")
175+
}
176+
}
177+
178+
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
169179
// Start informer for CSIDrivers.
170180
factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
171181
p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
172182
p.csiDriverLister = p.csiDriverInformer.Lister()
173183
go factory.Start(wait.NeverStop)
174184
}
175185

186+
// Initializing csiDrivers map and label management channels
187+
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
188+
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), kubeClient, csiClient)
189+
176190
return nil
177191
}
178192

@@ -514,7 +528,7 @@ func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapP
514528
}
515529

516530
func (p *csiPlugin) skipAttach(driver string) (bool, error) {
517-
if !utilfeature.DefaultFeatureGate.Enabled(features.CSISkipAttach) {
531+
if !utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
518532
return false, nil
519533
}
520534
if p.csiDriverLister == nil {

pkg/volume/csi/csi_plugin_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ import (
2727
"k8s.io/apimachinery/pkg/api/resource"
2828
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
2929
"k8s.io/apimachinery/pkg/types"
30+
"k8s.io/apimachinery/pkg/util/wait"
3031
utilfeature "k8s.io/apiserver/pkg/util/feature"
3132
fakeclient "k8s.io/client-go/kubernetes/fake"
3233
utiltesting "k8s.io/client-go/util/testing"
3334
fakecsi "k8s.io/csi-api/pkg/client/clientset/versioned/fake"
35+
"k8s.io/kubernetes/pkg/features"
3436
"k8s.io/kubernetes/pkg/volume"
3537
volumetest "k8s.io/kubernetes/pkg/volume/testing"
3638
)
@@ -73,11 +75,11 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset, csiClient *fakecs
7375
t.Fatalf("cannot assert plugin to be type csiPlugin")
7476
}
7577

76-
for {
78+
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
7779
// Wait until the informer in CSI volume plugin has all CSIDrivers.
78-
if csiPlug.csiDriverInformer.Informer().HasSynced() {
79-
break
80-
}
80+
wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) {
81+
return csiPlug.csiDriverInformer.Informer().HasSynced(), nil
82+
})
8183
}
8284

8385
return csiPlug, tmpDir

pkg/volume/csi/csi_util.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ import (
2828
"k8s.io/client-go/kubernetes"
2929
kstrings "k8s.io/kubernetes/pkg/util/strings"
3030
"k8s.io/kubernetes/pkg/volume"
31+
"time"
32+
)
33+
34+
const (
35+
testInformerSyncPeriod = 100 * time.Millisecond
36+
testInformerSyncTimeout = 30 * time.Second
3137
)
3238

3339
func getCredentialsFromSecret(k8s kubernetes.Interface, secretRef *api.SecretReference) (map[string]string, error) {

pkg/volume/csi/nodeinfomanager/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ go_test(
5151
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
5252
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
5353
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
54+
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
5455
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
5556
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
5657
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned/fake:go_default_library",

pkg/volume/csi/nodeinfomanager/nodeinfomanager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,10 @@ func (nim *nodeInfoManager) updateCSINodeInfo(
315315
driverNodeID string,
316316
topology *csipb.Topology) error {
317317

318+
if nim.csiKubeClient == nil {
319+
return fmt.Errorf("CSI client cannot be nil")
320+
}
321+
318322
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
319323
nodeInfo, err := nim.csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{})
320324
if nodeInfo == nil || errors.IsNotFound(err) {

0 commit comments

Comments
 (0)