Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"reflect"
"strings"
"sync"
"time"

Expand All @@ -44,9 +45,8 @@ import (
cnsoperatortypes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/types"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/util"

clientConfig "sigs.k8s.io/controller-runtime/pkg/client/config"

clientset "k8s.io/client-go/kubernetes"
clientConfig "sigs.k8s.io/controller-runtime/pkg/client/config"
apis "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator"
cnsregistervolumev1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/cnsregistervolume/v1alpha1"
storagepolicyusagev1alpha2 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/storagepolicy/v1alpha2"
Expand Down Expand Up @@ -563,7 +563,8 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context,

// Validate topology compatibility if PVC exists and can be reused
if topologyMgr != nil {
err = validatePVCTopologyCompatibility(ctx, pvc, volume.DatastoreUrl, topologyMgr, vc)
err = validatePVCTopologyCompatibility(ctx, k8sclient, pvc, volume.DatastoreUrl, topologyMgr, vc,
datastoreAccessibleTopology)
if err != nil {
msg := fmt.Sprintf("PVC topology validation failed: %v", err)
log.Error(msg)
Expand Down Expand Up @@ -810,17 +811,43 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context,

// validatePVCTopologyCompatibility checks if the existing PVC's topology annotation is compatible
// with the volume's actual placement zone.
func validatePVCTopologyCompatibility(ctx context.Context, pvc *v1.PersistentVolumeClaim,
func validatePVCTopologyCompatibility(ctx context.Context, k8sclient clientset.Interface, pvc *v1.PersistentVolumeClaim,
volumeDatastoreURL string, topologyMgr commoncotypes.ControllerTopologyService,
vc *cnsvsphere.VirtualCenter) error {
vc *cnsvsphere.VirtualCenter, datastoreAccessibleTopology []map[string]string) error {
log := logger.GetLogger(ctx)

// Check if PVC has topology annotation
topologyAnnotation, exists := pvc.Annotations[common.AnnVolumeAccessibleTopology]
if !exists || topologyAnnotation == "" {
// No topology annotation on PVC, skip validation
log.Debugf("PVC %s/%s has no topology annotation, skipping topology validation",
// No topology annotation on PVC, add topology annotation and skip validation
log.Debugf("PVC %s/%s has no topology annotation, adding topology annotation",
pvc.Namespace, pvc.Name)
var segmentsArray []string
for _, topologyTerm := range datastoreAccessibleTopology {
jsonSegment, err := json.Marshal(topologyTerm)
if err != nil {
return logger.LogNewErrorf(log,
"failed to marshal topology segment: %+v to json. Error: %+v", topologyTerm, err)
}
segmentsArray = append(segmentsArray, string(jsonSegment))
}
topologyAnnotation = "[" + strings.Join(segmentsArray, ",") + "]"

if pvc.Annotations == nil {
pvc.Annotations = make(map[string]string)
}
pvc.Annotations[common.AnnVolumeAccessibleTopology] = topologyAnnotation

// Update the PVC in Kubernetes to persist the topology annotation
_, err := k8sclient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(ctx, pvc, metav1.UpdateOptions{})
if err != nil {
return logger.LogNewErrorf(log, "failed to update PVC %s/%s with topology annotation: %+v",
pvc.Namespace, pvc.Name, err)
}

log.Infof("Successfully added topology annotation %s to PVC %s/%s",
topologyAnnotation, pvc.Namespace, pvc.Name)
// Return nil as we just added the topology annotation based on actual volume placement
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cnsregistervolume

import (
"context"
"encoding/json"
"fmt"
"reflect"
"testing"
Expand Down Expand Up @@ -791,30 +792,43 @@ var _ = Describe("checkExistingPVCDataSourceRef", func() {

var _ = Describe("validatePVCTopologyCompatibility", func() {
var (
ctx context.Context
pvc *corev1.PersistentVolumeClaim
volumeDatastoreURL string
mockTopologyMgr *mockTopologyService
mockVC *cnsvsphere.VirtualCenter
ctx context.Context
pvc *corev1.PersistentVolumeClaim
volumeDatastoreURL string
mockTopologyMgr *mockTopologyService
mockVC *cnsvsphere.VirtualCenter
datastoreAccessibleTopology []map[string]string
mockK8sClient *k8sfake.Clientset
)

BeforeEach(func() {
ctx = context.Background()
volumeDatastoreURL = "dummy-datastore-url"
mockTopologyMgr = &mockTopologyService{}
mockVC = &cnsvsphere.VirtualCenter{}
datastoreAccessibleTopology = []map[string]string{
{"topology.kubernetes.io/zone": "zone-1"},
}

pvc = &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pvc",
Namespace: "test-namespace",
},
}

// Create a fake Kubernetes client
mockK8sClient = k8sfake.NewSimpleClientset()
})

Context("when PVC has no topology annotation", func() {
It("should return nil without error", func() {
err := validatePVCTopologyCompatibility(ctx, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC)
// Add the PVC to the fake client so it can be updated
_, err := mockK8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
Expect(err).To(BeNil())

err = validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC,
datastoreAccessibleTopology)
Expect(err).To(BeNil())
})
})
Expand All @@ -827,7 +841,12 @@ var _ = Describe("validatePVCTopologyCompatibility", func() {
})

It("should return nil without error", func() {
err := validatePVCTopologyCompatibility(ctx, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC)
// Add the PVC to the fake client so it can be updated
_, err := mockK8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
Expect(err).To(BeNil())

err = validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC,
datastoreAccessibleTopology)
Expect(err).To(BeNil())
})
})
Expand All @@ -840,7 +859,8 @@ var _ = Describe("validatePVCTopologyCompatibility", func() {
})

It("should return error for invalid JSON", func() {
err := validatePVCTopologyCompatibility(ctx, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC)
err := validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC,
datastoreAccessibleTopology)
Expect(err).ToNot(BeNil())
Expect(err.Error()).To(ContainSubstring("failed to parse topology annotation"))
})
Expand All @@ -855,7 +875,8 @@ var _ = Describe("validatePVCTopologyCompatibility", func() {
})

It("should return error from topology manager", func() {
err := validatePVCTopologyCompatibility(ctx, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC)
err := validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC,
datastoreAccessibleTopology)
Expect(err).ToNot(BeNil())
Expect(err.Error()).To(ContainSubstring("failed to get topology for volume datastore"))
})
Expand All @@ -872,7 +893,8 @@ var _ = Describe("validatePVCTopologyCompatibility", func() {
})

It("should return nil without error", func() {
err := validatePVCTopologyCompatibility(ctx, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC)
err := validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC,
datastoreAccessibleTopology)
Expect(err).To(BeNil())
})
})
Expand All @@ -888,11 +910,124 @@ var _ = Describe("validatePVCTopologyCompatibility", func() {
})

It("should return error for incompatible zones", func() {
err := validatePVCTopologyCompatibility(ctx, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC)
err := validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC,
datastoreAccessibleTopology)
Expect(err).ToNot(BeNil())
Expect(err.Error()).To(ContainSubstring("is not compatible with volume placement"))
})
})

Context("when PVC exists without topology annotation and annotation needs to be added", func() {
var originalPVC *corev1.PersistentVolumeClaim

BeforeEach(func() {
// Create a PVC with some existing annotations but no topology annotation
originalPVC = &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "existing-pvc",
Namespace: "test-namespace",
Annotations: map[string]string{
"some.other/annotation": "existing-value",
"another/annotation": "another-value",
},
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
},
}
pvc = originalPVC
})

It("should add topology annotation to existing PVC and return nil", func() {
// Verify PVC initially has no topology annotation
_, exists := pvc.Annotations["csi.vsphere.volume-accessible-topology"]
Expect(exists).To(BeFalse())

// Add the PVC to the fake client so it can be updated
_, err := mockK8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
Expect(err).To(BeNil())

// Call the function
err = validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC,
datastoreAccessibleTopology)
Expect(err).To(BeNil())

// Verify topology annotation was added
topologyAnnotation, exists := pvc.Annotations["csi.vsphere.volume-accessible-topology"]
Expect(exists).To(BeTrue())
Expect(topologyAnnotation).ToNot(BeEmpty())

// Verify the annotation contains the expected topology data
expectedAnnotation := `[{"topology.kubernetes.io/zone":"zone-1"}]`
Expect(topologyAnnotation).To(Equal(expectedAnnotation))

// Verify existing annotations are preserved
Expect(pvc.Annotations["some.other/annotation"]).To(Equal("existing-value"))
Expect(pvc.Annotations["another/annotation"]).To(Equal("another-value"))
})

It("should handle PVC with nil annotations map", func() {
// Create PVC with nil annotations
pvcWithNilAnnotations := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc-nil-annotations",
Namespace: "test-namespace",
Annotations: nil,
},
}

// Add the PVC to the fake client so it can be updated
_, err := mockK8sClient.CoreV1().PersistentVolumeClaims(pvcWithNilAnnotations.Namespace).Create(ctx,
pvcWithNilAnnotations, metav1.CreateOptions{})
Expect(err).To(BeNil())

// Call the function
err = validatePVCTopologyCompatibility(ctx, mockK8sClient, pvcWithNilAnnotations, volumeDatastoreURL,
mockTopologyMgr, mockVC, datastoreAccessibleTopology)
Expect(err).To(BeNil())

// Verify annotations map was created and topology annotation was added
Expect(pvcWithNilAnnotations.Annotations).ToNot(BeNil())
topologyAnnotation, exists := pvcWithNilAnnotations.Annotations["csi.vsphere.volume-accessible-topology"]
Expect(exists).To(BeTrue())
Expect(topologyAnnotation).To(Equal(`[{"topology.kubernetes.io/zone":"zone-1"}]`))
})

It("should handle complex topology data with multiple zones", func() {
// Use more complex topology data
complexTopology := []map[string]string{
{"topology.kubernetes.io/zone": "zone-a", "topology.kubernetes.io/region": "us-west"},
{"topology.kubernetes.io/zone": "zone-b", "topology.kubernetes.io/region": "us-west"},
}

// Add the PVC to the fake client so it can be updated
_, err := mockK8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
Expect(err).To(BeNil())

err = validatePVCTopologyCompatibility(ctx, mockK8sClient, pvc, volumeDatastoreURL, mockTopologyMgr, mockVC,
complexTopology)
Expect(err).To(BeNil())

// Verify the complex topology was properly serialized
topologyAnnotation := pvc.Annotations["csi.vsphere.volume-accessible-topology"]
Expect(topologyAnnotation).ToNot(BeEmpty())

// Parse the annotation to verify it contains both topology segments
var parsedTopology []map[string]string
err = json.Unmarshal([]byte(topologyAnnotation), &parsedTopology)
Expect(err).To(BeNil())
Expect(len(parsedTopology)).To(Equal(2))
Expect(parsedTopology[0]).To(Equal(map[string]string{
"topology.kubernetes.io/zone": "zone-a",
"topology.kubernetes.io/region": "us-west",
}))
Expect(parsedTopology[1]).To(Equal(map[string]string{
"topology.kubernetes.io/zone": "zone-b",
"topology.kubernetes.io/region": "us-west",
}))
})

})
})

var _ = Describe("isTopologyCompatible", func() {
Expand Down