diff --git a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go index 35ce059891..501c2c32bd 100644 --- a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go +++ b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go @@ -43,6 +43,7 @@ import ( clientConfig "sigs.k8s.io/controller-runtime/pkg/client/config" + clientset "k8s.io/client-go/kubernetes" apis "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator" cnsregistervolumev1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/cnsregistervolume/v1alpha1" storagepolicyusagev1alpha2 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/storagepolicy/v1alpha2" @@ -557,53 +558,65 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context, setInstanceError(ctx, r, instance, "Duplicate Request") return reconcile.Result{RequeueAfter: timeout}, nil } - // Create PVC mapping to above created PV. - log.Infof("Creating PVC: %s", instance.Spec.PvcName) - pvcSpec, err := getPersistentVolumeClaimSpec(ctx, instance.Spec.PvcName, instance.Namespace, capacityInMb, - storageClassName, accessMode, pvName, datastoreAccessibleTopology, instance) + + // Check if PVC already exists and has valid DataSourceRef + pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, instance.Spec.PvcName, instance.Namespace) if err != nil { - msg := fmt.Sprintf("Failed to create spec for PVC: %q. Error: %v", instance.Spec.PvcName, err) - log.Errorf(msg) - setInstanceError(ctx, r, instance, msg) + log.Errorf("Failed to check existing PVC %s/%s with DataSourceRef: %+v", instance.Namespace, + instance.Spec.PvcName, err) + setInstanceError(ctx, r, instance, fmt.Sprintf("Failed to check existing PVC %s/%s with DataSourceRef: %+v", + instance.Namespace, instance.Spec.PvcName, err)) return reconcile.Result{RequeueAfter: timeout}, nil } - log.Debugf("PVC spec is: %+v", pvcSpec) - pvc, err := k8sclient.CoreV1().PersistentVolumeClaims(instance.Namespace).Create(ctx, - pvcSpec, metav1.CreateOptions{}) - if err != nil { - if apierrors.IsAlreadyExists(err) { - log.Infof("PVC: %s already exists", instance.Spec.PvcName) - pvc, err = k8sclient.CoreV1().PersistentVolumeClaims(instance.Namespace).Get(ctx, - instance.Spec.PvcName, metav1.GetOptions{}) + + if pvc != nil { + log.Infof("PVC: %s already exists", instance.Spec.PvcName) + if pvc.Status.Phase == v1.ClaimBound && pvc.Spec.VolumeName != pvName { + // This is handle cases where PVC with this name already exists and + // is bound. This happens when a new CnsRegisterVolume instance is + // created to import a new volume with PVC name which is already + // created and is bound. + msg := fmt.Sprintf("Another PVC: %s already exists in namespace: %s which is Bound to a different PV", + instance.Spec.PvcName, instance.Namespace) + log.Errorf(msg) + setInstanceError(ctx, r, instance, msg) + // Untag the CNS volume which was created previously. + _, err = common.DeleteVolumeUtil(ctx, r.volumeManager, volumeID, false) if err != nil { - msg := fmt.Sprintf("Failed to get PVC: %s on namespace: %s", instance.Spec.PvcName, instance.Namespace) - log.Errorf(msg) - setInstanceError(ctx, r, instance, msg) - return reconcile.Result{RequeueAfter: timeout}, nil - } - if pvc.Status.Phase == v1.ClaimBound && pvc.Spec.VolumeName != pvName { - // This is handle cases where PVC with this name already exists and - // is bound. This happens when a new CnsRegisterVolume instance is - // created to import a new volume with PVC name which is already - // created and is bound. - msg := fmt.Sprintf("Another PVC: %s already exists in namespace: %s which is Bound to a different PV", - instance.Spec.PvcName, instance.Namespace) - log.Errorf(msg) - setInstanceError(ctx, r, instance, msg) - // Untag the CNS volume which was created previously. - _, err = common.DeleteVolumeUtil(ctx, r.volumeManager, volumeID, false) + log.Errorf("Failed to untag CNS volume: %s with error: %+v", volumeID, err) + } else { + // Delete PV created above. + err = k8sclient.CoreV1().PersistentVolumes().Delete(ctx, pvName, *metav1.NewDeleteOptions(0)) if err != nil { - log.Errorf("Failed to untag CNS volume: %s with error: %+v", volumeID, err) - } else { - // Delete PV created above. - err = k8sclient.CoreV1().PersistentVolumes().Delete(ctx, pvName, *metav1.NewDeleteOptions(0)) - if err != nil { - log.Errorf("Failed to delete PV: %s with error: %+v", pvName, err) - } + log.Errorf("Failed to delete PV: %s with error: %+v", pvName, err) } - return reconcile.Result{RequeueAfter: timeout}, nil } - } else { + return reconcile.Result{RequeueAfter: timeout}, nil + } + + if pvc.Spec.DataSourceRef != nil { + apiGroup := "" + if pvc.Spec.DataSourceRef.APIGroup != nil { + apiGroup = *pvc.Spec.DataSourceRef.APIGroup + } + log.Infof("PVC %s in namespace %s has valid DataSourceRef with apiGroup: %s, kind: %s, name: %s", + pvc.Name, pvc.Namespace, apiGroup, pvc.Spec.DataSourceRef.Kind, pvc.Spec.DataSourceRef.Name) + } + } else { + // Create PVC mapping to above created PV. + log.Infof("Creating PVC: %s", instance.Spec.PvcName) + pvcSpec, err := getPersistentVolumeClaimSpec(ctx, instance.Spec.PvcName, instance.Namespace, capacityInMb, + storageClassName, accessMode, pvName, datastoreAccessibleTopology, instance) + if err != nil { + msg := fmt.Sprintf("Failed to create spec for PVC: %q. Error: %v", instance.Spec.PvcName, err) + log.Errorf(msg) + setInstanceError(ctx, r, instance, msg) + return reconcile.Result{RequeueAfter: timeout}, nil + } + log.Debugf("PVC spec is: %+v", pvcSpec) + pvc, err = k8sclient.CoreV1().PersistentVolumeClaims(instance.Namespace).Create(ctx, + pvcSpec, metav1.CreateOptions{}) + if err != nil { log.Errorf("Failed to create PVC with spec: %+v. Error: %+v", pvcSpec, err) setInstanceError(ctx, r, instance, fmt.Sprintf("Failed to create PVC: %s for volume with err: %+v", instance.Spec.PvcName, err)) @@ -615,9 +628,9 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context, setInstanceError(ctx, r, instance, fmt.Sprintf("Delete PV %s failed with error: %+v", pvName, err)) return reconcile.Result{RequeueAfter: timeout}, nil + } else { + log.Infof("PVC: %s is created successfully", instance.Spec.PvcName) } - } else { - log.Infof("PVC: %s is created successfully", instance.Spec.PvcName) } // Watch for PVC to be bound. isBound, err := isPVCBound(ctx, k8sclient, pvc, time.Duration(1*time.Minute)) @@ -742,6 +755,59 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context, return reconcile.Result{}, nil } +// checkExistingPVCDataSourceRef checks if a PVC already exists and validates its DataSourceRef. +// Returns the PVC if it exists with no DataSourceRef or it exists with a supported DataSourceRef. +// If PVC exists but has an unsupported DataSourceRef, return (nil, error). +// If PVC does not exist, return (nil, nil) so that it will be created later. +func checkExistingPVCDataSourceRef(ctx context.Context, k8sclient clientset.Interface, + pvcName, namespace string) (*v1.PersistentVolumeClaim, error) { + log := logger.GetLogger(ctx) + + // Try to get the existing PVC + existingPVC, err := k8sclient.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + // PVC doesn't exist, return nil (we'll create a new one) + return nil, nil + } + // Some other error occurred + return nil, fmt.Errorf("failed to check existing PVC %s in namespace %s: %+v", pvcName, namespace, err) + } + + // PVC exists, check if it has DataSourceRef + if existingPVC.Spec.DataSourceRef == nil { + log.Infof("Existing PVC %s in namespace %s has no DataSourceRef, can reuse", pvcName, namespace) + return existingPVC, nil + } + + // Check if DataSourceRef matches supported types + apiGroup := "" + if existingPVC.Spec.DataSourceRef.APIGroup != nil { + apiGroup = *existingPVC.Spec.DataSourceRef.APIGroup + } + + for _, supportedType := range supportedDataSourceTypes { + if supportedType.apiGroup == apiGroup && supportedType.kind == existingPVC.Spec.DataSourceRef.Kind { + log.Infof("Existing PVC %s in namespace %s has valid DataSourceRef (apiGroup: %s, kind: %s), can reuse", + pvcName, namespace, apiGroup, existingPVC.Spec.DataSourceRef.Kind) + return existingPVC, nil + } + } + + // Check if DataSourceRef is VolumeSnapshot + if existingPVC.Spec.DataSourceRef.Kind == "VolumeSnapshot" && + existingPVC.Spec.DataSourceRef.APIGroup != nil && + *existingPVC.Spec.DataSourceRef.APIGroup == "snapshot.storage.k8s.io" { + log.Infof("WARNING: Existing PVC %s in namespace %s has valid VolumeSnapshot DataSourceRef, "+ + "however, it is not supported for CNSRegisterVolume", pvcName, namespace) + } + + // DataSourceRef is not supported + return nil, fmt.Errorf("existing PVC %s in namespace %s has unsupported DataSourceRef for CNSRegisterVolume. "+ + "APIGroup: %s, Kind: %s is not supported. Supported types: %+v", + pvcName, namespace, apiGroup, existingPVC.Spec.DataSourceRef.Kind, supportedDataSourceTypes) +} + // validateCnsRegisterVolumeSpec validates the input params of // CnsRegisterVolume instance. func validateCnsRegisterVolumeSpec(ctx context.Context, instance *cnsregistervolumev1alpha1.CnsRegisterVolume) error { diff --git a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go index 2dbb6154dc..ffd39ce0b7 100644 --- a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go +++ b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + k8sfake "k8s.io/client-go/kubernetes/fake" clientgoscheme "k8s.io/client-go/kubernetes/scheme" restclient "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -613,6 +614,164 @@ var _ = Describe("Reconcile Accessibility Logic", func() { }) }) +var _ = Describe("checkExistingPVCDataSourceRef", func() { + var ( + ctx context.Context + k8sclient *k8sfake.Clientset + namespace string + pvcName string + ) + + BeforeEach(func() { + ctx = context.Background() + k8sclient = k8sfake.NewSimpleClientset() + namespace = "test-namespace" + pvcName = "test-pvc" + }) + + Context("when PVC does not exist", func() { + It("should return nil without error", func() { + pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, pvcName, namespace) + Expect(err).To(BeNil()) + Expect(pvc).To(BeNil()) + }) + }) + + Context("when PVC exists without DataSourceRef", func() { + BeforeEach(func() { + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvcName, + Namespace: namespace, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + DataSourceRef: nil, + }, + } + _, err := k8sclient.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + }) + + It("should return the PVC without error", func() { + pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, pvcName, namespace) + Expect(err).To(BeNil()) + Expect(pvc).ToNot(BeNil()) + Expect(pvc.Name).To(Equal(pvcName)) + Expect(pvc.Spec.DataSourceRef).To(BeNil()) + }) + }) + + Context("when PVC exists with VolumeSnapshot DataSourceRef", func() { + BeforeEach(func() { + apiGroup := "snapshot.storage.k8s.io" + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvcName, + Namespace: namespace, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + DataSourceRef: &corev1.TypedObjectReference{ + APIGroup: &apiGroup, + Kind: "VolumeSnapshot", + Name: "test-snapshot", + }, + }, + } + _, err := k8sclient.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + }) + + It("should return an error since VolumeSnapshots are not supported for CNSRegisterVolume", func() { + pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, pvcName, namespace) + Expect(err).ToNot(BeNil()) + Expect(pvc).To(BeNil()) + }) + }) + + Context("when PVC exists with supported DataSourceRef", func() { + BeforeEach(func() { + apiGroup := "vmoperator.vmware.com" + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvcName, + Namespace: namespace, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + DataSourceRef: &corev1.TypedObjectReference{ + APIGroup: &apiGroup, + Kind: "VirtualMachine", + Name: "test-vm", + }, + }, + } + _, err := k8sclient.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + }) + + It("should return the PVC without error", func() { + pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, pvcName, namespace) + Expect(err).To(BeNil()) + Expect(pvc).ToNot(BeNil()) + Expect(pvc.Name).To(Equal(pvcName)) + Expect(pvc.Spec.DataSourceRef.Kind).To(Equal("VirtualMachine")) + Expect(*pvc.Spec.DataSourceRef.APIGroup).To(Equal("vmoperator.vmware.com")) + }) + }) + + Context("when PVC exists with unsupported DataSourceRef", func() { + BeforeEach(func() { + apiGroup := "unsupported.example.com" + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvcName, + Namespace: namespace, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + DataSourceRef: &corev1.TypedObjectReference{ + APIGroup: &apiGroup, + Kind: "UnsupportedKind", + Name: "test-resource", + }, + }, + } + _, err := k8sclient.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + }) + + It("should return an error", func() { + pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, pvcName, namespace) + Expect(err).ToNot(BeNil()) + Expect(pvc).To(BeNil()) + }) + }) + + Context("when PVC exists with empty APIGroup DataSourceRef", func() { + BeforeEach(func() { + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvcName, + Namespace: namespace, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + DataSourceRef: &corev1.TypedObjectReference{ + APIGroup: nil, + Kind: "SomeKind", + Name: "test-resource", + }, + }, + } + _, err := k8sclient.CoreV1().PersistentVolumeClaims(namespace).Create(ctx, pvc, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + }) + + It("should return an error for unsupported empty APIGroup", func() { + pvc, err := checkExistingPVCDataSourceRef(ctx, k8sclient, pvcName, namespace) + Expect(err).ToNot(BeNil()) + Expect(pvc).To(BeNil()) + }) + }) +}) + func TestCnsRegisterVolumeController(t *testing.T) { backOffDuration = make(map[types.NamespacedName]time.Duration) diff --git a/pkg/syncer/cnsoperator/controller/cnsregistervolume/util.go b/pkg/syncer/cnsoperator/controller/cnsregistervolume/util.go index 2cc6cbe605..69a66eaf34 100644 --- a/pkg/syncer/cnsoperator/controller/cnsregistervolume/util.go +++ b/pkg/syncer/cnsoperator/controller/cnsregistervolume/util.go @@ -49,6 +49,16 @@ const ( scResourceNameSuffix = ".storageclass.storage.k8s.io/requests.storage" ) +var ( + // Supported data source types for PVCs + supportedDataSourceTypes = []struct { + apiGroup string + kind string + }{ + {"vmoperator.vmware.com", "VirtualMachine"}, + } +) + // isDatastoreAccessibleToCluster verifies if the datastoreUrl is accessible to // cluster with clusterID. func isDatastoreAccessibleToCluster(ctx context.Context, vc *vsphere.VirtualCenter,