Skip to content

Commit 170162b

Browse files
committed
Use watch instead of poll to check PVC/VolumeSnapshot deletion
1 parent cae0b46 commit 170162b

File tree

3 files changed

+248
-2
lines changed

3 files changed

+248
-2
lines changed

pkg/csi/service/common/common_controller_helper.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ import (
2727
snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned"
2828
apierrors "k8s.io/apimachinery/pkg/api/errors"
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/apimachinery/pkg/fields"
3031
"k8s.io/apimachinery/pkg/util/wait"
32+
"k8s.io/apimachinery/pkg/watch"
3133
clientset "k8s.io/client-go/kubernetes"
3234

3335
"github.com/container-storage-interface/spec/lib/go/csi"
@@ -448,3 +450,103 @@ func WaitForVolumeSnapshotDeleted(ctx context.Context, client snapshotterClientS
448450

449451
return nil
450452
}
453+
454+
// WaitForVolumeSnapshotDeletedWithWatch waits for a VolumeSnapshot to get deleted using Watch API instead of polling.
455+
func WaitForVolumeSnapshotDeletedWithWatch(ctx context.Context, client snapshotterClientSet.Interface,
456+
snapshotName string, namespace string, timeout time.Duration) error {
457+
log := logger.GetLogger(ctx)
458+
timeoutSeconds := int64(timeout.Seconds())
459+
460+
log.Infof("Waiting up to %d seconds for VolumeSnapshot %s in namespace %s to be deleted using Watch API",
461+
timeoutSeconds, snapshotName, namespace)
462+
463+
// First check if VolumeSnapshot already exists
464+
_, err := client.SnapshotV1().VolumeSnapshots(namespace).Get(ctx, snapshotName, metav1.GetOptions{})
465+
if err != nil {
466+
if apierrors.IsNotFound(err) {
467+
log.Infof("VolumeSnapshot %s/%s is already deleted", namespace, snapshotName)
468+
return nil
469+
}
470+
return fmt.Errorf("unable to fetch VolumeSnapshot %s/%s with err: %+v", namespace, snapshotName, err)
471+
}
472+
473+
// Set up watch for the specific VolumeSnapshot
474+
watchSnapshot, err := client.SnapshotV1().VolumeSnapshots(namespace).Watch(
475+
ctx,
476+
metav1.ListOptions{
477+
FieldSelector: fields.OneTermEqualSelector("metadata.name", snapshotName).String(),
478+
TimeoutSeconds: &timeoutSeconds,
479+
Watch: true,
480+
})
481+
if err != nil {
482+
return fmt.Errorf("failed to watch VolumeSnapshot %s in namespace %s with Error: %+v",
483+
snapshotName, namespace, err)
484+
}
485+
defer watchSnapshot.Stop()
486+
487+
// Watch for deletion events
488+
for event := range watchSnapshot.ResultChan() {
489+
switch event.Type {
490+
case watch.Deleted:
491+
log.Infof("VolumeSnapshot %s/%s was deleted", namespace, snapshotName)
492+
return nil
493+
case watch.Error:
494+
return fmt.Errorf("error watching VolumeSnapshot %s/%s: %+v", namespace, snapshotName, event.Object)
495+
}
496+
// Continue watching for other event types (Added, Modified)
497+
}
498+
499+
// If we reach here, the watch timed out
500+
return fmt.Errorf("volumeSnapshot %s/%s was not deleted within %d seconds",
501+
namespace, snapshotName, timeoutSeconds)
502+
}
503+
504+
// WaitForPVCDeletedWithWatch waits for a PVC to get deleted using Watch API instead of polling.
505+
func WaitForPVCDeletedWithWatch(ctx context.Context, client clientset.Interface, pvcName string,
506+
namespace string, timeout time.Duration) error {
507+
log := logger.GetLogger(ctx)
508+
timeoutSeconds := int64(timeout.Seconds())
509+
510+
log.Infof("Waiting up to %d seconds for PersistentVolumeClaim %s in namespace %s to be deleted using Watch API",
511+
timeoutSeconds, pvcName, namespace)
512+
513+
// First check if PVC already exists
514+
_, err := client.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{})
515+
if err != nil {
516+
if apierrors.IsNotFound(err) {
517+
log.Infof("PersistentVolumeClaim %s/%s is already deleted", namespace, pvcName)
518+
return nil
519+
}
520+
return fmt.Errorf("unable to fetch PersistentVolumeClaim %s/%s with err: %+v", namespace, pvcName, err)
521+
}
522+
523+
// Set up watch for the specific PVC
524+
watchPVC, err := client.CoreV1().PersistentVolumeClaims(namespace).Watch(
525+
ctx,
526+
metav1.ListOptions{
527+
FieldSelector: fields.OneTermEqualSelector("metadata.name", pvcName).String(),
528+
TimeoutSeconds: &timeoutSeconds,
529+
Watch: true,
530+
})
531+
if err != nil {
532+
return fmt.Errorf("failed to watch PersistentVolumeClaim %s in namespace %s with Error: %+v",
533+
pvcName, namespace, err)
534+
}
535+
defer watchPVC.Stop()
536+
537+
// Watch for deletion events
538+
for event := range watchPVC.ResultChan() {
539+
switch event.Type {
540+
case watch.Deleted:
541+
log.Infof("PersistentVolumeClaim %s/%s was deleted", namespace, pvcName)
542+
return nil
543+
case watch.Error:
544+
return fmt.Errorf("error watching PersistentVolumeClaim %s/%s: %+v", namespace, pvcName, event.Object)
545+
}
546+
// Continue watching for other event types (Added, Modified)
547+
}
548+
549+
// If we reach here, the watch timed out
550+
return fmt.Errorf("persistentVolumeClaim %s/%s was not deleted within %d seconds",
551+
namespace, pvcName, timeoutSeconds)
552+
}

pkg/csi/service/common/common_controller_helper_test.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,14 @@ package common
1919
import (
2020
"context"
2121
"testing"
22+
"time"
2223

24+
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1"
25+
snapshotfake "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned/fake"
2326
vim25types "github.com/vmware/govmomi/vim25/types"
27+
v1 "k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/client-go/kubernetes/fake"
2430
)
2531

2632
// TestUseVslmAPIsFuncForVC67Update3l tests UseVslmAPIs method for VC version 6.7 Update 3l
@@ -198,3 +204,141 @@ func TestCheckAPIForVC70u3(t *testing.T) {
198204
t.Fatalf("CheckAPI method failing for VC %q", vcVersion)
199205
}
200206
}
207+
208+
// TestWaitForPVCDeletedWithWatch tests the WaitForPVCDeletedWithWatch function
209+
func TestWaitForPVCDeletedWithWatch(t *testing.T) {
210+
ctx, cancel := context.WithCancel(context.Background())
211+
defer cancel()
212+
213+
// Test case 1: PVC already deleted (not found)
214+
t.Run("PVC already deleted", func(t *testing.T) {
215+
k8sclient := fake.NewSimpleClientset()
216+
217+
err := WaitForPVCDeletedWithWatch(ctx, k8sclient, "test-pvc", "test-namespace", time.Second*5)
218+
if err != nil {
219+
t.Fatalf("Expected no error when PVC is already deleted, got: %v", err)
220+
}
221+
})
222+
223+
// Test case 2: PVC exists and gets deleted during watch
224+
t.Run("PVC gets deleted during watch", func(t *testing.T) {
225+
pvc := &v1.PersistentVolumeClaim{
226+
ObjectMeta: metav1.ObjectMeta{
227+
Name: "test-pvc",
228+
Namespace: "test-namespace",
229+
},
230+
}
231+
k8sclient := fake.NewSimpleClientset(pvc)
232+
233+
// Start watching in a goroutine
234+
errChan := make(chan error, 1)
235+
go func() {
236+
err := WaitForPVCDeletedWithWatch(ctx, k8sclient, "test-pvc", "test-namespace", time.Second*5)
237+
errChan <- err
238+
}()
239+
240+
// Give the watch time to start
241+
time.Sleep(time.Millisecond * 100)
242+
243+
// Delete the PVC
244+
err := k8sclient.CoreV1().PersistentVolumeClaims("test-namespace").Delete(ctx, "test-pvc", metav1.DeleteOptions{})
245+
if err != nil {
246+
t.Fatalf("Failed to delete PVC: %v", err)
247+
}
248+
249+
// Wait for the watch to complete
250+
select {
251+
case err := <-errChan:
252+
if err != nil {
253+
t.Fatalf("Expected no error when PVC is deleted during watch, got: %v", err)
254+
}
255+
case <-time.After(time.Second * 2):
256+
t.Fatal("Watch did not complete within expected time")
257+
}
258+
})
259+
260+
// Test case 3: Watch timeout
261+
t.Run("Watch timeout", func(t *testing.T) {
262+
pvc := &v1.PersistentVolumeClaim{
263+
ObjectMeta: metav1.ObjectMeta{
264+
Name: "test-pvc",
265+
Namespace: "test-namespace",
266+
},
267+
}
268+
k8sclient := fake.NewSimpleClientset(pvc)
269+
270+
err := WaitForPVCDeletedWithWatch(ctx, k8sclient, "test-pvc", "test-namespace", time.Millisecond*100)
271+
if err == nil {
272+
t.Fatal("Expected timeout error when PVC is not deleted")
273+
}
274+
})
275+
}
276+
277+
// TestWaitForVolumeSnapshotDeletedWithWatch tests the WaitForVolumeSnapshotDeletedWithWatch function
278+
func TestWaitForVolumeSnapshotDeletedWithWatch(t *testing.T) {
279+
ctx, cancel := context.WithCancel(context.Background())
280+
defer cancel()
281+
282+
// Test case 1: VolumeSnapshot already deleted (not found)
283+
t.Run("VolumeSnapshot already deleted", func(t *testing.T) {
284+
snapshotClient := snapshotfake.NewSimpleClientset()
285+
286+
err := WaitForVolumeSnapshotDeletedWithWatch(ctx, snapshotClient, "test-snapshot", "test-namespace", time.Second*5)
287+
if err != nil {
288+
t.Fatalf("Expected no error when VolumeSnapshot is already deleted, got: %v", err)
289+
}
290+
})
291+
292+
// Test case 2: VolumeSnapshot exists and gets deleted during watch
293+
t.Run("VolumeSnapshot gets deleted during watch", func(t *testing.T) {
294+
snapshot := &snapshotv1.VolumeSnapshot{
295+
ObjectMeta: metav1.ObjectMeta{
296+
Name: "test-snapshot",
297+
Namespace: "test-namespace",
298+
},
299+
}
300+
snapshotClient := snapshotfake.NewSimpleClientset(snapshot)
301+
302+
// Start watching in a goroutine
303+
errChan := make(chan error, 1)
304+
go func() {
305+
err := WaitForVolumeSnapshotDeletedWithWatch(ctx, snapshotClient, "test-snapshot", "test-namespace", time.Second*5)
306+
errChan <- err
307+
}()
308+
309+
// Give the watch time to start
310+
time.Sleep(time.Millisecond * 100)
311+
312+
// Delete the VolumeSnapshot
313+
err := snapshotClient.SnapshotV1().VolumeSnapshots("test-namespace").Delete(ctx, "test-snapshot", metav1.DeleteOptions{})
314+
if err != nil {
315+
t.Fatalf("Failed to delete VolumeSnapshot: %v", err)
316+
}
317+
318+
// Wait for the watch to complete
319+
select {
320+
case err := <-errChan:
321+
if err != nil {
322+
t.Fatalf("Expected no error when VolumeSnapshot is deleted during watch, got: %v", err)
323+
}
324+
case <-time.After(time.Second * 2):
325+
t.Fatal("Watch did not complete within expected time")
326+
}
327+
})
328+
329+
// Test case 3: Watch timeout
330+
t.Run("Watch timeout", func(t *testing.T) {
331+
snapshot := &snapshotv1.VolumeSnapshot{
332+
ObjectMeta: metav1.ObjectMeta{
333+
Name: "test-snapshot",
334+
Namespace: "test-namespace",
335+
},
336+
}
337+
snapshotClient := snapshotfake.NewSimpleClientset(snapshot)
338+
339+
err := WaitForVolumeSnapshotDeletedWithWatch(ctx, snapshotClient, "test-snapshot", "test-namespace", time.Millisecond*100)
340+
if err == nil {
341+
t.Fatal("Expected timeout error when VolumeSnapshot is not deleted")
342+
}
343+
})
344+
}

pkg/csi/service/wcpguest/controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,7 @@ func (c *controller) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequ
639639
}
640640

641641
// Wait for PVC to be deleted from supervisor cluster
642-
err = common.WaitForPVCDeleted(ctx, c.supervisorClient,
642+
err = common.WaitForPVCDeletedWithWatch(ctx, c.supervisorClient,
643643
req.VolumeId, c.supervisorNamespace,
644644
time.Duration(getProvisionTimeoutInMin(ctx))*time.Minute)
645645
if err != nil {
@@ -1796,7 +1796,7 @@ func (c *controller) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshot
17961796
}
17971797

17981798
// Wait for VolumeSnapshot to be deleted from supervisor cluster
1799-
err = common.WaitForVolumeSnapshotDeleted(ctx, c.supervisorSnapshotterClient,
1799+
err = common.WaitForVolumeSnapshotDeletedWithWatch(ctx, c.supervisorSnapshotterClient,
18001800
supervisorVolumeSnapshotName, c.supervisorNamespace,
18011801
time.Duration(getSnapshotTimeoutInMin(ctx))*time.Minute)
18021802
if err != nil {

0 commit comments

Comments
 (0)