Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions pkg/csi/service/common/common_controller_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"

"github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -448,3 +450,103 @@ func WaitForVolumeSnapshotDeleted(ctx context.Context, client snapshotterClientS

return nil
}

// WaitForVolumeSnapshotDeletedWithWatch waits for a VolumeSnapshot to get deleted using Watch API instead of polling.
func WaitForVolumeSnapshotDeletedWithWatch(ctx context.Context, client snapshotterClientSet.Interface,
snapshotName string, namespace string, timeout time.Duration) error {
log := logger.GetLogger(ctx)
timeoutSeconds := int64(timeout.Seconds())

log.Infof("Waiting up to %d seconds for VolumeSnapshot %s in namespace %s to be deleted using Watch API",
timeoutSeconds, snapshotName, namespace)

// First check if VolumeSnapshot already exists
_, err := client.SnapshotV1().VolumeSnapshots(namespace).Get(ctx, snapshotName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
log.Infof("VolumeSnapshot %s/%s is already deleted", namespace, snapshotName)
return nil
}
return fmt.Errorf("unable to fetch VolumeSnapshot %s/%s with err: %+v", namespace, snapshotName, err)
}

// Set up watch for the specific VolumeSnapshot
watchSnapshot, err := client.SnapshotV1().VolumeSnapshots(namespace).Watch(
ctx,
metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", snapshotName).String(),
TimeoutSeconds: &timeoutSeconds,
Watch: true,
})
if err != nil {
return fmt.Errorf("failed to watch VolumeSnapshot %s in namespace %s with Error: %+v",
snapshotName, namespace, err)
}
defer watchSnapshot.Stop()

// Watch for deletion events
for event := range watchSnapshot.ResultChan() {
switch event.Type {
case watch.Deleted:
log.Infof("VolumeSnapshot %s/%s was deleted", namespace, snapshotName)
return nil
case watch.Error:
return fmt.Errorf("error watching VolumeSnapshot %s/%s: %+v", namespace, snapshotName, event.Object)
}
// Continue watching for other event types (Added, Modified)
}

// If we reach here, the watch timed out
return fmt.Errorf("volumeSnapshot %s/%s was not deleted within %d seconds",
namespace, snapshotName, timeoutSeconds)
}

// WaitForPVCDeletedWithWatch waits for a PVC to get deleted using Watch API instead of polling.
func WaitForPVCDeletedWithWatch(ctx context.Context, client clientset.Interface, pvcName string,
namespace string, timeout time.Duration) error {
log := logger.GetLogger(ctx)
timeoutSeconds := int64(timeout.Seconds())

log.Infof("Waiting up to %d seconds for PersistentVolumeClaim %s in namespace %s to be deleted using Watch API",
timeoutSeconds, pvcName, namespace)

// First check if PVC already exists
_, err := client.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
log.Infof("PersistentVolumeClaim %s/%s is already deleted", namespace, pvcName)
return nil
}
return fmt.Errorf("unable to fetch PersistentVolumeClaim %s/%s with err: %+v", namespace, pvcName, err)
}

// Set up watch for the specific PVC
watchPVC, err := client.CoreV1().PersistentVolumeClaims(namespace).Watch(
ctx,
metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", pvcName).String(),
TimeoutSeconds: &timeoutSeconds,
Watch: true,
})
if err != nil {
return fmt.Errorf("failed to watch PersistentVolumeClaim %s in namespace %s with Error: %+v",
pvcName, namespace, err)
}
defer watchPVC.Stop()

// Watch for deletion events
for event := range watchPVC.ResultChan() {
switch event.Type {
case watch.Deleted:
log.Infof("PersistentVolumeClaim %s/%s was deleted", namespace, pvcName)
return nil
case watch.Error:
return fmt.Errorf("error watching PersistentVolumeClaim %s/%s: %+v", namespace, pvcName, event.Object)
}
// Continue watching for other event types (Added, Modified)
}

// If we reach here, the watch timed out
return fmt.Errorf("persistentVolumeClaim %s/%s was not deleted within %d seconds",
namespace, pvcName, timeoutSeconds)
}
144 changes: 144 additions & 0 deletions pkg/csi/service/common/common_controller_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@ package common
import (
"context"
"testing"
"time"

snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1"
snapshotfake "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned/fake"
vim25types "github.com/vmware/govmomi/vim25/types"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)

// TestUseVslmAPIsFuncForVC67Update3l tests UseVslmAPIs method for VC version 6.7 Update 3l
Expand Down Expand Up @@ -198,3 +204,141 @@ func TestCheckAPIForVC70u3(t *testing.T) {
t.Fatalf("CheckAPI method failing for VC %q", vcVersion)
}
}

// TestWaitForPVCDeletedWithWatch tests the WaitForPVCDeletedWithWatch function
func TestWaitForPVCDeletedWithWatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Test case 1: PVC already deleted (not found)
t.Run("PVC already deleted", func(t *testing.T) {
k8sclient := fake.NewSimpleClientset()

err := WaitForPVCDeletedWithWatch(ctx, k8sclient, "test-pvc", "test-namespace", time.Second*5)
if err != nil {
t.Fatalf("Expected no error when PVC is already deleted, got: %v", err)
}
})

// Test case 2: PVC exists and gets deleted during watch
t.Run("PVC gets deleted during watch", func(t *testing.T) {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pvc",
Namespace: "test-namespace",
},
}
k8sclient := fake.NewSimpleClientset(pvc)

// Start watching in a goroutine
errChan := make(chan error, 1)
go func() {
err := WaitForPVCDeletedWithWatch(ctx, k8sclient, "test-pvc", "test-namespace", time.Second*5)
errChan <- err
}()

// Give the watch time to start
time.Sleep(time.Millisecond * 100)

// Delete the PVC
err := k8sclient.CoreV1().PersistentVolumeClaims("test-namespace").Delete(ctx, "test-pvc", metav1.DeleteOptions{})
if err != nil {
t.Fatalf("Failed to delete PVC: %v", err)
}

// Wait for the watch to complete
select {
case err := <-errChan:
if err != nil {
t.Fatalf("Expected no error when PVC is deleted during watch, got: %v", err)
}
case <-time.After(time.Second * 2):
t.Fatal("Watch did not complete within expected time")
}
})

// Test case 3: Watch timeout
t.Run("Watch timeout", func(t *testing.T) {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pvc",
Namespace: "test-namespace",
},
}
k8sclient := fake.NewSimpleClientset(pvc)

err := WaitForPVCDeletedWithWatch(ctx, k8sclient, "test-pvc", "test-namespace", time.Millisecond*100)
if err == nil {
t.Fatal("Expected timeout error when PVC is not deleted")
}
})
}

// TestWaitForVolumeSnapshotDeletedWithWatch tests the WaitForVolumeSnapshotDeletedWithWatch function
func TestWaitForVolumeSnapshotDeletedWithWatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Test case 1: VolumeSnapshot already deleted (not found)
t.Run("VolumeSnapshot already deleted", func(t *testing.T) {
snapshotClient := snapshotfake.NewSimpleClientset()

err := WaitForVolumeSnapshotDeletedWithWatch(ctx, snapshotClient, "test-snapshot", "test-namespace", time.Second*5)
if err != nil {
t.Fatalf("Expected no error when VolumeSnapshot is already deleted, got: %v", err)
}
})

// Test case 2: VolumeSnapshot exists and gets deleted during watch
t.Run("VolumeSnapshot gets deleted during watch", func(t *testing.T) {
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "test-snapshot",
Namespace: "test-namespace",
},
}
snapshotClient := snapshotfake.NewSimpleClientset(snapshot)

// Start watching in a goroutine
errChan := make(chan error, 1)
go func() {
err := WaitForVolumeSnapshotDeletedWithWatch(ctx, snapshotClient, "test-snapshot", "test-namespace", time.Second*5)
errChan <- err
}()

// Give the watch time to start
time.Sleep(time.Millisecond * 100)

// Delete the VolumeSnapshot
err := snapshotClient.SnapshotV1().VolumeSnapshots("test-namespace").Delete(ctx, "test-snapshot", metav1.DeleteOptions{})
if err != nil {
t.Fatalf("Failed to delete VolumeSnapshot: %v", err)
}

// Wait for the watch to complete
select {
case err := <-errChan:
if err != nil {
t.Fatalf("Expected no error when VolumeSnapshot is deleted during watch, got: %v", err)
}
case <-time.After(time.Second * 2):
t.Fatal("Watch did not complete within expected time")
}
})

// Test case 3: Watch timeout
t.Run("Watch timeout", func(t *testing.T) {
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "test-snapshot",
Namespace: "test-namespace",
},
}
snapshotClient := snapshotfake.NewSimpleClientset(snapshot)

err := WaitForVolumeSnapshotDeletedWithWatch(ctx, snapshotClient, "test-snapshot", "test-namespace", time.Millisecond*100)
if err == nil {
t.Fatal("Expected timeout error when VolumeSnapshot is not deleted")
}
})
}
4 changes: 2 additions & 2 deletions pkg/csi/service/wcpguest/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ func (c *controller) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequ
}

// Wait for PVC to be deleted from supervisor cluster
err = common.WaitForPVCDeleted(ctx, c.supervisorClient,
err = common.WaitForPVCDeletedWithWatch(ctx, c.supervisorClient,
req.VolumeId, c.supervisorNamespace,
time.Duration(getProvisionTimeoutInMin(ctx))*time.Minute)
if err != nil {
Expand Down Expand Up @@ -1796,7 +1796,7 @@ func (c *controller) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshot
}

// Wait for VolumeSnapshot to be deleted from supervisor cluster
err = common.WaitForVolumeSnapshotDeleted(ctx, c.supervisorSnapshotterClient,
err = common.WaitForVolumeSnapshotDeletedWithWatch(ctx, c.supervisorSnapshotterClient,
supervisorVolumeSnapshotName, c.supervisorNamespace,
time.Duration(getSnapshotTimeoutInMin(ctx))*time.Minute)
if err != nil {
Expand Down