Skip to content

Commit e74205b

Browse files
authored
Merge pull request kubernetes#126961 from carlory/fix-124398
kubelet: csi plugin stop watching the volumeattachment object
2 parents 3d65369 + e122875 commit e74205b

File tree

2 files changed

+13
-189
lines changed

2 files changed

+13
-189
lines changed

pkg/volume/csi/csi_attacher.go

Lines changed: 13 additions & 76 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,6 @@ import (
3232
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3333
"k8s.io/apimachinery/pkg/types"
3434
"k8s.io/apimachinery/pkg/util/wait"
35-
"k8s.io/apimachinery/pkg/watch"
3635
utilfeature "k8s.io/apiserver/pkg/util/feature"
3736
"k8s.io/client-go/kubernetes"
3837
"k8s.io/klog/v2"
@@ -54,8 +53,6 @@ type csiAttacher struct {
5453
csiClient csiClient
5554
}
5655

57-
type verifyAttachDetachStatus func(attach *storage.VolumeAttachment, volumeHandle string) (bool, error)
58-
5956
// volume.Attacher methods
6057
var _ volume.Attacher = &csiAttacher{}
6158

@@ -141,38 +138,34 @@ func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string
141138
return "", nil
142139
}
143140

144-
func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, timeout time.Duration) (string, error) {
141+
// WaitForAttach waits for the attach operation to complete and returns the device path when it is done.
142+
// But in this case, there should be no waiting. The device is found by the CSI driver later, in NodeStage / NodePublish calls.
143+
// So it should just return the device metadata, which in this case is the VolumeAttachment name. If the target VolumeAttachment does not
144+
// exist or is not attached, the function returns an error, and the caller (kubelet) should then retry it.
145+
// Watching the VolumeAttachment object serves no purpose here, so the watch has been removed. More details in https://issues.k8s.io/124398
146+
func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, _ time.Duration) (string, error) {
145147
source, err := getPVSourceFromSpec(spec)
146148
if err != nil {
147149
return "", errors.New(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err))
148150
}
149151

152+
volumeHandle := source.VolumeHandle
150153
attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(c.plugin.host.GetNodeName()))
151154

152-
return c.waitForVolumeAttachment(source.VolumeHandle, attachID, timeout)
153-
}
154-
155-
func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) {
156-
klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
157-
158-
timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
159-
defer timer.Stop()
160-
161-
return c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout)
162-
}
163-
164-
func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) {
165-
166-
klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
167155
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, metav1.GetOptions{})
168156
if err != nil {
169157
klog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
170158
return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err)
171159
}
172-
err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyAttachmentStatus)
160+
161+
successful, err := verifyAttachmentStatus(attach, volumeHandle)
173162
if err != nil {
174163
return "", err
175164
}
165+
if !successful {
166+
klog.Error(log("attacher.WaitForAttach failed for volume [%s] attached (will continue to try)", volumeHandle))
167+
return "", fmt.Errorf("volume %v is not attached for volume attachment %v", volumeHandle, attachID)
168+
}
176169
return attach.Name, nil
177170
}
178171

@@ -532,62 +525,6 @@ func (c *csiAttacher) waitForVolumeAttachDetachStatusWithLister(spec *volume.Spe
532525
}
533526
}
534527

535-
func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string,
536-
timer *time.Timer, timeout time.Duration, verifyStatus verifyAttachDetachStatus) error {
537-
successful, err := verifyStatus(attach, volumeHandle)
538-
if err != nil {
539-
return err
540-
}
541-
if successful {
542-
return nil
543-
}
544-
545-
watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
546-
if err != nil {
547-
return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
548-
}
549-
550-
ch := watcher.ResultChan()
551-
defer watcher.Stop()
552-
for {
553-
select {
554-
case event, ok := <-ch:
555-
if !ok {
556-
klog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID)
557-
return errors.New("volume attachment watch channel had been closed")
558-
}
559-
560-
switch event.Type {
561-
case watch.Added, watch.Modified:
562-
attach, _ := event.Object.(*storage.VolumeAttachment)
563-
successful, err := verifyStatus(attach, volumeHandle)
564-
if err != nil {
565-
return err
566-
}
567-
if successful {
568-
return nil
569-
}
570-
case watch.Deleted:
571-
// set attach nil to get different results
572-
// for detachment, a deleted event means successful detachment, should return success
573-
// for attachment, should return fail
574-
if successful, err := verifyStatus(nil, volumeHandle); !successful {
575-
return err
576-
}
577-
klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle))
578-
return nil
579-
580-
case watch.Error:
581-
klog.Warningf("waitForVolumeAttachDetachInternal received watch error: %v", event)
582-
}
583-
584-
case <-timer.C:
585-
klog.Error(log("attachdetacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
586-
return fmt.Errorf("attachdetachment timeout for volume %v", volumeHandle)
587-
}
588-
}
589-
}
590-
591528
func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
592529
klog.V(4).Info(log("attacher.UnmountDevice(%s)", deviceMountPath))
593530

pkg/volume/csi/csi_attacher_test.go

Lines changed: 0 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -649,119 +649,6 @@ func TestAttacherWaitForAttachWithInline(t *testing.T) {
649649
}
650650
}
651651

652-
func TestAttacherWaitForVolumeAttachment(t *testing.T) {
653-
nodeName := "fakeNode"
654-
testCases := []struct {
655-
name string
656-
initAttached bool
657-
finalAttached bool
658-
trigerWatchEventTime time.Duration
659-
initAttachErr *storage.VolumeError
660-
finalAttachErr *storage.VolumeError
661-
timeout time.Duration
662-
shouldFail bool
663-
watchTimeout time.Duration
664-
}{
665-
{
666-
name: "attach success at get",
667-
initAttached: true,
668-
timeout: 50 * time.Millisecond,
669-
shouldFail: false,
670-
},
671-
{
672-
name: "attachment error ant get",
673-
initAttachErr: &storage.VolumeError{Message: "missing volume"},
674-
timeout: 30 * time.Millisecond,
675-
shouldFail: true,
676-
},
677-
{
678-
name: "attach success at watch",
679-
initAttached: false,
680-
finalAttached: true,
681-
trigerWatchEventTime: 5 * time.Millisecond,
682-
timeout: 50 * time.Millisecond,
683-
shouldFail: false,
684-
},
685-
{
686-
name: "attachment error ant watch",
687-
initAttached: false,
688-
finalAttached: false,
689-
finalAttachErr: &storage.VolumeError{Message: "missing volume"},
690-
trigerWatchEventTime: 5 * time.Millisecond,
691-
timeout: 30 * time.Millisecond,
692-
shouldFail: true,
693-
},
694-
{
695-
name: "time ran out",
696-
initAttached: false,
697-
finalAttached: true,
698-
trigerWatchEventTime: 100 * time.Millisecond,
699-
timeout: 50 * time.Millisecond,
700-
shouldFail: true,
701-
},
702-
}
703-
704-
for i, tc := range testCases {
705-
t.Run(tc.name, func(t *testing.T) {
706-
fakeClient := fakeclient.NewSimpleClientset()
707-
plug, tmpDir := newTestPlugin(t, fakeClient)
708-
defer os.RemoveAll(tmpDir)
709-
710-
fakeWatcher := watch.NewRaceFreeFake()
711-
fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil))
712-
713-
attacher, err := plug.NewAttacher()
714-
if err != nil {
715-
t.Fatalf("failed to create new attacher: %v", err)
716-
}
717-
csiAttacher := getCsiAttacherFromVolumeAttacher(attacher, tc.watchTimeout)
718-
719-
t.Logf("running test: %v", tc.name)
720-
pvName := fmt.Sprintf("test-pv-%d", i)
721-
volID := fmt.Sprintf("test-vol-%d", i)
722-
attachID := getAttachmentName(volID, testDriver, nodeName)
723-
attachment := makeTestAttachment(attachID, nodeName, pvName)
724-
attachment.Status.Attached = tc.initAttached
725-
attachment.Status.AttachError = tc.initAttachErr
726-
_, err = csiAttacher.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
727-
if err != nil {
728-
t.Fatalf("failed to attach: %v", err)
729-
}
730-
731-
trigerWatchEventTime := tc.trigerWatchEventTime
732-
finalAttached := tc.finalAttached
733-
finalAttachErr := tc.finalAttachErr
734-
var wg sync.WaitGroup
735-
// after timeout, fakeWatcher will be closed by csiAttacher.waitForVolumeAttachment
736-
if tc.trigerWatchEventTime > 0 && tc.trigerWatchEventTime < tc.timeout {
737-
wg.Add(1)
738-
go func() {
739-
defer wg.Done()
740-
time.Sleep(trigerWatchEventTime)
741-
attachment := makeTestAttachment(attachID, nodeName, pvName)
742-
attachment.Status.Attached = finalAttached
743-
attachment.Status.AttachError = finalAttachErr
744-
fakeWatcher.Modify(attachment)
745-
}()
746-
}
747-
748-
retID, err := csiAttacher.waitForVolumeAttachment(volID, attachID, tc.timeout)
749-
if tc.shouldFail && err == nil {
750-
t.Error("expecting failure, but err is nil")
751-
}
752-
if tc.initAttachErr != nil && err != nil {
753-
if tc.initAttachErr.Message != err.Error() {
754-
t.Errorf("expecting error [%v], got [%v]", tc.initAttachErr.Message, err.Error())
755-
}
756-
}
757-
if err == nil && retID != attachID {
758-
t.Errorf("attacher.WaitForAttach not returning attachment ID")
759-
}
760-
wg.Wait()
761-
})
762-
}
763-
}
764-
765652
func TestAttacherVolumesAreAttached(t *testing.T) {
766653
type attachedSpec struct {
767654
volName string

0 commit comments

Comments
 (0)