Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (

clientConfig "sigs.k8s.io/controller-runtime/pkg/client/config"

clientset "k8s.io/client-go/kubernetes"
apis "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator"
cnsregistervolumev1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/cnsregistervolume/v1alpha1"
storagepolicyusagev1alpha2 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/storagepolicy/v1alpha2"
Expand Down Expand Up @@ -557,53 +558,65 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context,
setInstanceError(ctx, r, instance, "Duplicate Request")
return reconcile.Result{RequeueAfter: timeout}, nil
}
// Create PVC mapping to above created PV.
log.Infof("Creating PVC: %s", instance.Spec.PvcName)
pvcSpec, err := getPersistentVolumeClaimSpec(ctx, instance.Spec.PvcName, instance.Namespace, capacityInMb,
storageClassName, accessMode, pvName, datastoreAccessibleTopology, instance)

// Check if PVC already exists and has valid DataSourceRef
pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, instance.Spec.PvcName, instance.Namespace)
if err != nil {
msg := fmt.Sprintf("Failed to create spec for PVC: %q. Error: %v", instance.Spec.PvcName, err)
log.Errorf(msg)
setInstanceError(ctx, r, instance, msg)
log.Errorf("Failed to check existing PVC %s/%s with DataSourceRef: %+v", instance.Namespace,
instance.Spec.PvcName, err)
setInstanceError(ctx, r, instance, fmt.Sprintf("Failed to check existing PVC %s/%s with DataSourceRef: %+v",
instance.Namespace, instance.Spec.PvcName, err))
return reconcile.Result{RequeueAfter: timeout}, nil
}
log.Debugf("PVC spec is: %+v", pvcSpec)
pvc, err := k8sclient.CoreV1().PersistentVolumeClaims(instance.Namespace).Create(ctx,
pvcSpec, metav1.CreateOptions{})
if err != nil {
if apierrors.IsAlreadyExists(err) {
log.Infof("PVC: %s already exists", instance.Spec.PvcName)
pvc, err = k8sclient.CoreV1().PersistentVolumeClaims(instance.Namespace).Get(ctx,
instance.Spec.PvcName, metav1.GetOptions{})

if pvc != nil {
log.Infof("PVC: %s already exists", instance.Spec.PvcName)
if pvc.Status.Phase == v1.ClaimBound && pvc.Spec.VolumeName != pvName {
// This is handle cases where PVC with this name already exists and
// is bound. This happens when a new CnsRegisterVolume instance is
// created to import a new volume with PVC name which is already
// created and is bound.
msg := fmt.Sprintf("Another PVC: %s already exists in namespace: %s which is Bound to a different PV",
instance.Spec.PvcName, instance.Namespace)
log.Errorf(msg)
setInstanceError(ctx, r, instance, msg)
// Untag the CNS volume which was created previously.
_, err = common.DeleteVolumeUtil(ctx, r.volumeManager, volumeID, false)
if err != nil {
msg := fmt.Sprintf("Failed to get PVC: %s on namespace: %s", instance.Spec.PvcName, instance.Namespace)
log.Errorf(msg)
setInstanceError(ctx, r, instance, msg)
return reconcile.Result{RequeueAfter: timeout}, nil
}
if pvc.Status.Phase == v1.ClaimBound && pvc.Spec.VolumeName != pvName {
// This is handle cases where PVC with this name already exists and
// is bound. This happens when a new CnsRegisterVolume instance is
// created to import a new volume with PVC name which is already
// created and is bound.
msg := fmt.Sprintf("Another PVC: %s already exists in namespace: %s which is Bound to a different PV",
instance.Spec.PvcName, instance.Namespace)
log.Errorf(msg)
setInstanceError(ctx, r, instance, msg)
// Untag the CNS volume which was created previously.
_, err = common.DeleteVolumeUtil(ctx, r.volumeManager, volumeID, false)
log.Errorf("Failed to untag CNS volume: %s with error: %+v", volumeID, err)
} else {
// Delete PV created above.
err = k8sclient.CoreV1().PersistentVolumes().Delete(ctx, pvName, *metav1.NewDeleteOptions(0))
if err != nil {
log.Errorf("Failed to untag CNS volume: %s with error: %+v", volumeID, err)
} else {
// Delete PV created above.
err = k8sclient.CoreV1().PersistentVolumes().Delete(ctx, pvName, *metav1.NewDeleteOptions(0))
if err != nil {
log.Errorf("Failed to delete PV: %s with error: %+v", pvName, err)
}
log.Errorf("Failed to delete PV: %s with error: %+v", pvName, err)
}
return reconcile.Result{RequeueAfter: timeout}, nil
}
} else {
return reconcile.Result{RequeueAfter: timeout}, nil
}

if pvc.Spec.DataSourceRef != nil {
apiGroup := ""
if pvc.Spec.DataSourceRef.APIGroup != nil {
apiGroup = *pvc.Spec.DataSourceRef.APIGroup
}
log.Infof("PVC %s in namespace %s has valid DataSourceRef with apiGroup: %s, kind: %s, name: %s",
pvc.Name, pvc.Namespace, apiGroup, pvc.Spec.DataSourceRef.Kind, pvc.Spec.DataSourceRef.Name)
}
} else {
// Create PVC mapping to above created PV.
log.Infof("Creating PVC: %s", instance.Spec.PvcName)
pvcSpec, err := getPersistentVolumeClaimSpec(ctx, instance.Spec.PvcName, instance.Namespace, capacityInMb,
storageClassName, accessMode, pvName, datastoreAccessibleTopology, instance)
if err != nil {
msg := fmt.Sprintf("Failed to create spec for PVC: %q. Error: %v", instance.Spec.PvcName, err)
log.Errorf(msg)
setInstanceError(ctx, r, instance, msg)
return reconcile.Result{RequeueAfter: timeout}, nil
}
log.Debugf("PVC spec is: %+v", pvcSpec)
pvc, err = k8sclient.CoreV1().PersistentVolumeClaims(instance.Namespace).Create(ctx,
pvcSpec, metav1.CreateOptions{})
if err != nil {
log.Errorf("Failed to create PVC with spec: %+v. Error: %+v", pvcSpec, err)
setInstanceError(ctx, r, instance,
fmt.Sprintf("Failed to create PVC: %s for volume with err: %+v", instance.Spec.PvcName, err))
Expand All @@ -615,9 +628,9 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context,
setInstanceError(ctx, r, instance,
fmt.Sprintf("Delete PV %s failed with error: %+v", pvName, err))
return reconcile.Result{RequeueAfter: timeout}, nil
} else {
log.Infof("PVC: %s is created successfully", instance.Spec.PvcName)
}
} else {
log.Infof("PVC: %s is created successfully", instance.Spec.PvcName)
}
// Watch for PVC to be bound.
isBound, err := isPVCBound(ctx, k8sclient, pvc, time.Duration(1*time.Minute))
Expand Down Expand Up @@ -742,6 +755,59 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context,
return reconcile.Result{}, nil
}

// checkExistingPVCDataSourceRef checks if a PVC already exists and validates its DataSourceRef.
// Returns the PVC if it exists with no DataSourceRef or it exists with a supported DataSourceRef.
// If PVC exists but has an unsupported DataSourceRef, return (nil, error).
// If PVC does not exist, return (nil, nil) so that it will be created later.
func checkExistingPVCDataSourceRef(ctx context.Context, k8sclient clientset.Interface,
pvcName, namespace string) (*v1.PersistentVolumeClaim, error) {
log := logger.GetLogger(ctx)

// Try to get the existing PVC
existingPVC, err := k8sclient.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
// PVC doesn't exist, return nil (we'll create a new one)
return nil, nil
}
// Some other error occurred
return nil, fmt.Errorf("failed to check existing PVC %s in namespace %s: %+v", pvcName, namespace, err)
}

// PVC exists, check if it has DataSourceRef
if existingPVC.Spec.DataSourceRef == nil {
log.Infof("Existing PVC %s in namespace %s has no DataSourceRef, can reuse", pvcName, namespace)
return existingPVC, nil
}

// Check if DataSourceRef matches supported types
apiGroup := ""
if existingPVC.Spec.DataSourceRef.APIGroup != nil {
apiGroup = *existingPVC.Spec.DataSourceRef.APIGroup
}

for _, supportedType := range supportedDataSourceTypes {
if supportedType.apiGroup == apiGroup && supportedType.kind == existingPVC.Spec.DataSourceRef.Kind {
log.Infof("Existing PVC %s in namespace %s has valid DataSourceRef (apiGroup: %s, kind: %s), can reuse",
pvcName, namespace, apiGroup, existingPVC.Spec.DataSourceRef.Kind)
return existingPVC, nil
}
}

// Check if DataSourceRef is VolumeSnapshot
if existingPVC.Spec.DataSourceRef.Kind == "VolumeSnapshot" &&
existingPVC.Spec.DataSourceRef.APIGroup != nil &&
*existingPVC.Spec.DataSourceRef.APIGroup == "snapshot.storage.k8s.io" {
log.Infof("WARNING: Existing PVC %s in namespace %s has valid VolumeSnapshot DataSourceRef, "+
"however, it is not supported for CNSRegisterVolume", pvcName, namespace)
}

// DataSourceRef is not supported
return nil, fmt.Errorf("existing PVC %s in namespace %s has unsupported DataSourceRef for CNSRegisterVolume. "+
"APIGroup: %s, Kind: %s is not supported. Supported types: %+v",
pvcName, namespace, apiGroup, existingPVC.Spec.DataSourceRef.Kind, supportedDataSourceTypes)
}

// validateCnsRegisterVolumeSpec validates the input params of
// CnsRegisterVolume instance.
func validateCnsRegisterVolumeSpec(ctx context.Context, instance *cnsregistervolumev1alpha1.CnsRegisterVolume) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
k8sfake "k8s.io/client-go/kubernetes/fake"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -613,6 +614,164 @@ var _ = Describe("Reconcile Accessibility Logic", func() {
})
})

var _ = Describe("checkExistingPVCDataSourceRef", func() {
var (
ctx context.Context
k8sclient *k8sfake.Clientset
namespace string
pvcName string
)

BeforeEach(func() {
ctx = context.Background()
k8sclient = k8sfake.NewSimpleClientset()
namespace = "test-namespace"
pvcName = "test-pvc"
})

Context("when PVC does not exist", func() {
It("should return nil without error", func() {
pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, pvcName, namespace)
Expect(err).To(BeNil())
Expect(pvc).To(BeNil())
})
})

Context("when PVC exists without DataSourceRef", func() {
BeforeEach(func() {
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Namespace: namespace,
},
Spec: corev1.PersistentVolumeClaimSpec{
DataSourceRef: nil,
},
}
_, err := k8sclient.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{})
Expect(err).To(BeNil())
})

It("should return the PVC without error", func() {
pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, pvcName, namespace)
Expect(err).To(BeNil())
Expect(pvc).ToNot(BeNil())
Expect(pvc.Name).To(Equal(pvcName))
Expect(pvc.Spec.DataSourceRef).To(BeNil())
})
})

Context("when PVC exists with VolumeSnapshot DataSourceRef", func() {
BeforeEach(func() {
apiGroup := "snapshot.storage.k8s.io"
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Namespace: namespace,
},
Spec: corev1.PersistentVolumeClaimSpec{
DataSourceRef: &corev1.TypedObjectReference{
APIGroup: &apiGroup,
Kind: "VolumeSnapshot",
Name: "test-snapshot",
},
},
}
_, err := k8sclient.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{})
Expect(err).To(BeNil())
})

It("should return an error since VolumeSnapshots are not supported for CNSRegisterVolume", func() {
pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, pvcName, namespace)
Expect(err).ToNot(BeNil())
Expect(pvc).To(BeNil())
})
})

Context("when PVC exists with supported DataSourceRef", func() {
BeforeEach(func() {
apiGroup := "vmoperator.vmware.com"
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Namespace: namespace,
},
Spec: corev1.PersistentVolumeClaimSpec{
DataSourceRef: &corev1.TypedObjectReference{
APIGroup: &apiGroup,
Kind: "VirtualMachine",
Name: "test-vm",
},
},
}
_, err := k8sclient.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{})
Expect(err).To(BeNil())
})

It("should return the PVC without error", func() {
pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, pvcName, namespace)
Expect(err).To(BeNil())
Expect(pvc).ToNot(BeNil())
Expect(pvc.Name).To(Equal(pvcName))
Expect(pvc.Spec.DataSourceRef.Kind).To(Equal("VirtualMachine"))
Expect(*pvc.Spec.DataSourceRef.APIGroup).To(Equal("vmoperator.vmware.com"))
})
})

Context("when PVC exists with unsupported DataSourceRef", func() {
BeforeEach(func() {
apiGroup := "unsupported.example.com"
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Namespace: namespace,
},
Spec: corev1.PersistentVolumeClaimSpec{
DataSourceRef: &corev1.TypedObjectReference{
APIGroup: &apiGroup,
Kind: "UnsupportedKind",
Name: "test-resource",
},
},
}
_, err := k8sclient.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{})
Expect(err).To(BeNil())
})

It("should return an error", func() {
pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, pvcName, namespace)
Expect(err).ToNot(BeNil())
Expect(pvc).To(BeNil())
})
})

Context("when PVC exists with empty APIGroup DataSourceRef", func() {
BeforeEach(func() {
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Namespace: namespace,
},
Spec: corev1.PersistentVolumeClaimSpec{
DataSourceRef: &corev1.TypedObjectReference{
APIGroup: nil,
Kind: "SomeKind",
Name: "test-resource",
},
},
}
_, err := k8sclient.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{})
Expect(err).To(BeNil())
})

It("should return an error for unsupported empty APIGroup", func() {
pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, pvcName, namespace)
Expect(err).ToNot(BeNil())
Expect(pvc).To(BeNil())
})
})
})

func TestCnsRegisterVolumeController(t *testing.T) {
backOffDuration = make(map[types.NamespacedName]time.Duration)

Expand Down
10 changes: 10 additions & 0 deletions pkg/syncer/cnsoperator/controller/cnsregistervolume/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ const (
scResourceNameSuffix = ".storageclass.storage.k8s.io/requests.storage"
)

var (
// Supported data source types for PVCs
supportedDataSourceTypes = []struct {
apiGroup string
kind string
}{
{"vmoperator.vmware.com", "VirtualMachine"},
}
)

// isDatastoreAccessibleToCluster verifies if the datastoreUrl is accessible to
// cluster with clusterID.
func isDatastoreAccessibleToCluster(ctx context.Context, vc *vsphere.VirtualCenter,
Expand Down