diff --git a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go index 960bdf0723..42c3eb39a3 100644 --- a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go +++ b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "reflect" + "strings" "sync" "time" @@ -44,9 +45,8 @@ import ( cnsoperatortypes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/types" "sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/util" - clientConfig "sigs.k8s.io/controller-runtime/pkg/client/config" - clientset "k8s.io/client-go/kubernetes" + clientConfig "sigs.k8s.io/controller-runtime/pkg/client/config" apis "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator" cnsregistervolumev1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/cnsregistervolume/v1alpha1" storagepolicyusagev1alpha2 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/storagepolicy/v1alpha2" @@ -563,7 +563,8 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context, // Validate topology compatibility if PVC exists and can be reused if topologyMgr != nil { - err = validatePVCTopologyCompatibility(ctx, pvc, volume.DatastoreUrl, topologyMgr, vc) + err = validatePVCTopologyCompatibility(ctx, k8sclient, pvc, volume.DatastoreUrl, topologyMgr, vc, + datastoreAccessibleTopology) if err != nil { msg := fmt.Sprintf("PVC topology validation failed: %v", err) log.Error(msg) @@ -810,17 +811,43 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context, // validatePVCTopologyCompatibility checks if the existing PVC's topology annotation is compatible // with the volume's actual placement zone. -func validatePVCTopologyCompatibility(ctx context.Context, pvc *v1.PersistentVolumeClaim, +func validatePVCTopologyCompatibility(ctx context.Context, k8sclient clientset.Interface, pvc *v1.PersistentVolumeClaim, volumeDatastoreURL string, topologyMgr commoncotypes.ControllerTopologyService, - vc *cnsvsphere.VirtualCenter) error { + vc *cnsvsphere.VirtualCenter, datastoreAccessibleTopology []map[string]string) error { log := logger.GetLogger(ctx) // Check if PVC has topology annotation topologyAnnotation, exists := pvc.Annotations[common.AnnVolumeAccessibleTopology] if !exists || topologyAnnotation == "" { - // No topology annotation on PVC, skip validation - log.Debugf("PVC %s/%s has no topology annotation, skipping topology validation", + // No topology annotation on PVC, add topology annotation and skip validation + log.Debugf("PVC %s/%s has no topology annotation, adding topology annotation", pvc.Namespace, pvc.Name) + var segmentsArray []string + for _, topologyTerm := range datastoreAccessibleTopology { + jsonSegment, err := json.Marshal(topologyTerm) + if err != nil { + return logger.LogNewErrorf(log, + "failed to marshal topology segment: %+v to json. Error: %+v", topologyTerm, err) + } + segmentsArray = append(segmentsArray, string(jsonSegment)) + } + topologyAnnotation = "[" + strings.Join(segmentsArray, ",") + "]" + + if pvc.Annotations == nil { + pvc.Annotations = make(map[string]string) + } + pvc.Annotations[common.AnnVolumeAccessibleTopology] = topologyAnnotation + + // Update the PVC in Kubernetes to persist the topology annotation + _, err := k8sclient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(ctx, pvc, metav1.UpdateOptions{}) + if err != nil { + return logger.LogNewErrorf(log, "failed to update PVC %s/%s with topology annotation: %+v", + pvc.Namespace, pvc.Name, err) + } + + log.Infof("Successfully added topology annotation %s to PVC %s/%s", + topologyAnnotation, pvc.Namespace, pvc.Name) + // Return nil as we just added the topology annotation based on actual volume placement return nil } diff --git a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go index 519dd5142b..edaf6eaf27 100644 --- a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go +++ b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go @@ -18,6 +18,7 @@ package cnsregistervolume import ( "context" + "encoding/json" "fmt" "reflect" "testing" @@ -791,11 +792,13 @@ var _ = Describe("checkExistingPVCDataSourceRef", func() { var _ = Describe("validatePVCTopologyCompatibility", func() { var ( - ctx context.Context - pvc *corev1.PersistentVolumeClaim - volumeDatastoreURL string - mockTopologyMgr *mockTopologyService - mockVC *cnsvsphere.VirtualCenter + ctx context.Context + pvc *corev1.PersistentVolumeClaim + volumeDatastoreURL string + mockTopologyMgr *mockTopologyService + mockVC *cnsvsphere.VirtualCenter + datastoreAccessibleTopology []map[string]string + mockK8sClient *k8sfake.Clientset ) BeforeEach(func() { @@ -803,6 +806,9 @@ var _ = Describe("validatePVCTopologyCompatibility", func() { volumeDatastoreURL = "dummy-datastore-url" mockTopologyMgr = &mockTopologyService{} mockVC = &cnsvsphere.VirtualCenter{} + datastoreAccessibleTopology = []map[string]string{ + {"topology.kubernetes.io/zone": "zone-1"}, + } pvc = &corev1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ @@ -810,11 +816,19 @@ var _ = Describe("validatePVCTopologyCompatibility", func() { Namespace: "test-namespace", }, } + + // Create a fake Kubernetes client + mockK8sClient = k8sfake.NewSimpleClientset() }) Context("when PVC has no topology annotation", func() { It("should return nil without error", func() { - err := validatePVCTopologyCompatibility(ctx, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC) + // Add the PVC to the fake client so it can be updated + _, err := mockK8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + err = validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC, + datastoreAccessibleTopology) Expect(err).To(BeNil()) }) }) @@ -827,7 +841,12 @@ var _ = Describe("validatePVCTopologyCompatibility", func() { }) It("should return nil without error", func() { - err := validatePVCTopologyCompatibility(ctx, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC) + // Add the PVC to the fake client so it can be updated + _, err := mockK8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + err = validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC, + datastoreAccessibleTopology) Expect(err).To(BeNil()) }) }) @@ -840,7 +859,8 @@ var _ = Describe("validatePVCTopologyCompatibility", func() { }) It("should return error for invalid JSON", func() { - err := validatePVCTopologyCompatibility(ctx, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC) + err := validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC, + datastoreAccessibleTopology) Expect(err).ToNot(BeNil()) Expect(err.Error()).To(ContainSubstring("failed to parse topology annotation")) }) @@ -855,7 +875,8 @@ var _ = Describe("validatePVCTopologyCompatibility", func() { }) It("should return error from topology manager", func() { - err := validatePVCTopologyCompatibility(ctx, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC) + err := validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC, + datastoreAccessibleTopology) Expect(err).ToNot(BeNil()) Expect(err.Error()).To(ContainSubstring("failed to get topology for volume datastore")) }) @@ -872,7 +893,8 @@ var _ = Describe("validatePVCTopologyCompatibility", func() { }) It("should return nil without error", func() { - err := validatePVCTopologyCompatibility(ctx, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC) + err := validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC, + datastoreAccessibleTopology) Expect(err).To(BeNil()) }) }) @@ -888,11 +910,124 @@ var _ = Describe("validatePVCTopologyCompatibility", func() { }) It("should return error for incompatible zones", func() { - err := validatePVCTopologyCompatibility(ctx, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC) + err := validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC, + datastoreAccessibleTopology) Expect(err).ToNot(BeNil()) Expect(err.Error()).To(ContainSubstring("is not compatible with volume placement")) }) }) + + Context("when PVC exists without topology annotation and annotation needs to be added", func() { + var originalPVC *corev1.PersistentVolumeClaim + + BeforeEach(func() { + // Create a PVC with some existing annotations but no topology annotation + originalPVC = &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "existing-pvc", + Namespace: "test-namespace", + Annotations: map[string]string{ + "some.other/annotation": "existing-value", + "another/annotation": "another-value", + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, + }, + } + pvc = originalPVC + }) + + It("should add topology annotation to existing PVC and return nil", func() { + // Verify PVC initially has no topology annotation + _, exists := pvc.Annotations["csi.vsphere.volume-accessible-topology"] + Expect(exists).To(BeFalse()) + + // Add the PVC to the fake client so it can be updated + _, err := mockK8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + // Call the function + err = validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC, + datastoreAccessibleTopology) + Expect(err).To(BeNil()) + + // Verify topology annotation was added + topologyAnnotation, exists := pvc.Annotations["csi.vsphere.volume-accessible-topology"] + Expect(exists).To(BeTrue()) + Expect(topologyAnnotation).ToNot(BeEmpty()) + + // Verify the annotation contains the expected topology data + expectedAnnotation := `[{"topology.kubernetes.io/zone":"zone-1"}]` + Expect(topologyAnnotation).To(Equal(expectedAnnotation)) + + // Verify existing annotations are preserved + Expect(pvc.Annotations["some.other/annotation"]).To(Equal("existing-value")) + Expect(pvc.Annotations["another/annotation"]).To(Equal("another-value")) + }) + + It("should handle PVC with nil annotations map", func() { + // Create PVC with nil annotations + pvcWithNilAnnotations := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc-nil-annotations", + Namespace: "test-namespace", + Annotations: nil, + }, + } + + // Add the PVC to the fake client so it can be updated + _, err := mockK8sClient.CoreV1().PersistentVolumeClaims(pvcWithNilAnnotations.Namespace).Create(ctx, + pvcWithNilAnnotations, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + // Call the function + err = validatePVCTopologyCompatibility(ctx, mockK8sClient, pvcWithNilAnnotations, volumeDatastoreURL, + mockTopologyMgr, mockVC, datastoreAccessibleTopology) + Expect(err).To(BeNil()) + + // Verify annotations map was created and topology annotation was added + Expect(pvcWithNilAnnotations.Annotations).ToNot(BeNil()) + topologyAnnotation, exists := pvcWithNilAnnotations.Annotations["csi.vsphere.volume-accessible-topology"] + Expect(exists).To(BeTrue()) + Expect(topologyAnnotation).To(Equal(`[{"topology.kubernetes.io/zone":"zone-1"}]`)) + }) + + It("should handle complex topology data with multiple zones", func() { + // Use more complex topology data + complexTopology := []map[string]string{ + {"topology.kubernetes.io/zone": "zone-a", "topology.kubernetes.io/region": "us-west"}, + {"topology.kubernetes.io/zone": "zone-b", "topology.kubernetes.io/region": "us-west"}, + } + + // Add the PVC to the fake client so it can be updated + _, err := mockK8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + err = validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC, + complexTopology) + Expect(err).To(BeNil()) + + // Verify the complex topology was properly serialized + topologyAnnotation := pvc.Annotations["csi.vsphere.volume-accessible-topology"] + Expect(topologyAnnotation).ToNot(BeEmpty()) + + // Parse the annotation to verify it contains both topology segments + var parsedTopology []map[string]string + err = json.Unmarshal([]byte(topologyAnnotation), &parsedTopology) + Expect(err).To(BeNil()) + Expect(len(parsedTopology)).To(Equal(2)) + Expect(parsedTopology[0]).To(Equal(map[string]string{ + "topology.kubernetes.io/zone": "zone-a", + "topology.kubernetes.io/region": "us-west", + })) + Expect(parsedTopology[1]).To(Equal(map[string]string{ + "topology.kubernetes.io/zone": "zone-b", + "topology.kubernetes.io/region": "us-west", + })) + }) + + }) }) var _ = Describe("isTopologyCompatible", func() {