Skip to content

Commit e122875

Browse files
committed
csi volume plugin stop watching the volumeattachment object if the object is not found or volume is not attached when kubelet wait for volume attached
1 parent 619b005 commit e122875

File tree

2 files changed

+13
-189
lines changed

2 files changed

+13
-189
lines changed

pkg/volume/csi/csi_attacher.go

Lines changed: 13 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3333
"k8s.io/apimachinery/pkg/types"
3434
"k8s.io/apimachinery/pkg/util/wait"
35-
"k8s.io/apimachinery/pkg/watch"
3635
utilfeature "k8s.io/apiserver/pkg/util/feature"
3736
"k8s.io/client-go/kubernetes"
3837
"k8s.io/klog/v2"
@@ -54,8 +53,6 @@ type csiAttacher struct {
5453
csiClient csiClient
5554
}
5655

57-
type verifyAttachDetachStatus func(attach *storage.VolumeAttachment, volumeHandle string) (bool, error)
58-
5956
// volume.Attacher methods
6057
var _ volume.Attacher = &csiAttacher{}
6158

@@ -141,38 +138,34 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
141138
return "", nil
142139
}
143140

144-
func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, timeout time.Duration) (string, error) {
141+
// WaitForAttach waits for the attach operation to complete and returns the device path when it is done.
142+
// But in this case, there should be no waiting. The device is found by the CSI driver later, in NodeStage / NodePublish calls.
143+
// so it should just return device metadata, in this case it is VolumeAttachment name. If the target VolumeAttachment does not
144+
// exist or is not attached, the function will return an error. And then the caller (kubelet) should retry it.
145+
// We can get rid of watching it that serves no purpose. More details in https://issues.k8s.io/124398
146+
func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, _ time.Duration) (string, error) {
145147
source, err := getPVSourceFromSpec(spec)
146148
if err != nil {
147149
return "", errors.New(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err))
148150
}
149151

152+
volumeHandle := source.VolumeHandle
150153
attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(c.plugin.host.GetNodeName()))
151154

152-
return c.waitForVolumeAttachment(source.VolumeHandle, attachID, timeout)
153-
}
154-
155-
func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) {
156-
klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
157-
158-
timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
159-
defer timer.Stop()
160-
161-
return c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout)
162-
}
163-
164-
func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) {
165-
166-
klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
167155
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, metav1.GetOptions{})
168156
if err != nil {
169157
klog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
170158
return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err)
171159
}
172-
err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyAttachmentStatus)
160+
161+
successful, err := verifyAttachmentStatus(attach, volumeHandle)
173162
if err != nil {
174163
return "", err
175164
}
165+
if !successful {
166+
klog.Error(log("attacher.WaitForAttach failed for volume [%s] attached (will continue to try)", volumeHandle))
167+
return "", fmt.Errorf("volume %v is not attached for volume attachment %v", volumeHandle, attachID)
168+
}
176169
return attach.Name, nil
177170
}
178171

@@ -532,62 +525,6 @@ func (c *csiAttacher) waitForVolumeAttachDetachStatusWithLister(spec *volume.Spe
532525
}
533526
}
534527

535-
func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string,
536-
timer *time.Timer, timeout time.Duration, verifyStatus verifyAttachDetachStatus) error {
537-
successful, err := verifyStatus(attach, volumeHandle)
538-
if err != nil {
539-
return err
540-
}
541-
if successful {
542-
return nil
543-
}
544-
545-
watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
546-
if err != nil {
547-
return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
548-
}
549-
550-
ch := watcher.ResultChan()
551-
defer watcher.Stop()
552-
for {
553-
select {
554-
case event, ok := <-ch:
555-
if !ok {
556-
klog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID)
557-
return errors.New("volume attachment watch channel had been closed")
558-
}
559-
560-
switch event.Type {
561-
case watch.Added, watch.Modified:
562-
attach, _ := event.Object.(*storage.VolumeAttachment)
563-
successful, err := verifyStatus(attach, volumeHandle)
564-
if err != nil {
565-
return err
566-
}
567-
if successful {
568-
return nil
569-
}
570-
case watch.Deleted:
571-
// set attach nil to get different results
572-
// for detachment, a deleted event means successful detachment, should return success
573-
// for attachment, should return fail
574-
if successful, err := verifyStatus(nil, volumeHandle); !successful {
575-
return err
576-
}
577-
klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle))
578-
return nil
579-
580-
case watch.Error:
581-
klog.Warningf("waitForVolumeAttachDetachInternal received watch error: %v", event)
582-
}
583-
584-
case <-timer.C:
585-
klog.Error(log("attachdetacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
586-
return fmt.Errorf("attachdetachment timeout for volume %v", volumeHandle)
587-
}
588-
}
589-
}
590-
591528
func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
592529
klog.V(4).Info(log("attacher.UnmountDevice(%s)", deviceMountPath))
593530

pkg/volume/csi/csi_attacher_test.go

Lines changed: 0 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -649,119 +649,6 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) {
649649
}
650650
}
651651

652-
func TestAttacherWaitForVolumeAttachment(t *testing.T) {
653-
nodeName := "fakeNode"
654-
testCases := []struct {
655-
name string
656-
initAttached bool
657-
finalAttached bool
658-
trigerWatchEventTime time.Duration
659-
initAttachErr *storage.VolumeError
660-
finalAttachErr *storage.VolumeError
661-
timeout time.Duration
662-
shouldFail bool
663-
watchTimeout time.Duration
664-
}{
665-
{
666-
name: "attach success at get",
667-
initAttached: true,
668-
timeout: 50 * time.Millisecond,
669-
shouldFail: false,
670-
},
671-
{
672-
name: "attachment error ant get",
673-
initAttachErr: &storage.VolumeError{Message: "missing volume"},
674-
timeout: 30 * time.Millisecond,
675-
shouldFail: true,
676-
},
677-
{
678-
name: "attach success at watch",
679-
initAttached: false,
680-
finalAttached: true,
681-
trigerWatchEventTime: 5 * time.Millisecond,
682-
timeout: 50 * time.Millisecond,
683-
shouldFail: false,
684-
},
685-
{
686-
name: "attachment error ant watch",
687-
initAttached: false,
688-
finalAttached: false,
689-
finalAttachErr: &storage.VolumeError{Message: "missing volume"},
690-
trigerWatchEventTime: 5 * time.Millisecond,
691-
timeout: 30 * time.Millisecond,
692-
shouldFail: true,
693-
},
694-
{
695-
name: "time ran out",
696-
initAttached: false,
697-
finalAttached: true,
698-
trigerWatchEventTime: 100 * time.Millisecond,
699-
timeout: 50 * time.Millisecond,
700-
shouldFail: true,
701-
},
702-
}
703-
704-
for i, tc := range testCases {
705-
t.Run(tc.name, func(t *testing.T) {
706-
fakeClient := fakeclient.NewSimpleClientset()
707-
plug, tmpDir := newTestPlugin(t, fakeClient)
708-
defer os.RemoveAll(tmpDir)
709-
710-
fakeWatcher := watch.NewRaceFreeFake()
711-
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
712-
713-
attacher, err := plug.NewAttacher()
714-
if err != nil {
715-
t.Fatalf("failed to create new attacher: %v", err)
716-
}
717-
csiAttacher := getCsiAttacherFromVolumeAttacher(attacher, tc.watchTimeout)
718-
719-
t.Logf("running test: %v", tc.name)
720-
pvName := fmt.Sprintf("test-pv-%d", i)
721-
volID := fmt.Sprintf("test-vol-%d", i)
722-
attachID := getAttachmentName(volID, testDriver, nodeName)
723-
attachment := makeTestAttachment(attachID, nodeName, pvName)
724-
attachment.Status.Attached = tc.initAttached
725-
attachment.Status.AttachError = tc.initAttachErr
726-
_, err = csiAttacher.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
727-
if err != nil {
728-
t.Fatalf("failed to attach: %v", err)
729-
}
730-
731-
trigerWatchEventTime := tc.trigerWatchEventTime
732-
finalAttached := tc.finalAttached
733-
finalAttachErr := tc.finalAttachErr
734-
var wg sync.WaitGroup
735-
// after timeout, fakeWatcher will be closed by csiAttacher.waitForVolumeAttachment
736-
if tc.trigerWatchEventTime > 0 && tc.trigerWatchEventTime < tc.timeout {
737-
wg.Add(1)
738-
go func() {
739-
defer wg.Done()
740-
time.Sleep(trigerWatchEventTime)
741-
attachment := makeTestAttachment(attachID, nodeName, pvName)
742-
attachment.Status.Attached = finalAttached
743-
attachment.Status.AttachError = finalAttachErr
744-
fakeWatcher.Modify(attachment)
745-
}()
746-
}
747-
748-
retID, err := csiAttacher.waitForVolumeAttachment(volID, attachID, tc.timeout)
749-
if tc.shouldFail && err == nil {
750-
t.Error("expecting failure, but err is nil")
751-
}
752-
if tc.initAttachErr != nil && err != nil {
753-
if tc.initAttachErr.Message != err.Error() {
754-
t.Errorf("expecting error [%v], got [%v]", tc.initAttachErr.Message, err.Error())
755-
}
756-
}
757-
if err == nil && retID != attachID {
758-
t.Errorf("attacher.WaitForAttach not returning attachment ID")
759-
}
760-
wg.Wait()
761-
})
762-
}
763-
}
764-
765652
func TestAttacherVolumesAreAttached(t *testing.T) {
766653
type attachedSpec struct {
767654
volName string

0 commit comments

Comments
 (0)