Skip to content

Commit 98f250f

Browse files
authored
Merge pull request kubernetes#91307 from yuga711/attach
CSI: Modify VolumeAttachment check to use Informer/Cache
2 parents 0535c11 + 2a89577 commit 98f250f

File tree

14 files changed

+129
-43
lines changed

14 files changed

+129
-43
lines changed

cmd/kube-controller-manager/app/core.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ func startAttachDetachController(ctx ControllerContext) (http.Handler, bool, err
332332
ctx.InformerFactory.Core().V1().PersistentVolumes(),
333333
csiNodeInformer,
334334
csiDriverInformer,
335+
ctx.InformerFactory.Storage().V1().VolumeAttachments(),
335336
ctx.Cloud,
336337
plugins,
337338
GetDynamicPluginProber(ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),

pkg/controller/volume/attachdetach/attach_detach_controller.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func NewAttachDetachController(
111111
pvInformer coreinformers.PersistentVolumeInformer,
112112
csiNodeInformer storageinformersv1.CSINodeInformer,
113113
csiDriverInformer storageinformersv1.CSIDriverInformer,
114+
volumeAttachmentInformer storageinformersv1.VolumeAttachmentInformer,
114115
cloud cloudprovider.Interface,
115116
plugins []volume.VolumePlugin,
116117
prober volume.DynamicPluginProber,
@@ -142,6 +143,9 @@ func NewAttachDetachController(
142143
adc.csiDriverLister = csiDriverInformer.Lister()
143144
adc.csiDriversSynced = csiDriverInformer.Informer().HasSynced
144145

146+
adc.volumeAttachmentLister = volumeAttachmentInformer.Lister()
147+
adc.volumeAttachmentSynced = volumeAttachmentInformer.Informer().HasSynced
148+
145149
if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
146150
return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
147151
}
@@ -254,6 +258,12 @@ type attachDetachController struct {
254258
csiDriverLister storagelistersv1.CSIDriverLister
255259
csiDriversSynced kcache.InformerSynced
256260

261+
// volumeAttachmentLister is the shared volumeAttachment lister used to fetch and store
262+
// VolumeAttachment objects from the API server. It is shared with other controllers
263+
// and therefore the VolumeAttachment objects in its store should be treated as immutable.
264+
volumeAttachmentLister storagelistersv1.VolumeAttachmentLister
265+
volumeAttachmentSynced kcache.InformerSynced
266+
257267
// cloud provider used by volume host
258268
cloud cloudprovider.Interface
259269

@@ -319,6 +329,9 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
319329
if adc.csiDriversSynced != nil {
320330
synced = append(synced, adc.csiDriversSynced)
321331
}
332+
if adc.volumeAttachmentSynced != nil {
333+
synced = append(synced, adc.volumeAttachmentSynced)
334+
}
322335

323336
if !kcache.WaitForNamedCacheSync("attach detach", stopCh, synced...) {
324337
return
@@ -675,6 +688,10 @@ func (adc *attachDetachController) IsAttachDetachController() bool {
675688
return true
676689
}
677690

691+
func (adc *attachDetachController) VolumeAttachmentLister() storagelistersv1.VolumeAttachmentLister {
692+
return adc.volumeAttachmentLister
693+
}
694+
678695
// VolumeHost implementation
679696
// This is an unfortunate requirement of the current factoring of volume plugin
680697
// initializing code. It requires kubelet specific methods used by the mounting

pkg/controller/volume/attachdetach/attach_detach_controller_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
4848
informerFactory.Core().V1().PersistentVolumes(),
4949
informerFactory.Storage().V1().CSINodes(),
5050
informerFactory.Storage().V1().CSIDrivers(),
51+
informerFactory.Storage().V1().VolumeAttachments(),
5152
nil, /* cloud */
5253
nil, /* plugins */
5354
nil, /* prober */
@@ -168,6 +169,7 @@ func attachDetachRecoveryTestCase(t *testing.T, extraPods1 []*v1.Pod, extraPods2
168169
informerFactory.Core().V1().PersistentVolumes(),
169170
informerFactory.Storage().V1().CSINodes(),
170171
informerFactory.Storage().V1().CSIDrivers(),
172+
informerFactory.Storage().V1().VolumeAttachments(),
171173
nil, /* cloud */
172174
plugins,
173175
prober,

pkg/volume/csi/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,10 @@ go_test(
7979
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
8080
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
8181
"//staging/src/k8s.io/client-go/informers:go_default_library",
82+
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
8283
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
8384
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
85+
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
8486
"//staging/src/k8s.io/client-go/testing:go_default_library",
8587
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
8688
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",

pkg/volume/csi/csi_attacher.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,19 @@ func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.No
197197
}
198198

199199
attachID := getAttachmentName(volumeHandle, driverName, string(nodeName))
200+
var attach *storage.VolumeAttachment
201+
if c.plugin.volumeAttachmentLister != nil {
202+
attach, err = c.plugin.volumeAttachmentLister.Get(attachID)
203+
if err == nil {
204+
attached[spec] = attach.Status.Attached
205+
continue
206+
}
207+
klog.V(4).Info(log("attacher.VolumesAreAttached failed in AttachmentLister for attach.ID=%v: %v. Probing the API server.", attachID, err))
208+
}
209+
// The cache lookup is not setup or the object is not found in the cache.
210+
// Get the object from the API server.
200211
klog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
201-
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
212+
attach, err = c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
202213
if err != nil {
203214
attached[spec] = false
204215
klog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err))

pkg/volume/csi/csi_attacher_test.go

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@ import (
3838
"k8s.io/apimachinery/pkg/watch"
3939
utilfeature "k8s.io/apiserver/pkg/util/feature"
4040
"k8s.io/client-go/informers"
41+
storageinformer "k8s.io/client-go/informers/storage/v1"
4142
clientset "k8s.io/client-go/kubernetes"
4243
fakeclient "k8s.io/client-go/kubernetes/fake"
44+
storagelister "k8s.io/client-go/listers/storage/v1"
4345
core "k8s.io/client-go/testing"
4446
utiltesting "k8s.io/client-go/util/testing"
4547
featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -106,7 +108,9 @@ func markVolumeAttached(t *testing.T, client clientset.Interface, watch *watch.R
106108
if err != nil {
107109
t.Error(err)
108110
}
109-
watch.Modify(attach)
111+
if watch != nil {
112+
watch.Modify(attach)
113+
}
110114
}
111115
}
112116

@@ -197,7 +201,7 @@ func TestAttacherAttach(t *testing.T) {
197201
for _, tc := range testCases {
198202
t.Run(tc.name, func(t *testing.T) {
199203
t.Logf("test case: %s", tc.name)
200-
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
204+
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
201205
defer os.RemoveAll(tmpDir)
202206

203207
attacher, err := plug.NewAttacher()
@@ -281,7 +285,7 @@ func TestAttacherAttachWithInline(t *testing.T) {
281285
for _, tc := range testCases {
282286
t.Run(tc.name, func(t *testing.T) {
283287
t.Logf("test case: %s", tc.name)
284-
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
288+
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
285289
defer os.RemoveAll(tmpDir)
286290

287291
attacher, err := plug.NewAttacher()
@@ -349,7 +353,7 @@ func TestAttacherWithCSIDriver(t *testing.T) {
349353
getTestCSIDriver("attachable", nil, &bTrue, nil),
350354
getTestCSIDriver("nil", nil, nil, nil),
351355
)
352-
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, fakeClient)
356+
plug, _, tmpDir, _ := newTestWatchPlugin(t, fakeClient, true)
353357
defer os.RemoveAll(tmpDir)
354358

355359
attacher, err := plug.NewAttacher()
@@ -390,7 +394,7 @@ func TestAttacherWithCSIDriver(t *testing.T) {
390394
status := storage.VolumeAttachmentStatus{
391395
Attached: true,
392396
}
393-
markVolumeAttached(t, csiAttacher.k8s, fakeWatcher, expectedAttachID, status)
397+
markVolumeAttached(t, csiAttacher.k8s, nil, expectedAttachID, status)
394398
}
395399
wg.Wait()
396400
})
@@ -515,7 +519,7 @@ func TestAttacherWaitForAttach(t *testing.T) {
515519

516520
for _, test := range tests {
517521
t.Run(test.name, func(t *testing.T) {
518-
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
522+
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true)
519523
defer os.RemoveAll(tmpDir)
520524

521525
attacher, err := plug.NewAttacher()
@@ -597,7 +601,7 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) {
597601

598602
for _, test := range tests {
599603
t.Run(test.name, func(t *testing.T) {
600-
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
604+
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true)
601605
defer os.RemoveAll(tmpDir)
602606

603607
attacher, err := plug.NewAttacher()
@@ -684,7 +688,7 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) {
684688

685689
for i, tc := range testCases {
686690
t.Run(tc.name, func(t *testing.T) {
687-
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
691+
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
688692
defer os.RemoveAll(tmpDir)
689693

690694
attacher, err := plug.NewAttacher()
@@ -941,7 +945,7 @@ func TestAttacherDetach(t *testing.T) {
941945
for _, tc := range testCases {
942946
t.Run(tc.name, func(t *testing.T) {
943947
t.Logf("running test: %v", tc.name)
944-
plug, fakeWatcher, tmpDir, client := newTestWatchPlugin(t, nil)
948+
plug, fakeWatcher, tmpDir, client := newTestWatchPlugin(t, nil, false)
945949
defer os.RemoveAll(tmpDir)
946950
if tc.reactor != nil {
947951
client.PrependReactor("*", "*", tc.reactor)
@@ -998,7 +1002,7 @@ func TestAttacherDetach(t *testing.T) {
9981002
func TestAttacherGetDeviceMountPath(t *testing.T) {
9991003
// Setup
10001004
// Create a new attacher
1001-
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
1005+
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true)
10021006
defer os.RemoveAll(tmpDir)
10031007
attacher, err0 := plug.NewAttacher()
10041008
if err0 != nil {
@@ -1163,7 +1167,7 @@ func TestAttacherMountDevice(t *testing.T) {
11631167

11641168
// Setup
11651169
// Create a new attacher
1166-
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
1170+
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
11671171
defer os.RemoveAll(tmpDir)
11681172
attacher, err0 := plug.NewAttacher()
11691173
if err0 != nil {
@@ -1314,7 +1318,7 @@ func TestAttacherMountDeviceWithInline(t *testing.T) {
13141318

13151319
// Setup
13161320
// Create a new attacher
1317-
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil)
1321+
plug, fakeWatcher, tmpDir, _ := newTestWatchPlugin(t, nil, false)
13181322
defer os.RemoveAll(tmpDir)
13191323
attacher, err0 := plug.NewAttacher()
13201324
if err0 != nil {
@@ -1442,7 +1446,7 @@ func TestAttacherUnmountDevice(t *testing.T) {
14421446
t.Logf("Running test case: %s", tc.testName)
14431447
// Setup
14441448
// Create a new attacher
1445-
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil)
1449+
plug, _, tmpDir, _ := newTestWatchPlugin(t, nil, true)
14461450
defer os.RemoveAll(tmpDir)
14471451
attacher, err0 := plug.NewAttacher()
14481452
if err0 != nil {
@@ -1529,7 +1533,7 @@ func TestAttacherUnmountDevice(t *testing.T) {
15291533
}
15301534

15311535
// create a plugin mgr to load plugins and setup a fake client
1532-
func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) {
1536+
func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset, setupInformer bool) (*csiPlugin, *watch.RaceFreeFakeWatcher, string, *fakeclient.Clientset) {
15331537
tmpDir, err := utiltesting.MkTmpdir("csi-test")
15341538
if err != nil {
15351539
t.Fatalf("can't create temp dir: %v", err)
@@ -1545,12 +1549,24 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
15451549
Spec: v1.NodeSpec{},
15461550
})
15471551
fakeWatcher := watch.NewRaceFreeFake()
1548-
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
1552+
if !setupInformer {
1553+
// TODO: In the fakeClient, if default watchReactor is overwritten, the volumeAttachmentInformer
1554+
// and the csiAttacher.Attach both endup reading from same channel causing hang in Attach().
1555+
// So, until this is fixed, we don't overwrite default reactor while setting up volumeAttachment informer.
1556+
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
1557+
}
15491558

15501559
// Start informer for CSIDrivers.
15511560
factory := informers.NewSharedInformerFactory(fakeClient, CsiResyncPeriod)
15521561
csiDriverInformer := factory.Storage().V1().CSIDrivers()
15531562
csiDriverLister := csiDriverInformer.Lister()
1563+
var volumeAttachmentInformer storageinformer.VolumeAttachmentInformer
1564+
var volumeAttachmentLister storagelister.VolumeAttachmentLister
1565+
if setupInformer {
1566+
volumeAttachmentInformer = factory.Storage().V1().VolumeAttachments()
1567+
volumeAttachmentLister = volumeAttachmentInformer.Lister()
1568+
}
1569+
15541570
factory.Start(wait.NeverStop)
15551571

15561572
host := volumetest.NewFakeVolumeHostWithCSINodeName(t,
@@ -1559,6 +1575,7 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
15591575
ProbeVolumePlugins(),
15601576
"fakeNode",
15611577
csiDriverLister,
1578+
volumeAttachmentLister,
15621579
)
15631580
plugMgr := host.GetPluginMgr()
15641581

@@ -1577,5 +1594,10 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu
15771594
return csiDriverInformer.Informer().HasSynced(), nil
15781595
})
15791596

1597+
if volumeAttachmentInformer != nil {
1598+
wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) {
1599+
return volumeAttachmentInformer.Informer().HasSynced(), nil
1600+
})
1601+
}
15801602
return csiPlug, fakeWatcher, tmpDir, fakeClient
15811603
}

pkg/volume/csi/csi_plugin.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,10 @@ const (
5959
)
6060

6161
type csiPlugin struct {
62-
host volume.VolumeHost
63-
blockEnabled bool
64-
csiDriverLister storagelisters.CSIDriverLister
62+
host volume.VolumeHost
63+
blockEnabled bool
64+
csiDriverLister storagelisters.CSIDriverLister
65+
volumeAttachmentLister storagelisters.VolumeAttachmentLister
6566
}
6667

6768
// ProbeVolumePlugins returns implemented plugins
@@ -186,20 +187,26 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
186187
if csiClient == nil {
187188
klog.Warning(log("kubeclient not set, assuming standalone kubelet"))
188189
} else {
189-
// set CSIDriverLister
190+
// set CSIDriverLister and volumeAttachmentLister
190191
adcHost, ok := host.(volume.AttachDetachVolumeHost)
191192
if ok {
192193
p.csiDriverLister = adcHost.CSIDriverLister()
193194
if p.csiDriverLister == nil {
194195
klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost"))
195196
}
197+
p.volumeAttachmentLister = adcHost.VolumeAttachmentLister()
198+
if p.volumeAttachmentLister == nil {
199+
klog.Error(log("VolumeAttachmentLister not found on AttachDetachVolumeHost"))
200+
}
196201
}
197202
kletHost, ok := host.(volume.KubeletVolumeHost)
198203
if ok {
199204
p.csiDriverLister = kletHost.CSIDriverLister()
200205
if p.csiDriverLister == nil {
201206
klog.Error(log("CSIDriverLister not found on KubeletVolumeHost"))
202207
}
208+
// We don't run the volumeAttachmentLister in the kubelet context
209+
p.volumeAttachmentLister = nil
203210
}
204211
}
205212

pkg/volume/csi/csi_plugin_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
6262
factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod)
6363
csiDriverInformer := factory.Storage().V1().CSIDrivers()
6464
csiDriverLister := csiDriverInformer.Lister()
65+
volumeAttachmentInformer := factory.Storage().V1().VolumeAttachments()
66+
volumeAttachmentLister := volumeAttachmentInformer.Lister()
6567
go factory.Start(wait.NeverStop)
6668

6769
host := volumetest.NewFakeVolumeHostWithCSINodeName(t,
@@ -70,6 +72,7 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
7072
ProbeVolumePlugins(),
7173
"fakeNode",
7274
csiDriverLister,
75+
volumeAttachmentLister,
7376
)
7477

7578
pluginMgr := host.GetPluginMgr()
@@ -88,6 +91,9 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri
8891
return csiDriverInformer.Informer().HasSynced(), nil
8992
})
9093

94+
wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) {
95+
return volumeAttachmentInformer.Informer().HasSynced(), nil
96+
})
9197
return csiPlug, tmpDir
9298
}
9399

@@ -1018,6 +1024,7 @@ func TestPluginFindAttachablePlugin(t *testing.T) {
10181024
ProbeVolumePlugins(),
10191025
"fakeNode",
10201026
factory.Storage().V1().CSIDrivers().Lister(),
1027+
factory.Storage().V1().VolumeAttachments().Lister(),
10211028
)
10221029

10231030
plugMgr := host.GetPluginMgr()
@@ -1137,7 +1144,7 @@ func TestPluginFindDeviceMountablePluginBySpec(t *testing.T) {
11371144
Spec: v1.NodeSpec{},
11381145
},
11391146
)
1140-
host := volumetest.NewFakeVolumeHostWithCSINodeName(t, tmpDir, client, ProbeVolumePlugins(), "fakeNode", nil)
1147+
host := volumetest.NewFakeVolumeHostWithCSINodeName(t, tmpDir, client, ProbeVolumePlugins(), "fakeNode", nil, nil)
11411148
plugMgr := host.GetPluginMgr()
11421149
plug, err := plugMgr.FindDeviceMountablePluginBySpec(test.spec)
11431150
if err != nil && !test.shouldFail {

pkg/volume/csi/csi_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ func TestCSI_VolumeAll(t *testing.T) {
250250

251251
factory := informers.NewSharedInformerFactory(client, time.Hour /* disable resync */)
252252
csiDriverInformer := factory.Storage().V1().CSIDrivers()
253+
volumeAttachmentInformer := factory.Storage().V1().VolumeAttachments()
253254
if driverInfo != nil {
254255
csiDriverInformer.Informer().GetStore().Add(driverInfo)
255256
}
@@ -261,6 +262,7 @@ func TestCSI_VolumeAll(t *testing.T) {
261262
ProbeVolumePlugins(),
262263
"fakeNode",
263264
csiDriverInformer.Lister(),
265+
volumeAttachmentInformer.Lister(),
264266
)
265267
plugMgr := host.GetPluginMgr()
266268
csiClient := setupClient(t, true)

0 commit comments

Comments
 (0)