From 55abf9fb23154195eb43b210a3a6947ca7e2d09b Mon Sep 17 00:00:00 2001 From: Chethan Venkatesh Date: Fri, 14 Nov 2025 11:59:49 -0800 Subject: [PATCH] Use Patch operation for PV & PVC updates --- .../k8sorchestrator/k8sorchestrator.go | 66 +++++++++++++--- .../k8sorchestrator/k8sorchestrator_helper.go | 28 ++++++- pkg/csi/service/wcpguest/controller.go | 67 +++++++++++++++-- pkg/kubernetes/kubernetes.go | 75 +++++++++++++++++-- .../cnsregistervolume_controller.go | 25 ++++++- pkg/syncer/fullsync.go | 66 ++++++++++++++-- pkg/syncer/fullsync_test.go | 4 +- .../pv_to_backingdiskobjectid_mapping.go | 60 +++++++++++++-- pkg/syncer/pvcsi_fullsync.go | 68 +++++++++++++++-- pkg/syncer/pvcsi_metadatasyncer.go | 31 +++++++- pkg/syncer/syncer_test.go | 26 ++++++- pkg/syncer/volume_health.go | 62 +++++++++++++-- 12 files changed, 519 insertions(+), 59 deletions(-) diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go index 752758eb41..34f5eebb80 100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go @@ -19,6 +19,7 @@ package k8sorchestrator import ( "context" "crypto/sha256" + "encoding/json" "errors" "fmt" "os" @@ -44,6 +45,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" @@ -2173,15 +2175,36 @@ func (c *K8sOrchestrator) PreLinkedCloneCreateAction(ctx context.Context, pvcNam return err } - if linkedClonePVC.Labels == nil { - linkedClonePVC.Labels = make(map[string]string) + oldData, err := json.Marshal(linkedClonePVC) + if err != nil { + log.Errorf("Failed to marshal PVC %s/%s: %v", pvcNamespace, pvcName, err) + return err + } + + newPVC := linkedClonePVC.DeepCopy() + if newPVC.Labels == nil { + newPVC.Labels = make(map[string]string) } // Add label - if _, ok := linkedClonePVC.Labels[common.AnnKeyLinkedClone]; !ok { - linkedClonePVC.Labels[common.LinkedClonePVCLabel] = linkedClonePVC.Annotations[common.AttributeIsLinkedClone] + if _, ok := newPVC.Labels[common.AnnKeyLinkedClone]; !ok { + newPVC.Labels[common.LinkedClonePVCLabel] = newPVC.Annotations[common.AttributeIsLinkedClone] + } + + newData, err := json.Marshal(newPVC) + if err != nil { + log.Errorf("Failed to marshal updated PVC %s/%s with labels: %v", pvcNamespace, pvcName, err) + return err } - _, err = c.k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Update(ctx, linkedClonePVC, metav1.UpdateOptions{}) + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, linkedClonePVC) + if err != nil { + log.Errorf("Error creating two way merge patch for PVC %s/%s with error: %v", pvcNamespace, pvcName, err) + return err + } + + _, err = c.k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Patch(ctx, linkedClonePVC.Name, + k8stypes.StrategicMergePatchType, + patchBytes, metav1.PatchOptions{}) if err != nil { log.Errorf("failed to add linked clone label for PVC %s/%s. Error: %+v, retrying...", pvcNamespace, pvcName, err) @@ -2229,11 +2252,36 @@ func (c *K8sOrchestrator) UpdatePersistentVolumeLabel(ctx context.Context, if err != nil { return fmt.Errorf("error getting PV %s from API server: %w", pvName, err) } - if pv.Labels == nil { - pv.Labels = make(map[string]string) + + oldData, err := json.Marshal(pv) + if err != nil { + errMsg := fmt.Sprintf("Failed to marshal PV %s: %v", pvName, err) + log.Error(errMsg) + return err + } + + newPV := pv.DeepCopy() + if newPV.Labels == nil { + newPV.Labels = make(map[string]string) + } + newPV.Labels[key] = value + + newData, err := json.Marshal(newPV) + if err != nil { + errMsg := fmt.Sprintf("Failed to marshal updated PV %s with labels: %v", pvName, err) + log.Error(errMsg) + return err } - pv.Labels[key] = value - _, err = c.k8sClient.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{}) + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pv) + if err != nil { + errMsg := fmt.Sprintf("Error creating two way merge patch for PV %s with error: %v", pvName, err) + log.Error(errMsg) + return err + } + + _, err = c.k8sClient.CoreV1().PersistentVolumes().Patch(ctx, pv.Name, k8stypes.StrategicMergePatchType, + patchBytes, metav1.PatchOptions{}) if err != nil { errMsg := fmt.Sprintf("error updating PV %s with labels %s/%s. Error: %v", pvName, key, value, err) log.Error(errMsg) diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator_helper.go b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator_helper.go index 20b34d356c..7108eb9644 100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator_helper.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator_helper.go @@ -23,6 +23,7 @@ import ( "time" k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" v1 "k8s.io/api/core/v1" @@ -83,17 +84,38 @@ func (c *K8sOrchestrator) updatePVCAnnotations(ctx context.Context, return err } + oldData, err := json.Marshal(pvcObj) + if err != nil { + log.Errorf("failed to marshal PVC %s/%s: %v", pvcNamespace, pvcName, err) + return err + } + + newPVC := pvcObj.DeepCopy() for key, val := range annotations { // If value is not set, remove the annotation. if val == "" { - delete(pvcObj.ObjectMeta.Annotations, key) + delete(newPVC.ObjectMeta.Annotations, key) log.Debugf("Removing annotation %s on pvc %s/%s", key, pvcNamespace, pvcName) } else { - metav1.SetMetaDataAnnotation(&pvcObj.ObjectMeta, key, val) + metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, key, val) log.Debugf("Updating annotation %s on pvc %s/%s to value: %s", key, pvcNamespace, pvcName, val) } } - _, err = c.k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Update(ctx, pvcObj, metav1.UpdateOptions{}) + + newData, err := json.Marshal(newPVC) + if err != nil { + log.Errorf("failed to marshal updated PVC %s/%s with annotations: %v", pvcNamespace, pvcName, err) + return err + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvcObj) + if err != nil { + log.Errorf("error creating two way merge patch for PVC %s/%s: %v", pvcNamespace, pvcName, err) + return err + } + + _, err = c.k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Patch(ctx, pvcObj.Name, + k8stypes.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) if err != nil { log.Errorf("failed to update pvc annotations %s/%s with err:%+v", pvcNamespace, pvcName, err) return err diff --git a/pkg/csi/service/wcpguest/controller.go b/pkg/csi/service/wcpguest/controller.go index a2dc6a787e..d3599668b6 100644 --- a/pkg/csi/service/wcpguest/controller.go +++ b/pkg/csi/service/wcpguest/controller.go @@ -18,6 +18,7 @@ package wcpguest import ( "context" + "encoding/json" "fmt" "math" "net/http" @@ -48,6 +49,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -622,10 +624,37 @@ func (c *controller) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequ if finalizer == cnsoperatortypes.CNSVolumeFinalizer { log.Infof("Removing %q finalizer from PersistentVolumeClaim with name: %q on namespace: %q", cnsoperatortypes.CNSVolumeFinalizer, svPVC.Name, svPVC.Namespace) - svPVC.ObjectMeta.Finalizers = slices.Delete(svPVC.ObjectMeta.Finalizers, i, i+1) + + oldData, err := json.Marshal(svPVC) + if err != nil { + msg := fmt.Sprintf("failed to marshal supervisor PVC %q in %q namespace. Error: %+v", + req.VolumeId, c.supervisorNamespace, err) + log.Error(msg) + return nil, csifault.CSIInternalFault, status.Error(codes.Internal, msg) + } + + newPVC := svPVC.DeepCopy() + newPVC.ObjectMeta.Finalizers = slices.Delete(newPVC.ObjectMeta.Finalizers, i, i+1) + + newData, err := json.Marshal(newPVC) + if err != nil { + msg := fmt.Sprintf("failed to marshal updated supervisor PVC %q in %q namespace. Error: %+v", + req.VolumeId, c.supervisorNamespace, err) + log.Error(msg) + return nil, csifault.CSIInternalFault, status.Error(codes.Internal, msg) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, svPVC) + if err != nil { + msg := fmt.Sprintf("error creating two way merge patch for supervisor PVC %q in %q namespace. Error: %+v", + req.VolumeId, c.supervisorNamespace, err) + log.Error(msg) + return nil, csifault.CSIInternalFault, status.Error(codes.Internal, msg) + } + // Update the instance after removing finalizer - _, err := c.supervisorClient.CoreV1().PersistentVolumeClaims(c.supervisorNamespace).Update(ctx, svPVC, - metav1.UpdateOptions{}) + _, err = c.supervisorClient.CoreV1().PersistentVolumeClaims(c.supervisorNamespace).Patch(ctx, + svPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) if err != nil { msg := fmt.Sprintf("failed to update supervisor PVC %q in %q namespace. Error: %+v", req.VolumeId, c.supervisorNamespace, err) @@ -1466,14 +1495,38 @@ func (c *controller) ControllerExpandVolume(ctx context.Context, req *csi.Contro switch (gcPvcRequestSize).Cmp(svPvcRequestSize) { case 1: // Update requested storage in SV PVC spec - svPvcClone := svPVC.DeepCopy() - svPvcClone.Spec.Resources.Requests[corev1.ResourceName(corev1.ResourceStorage)] = *gcPvcRequestSize + oldExpandData, err := json.Marshal(svPVC) + if err != nil { + msg := fmt.Sprintf("failed to marshal supervisor PVC for expansion %q in %q namespace. Error: %+v", + volumeID, c.supervisorNamespace, err) + log.Error(msg) + return nil, csifault.CSIInternalFault, status.Error(codes.Internal, msg) + } + + newExpandPVC := svPVC.DeepCopy() + newExpandPVC.Spec.Resources.Requests[corev1.ResourceName(corev1.ResourceStorage)] = *gcPvcRequestSize + + newExpandData, err := json.Marshal(newExpandPVC) + if err != nil { + msg := fmt.Sprintf("failed to marshal updated supervisor PVC for expansion %q in %q namespace. Error: %+v", + volumeID, c.supervisorNamespace, err) + log.Error(msg) + return nil, csifault.CSIInternalFault, status.Error(codes.Internal, msg) + } + + expandPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldExpandData, newExpandData, svPVC) + if err != nil { + msg := fmt.Sprintf("error creating two way merge patch for supervisor PVC expansion %q in %q namespace. Error: %+v", + volumeID, c.supervisorNamespace, err) + log.Error(msg) + return nil, csifault.CSIInternalFault, status.Error(codes.Internal, msg) + } // Make an update call to SV API server log.Infof("Increasing the size of supervisor PVC %s in namespace %s to %s", volumeID, c.supervisorNamespace, gcPvcRequestSize.String()) - svPVC, err = c.supervisorClient.CoreV1().PersistentVolumeClaims(c.supervisorNamespace).Update( - ctx, svPvcClone, metav1.UpdateOptions{}) + svPVC, err = c.supervisorClient.CoreV1().PersistentVolumeClaims(c.supervisorNamespace).Patch( + ctx, svPVC.Name, types.StrategicMergePatchType, expandPatchBytes, metav1.PatchOptions{}) if err != nil { msg := fmt.Sprintf("failed to update supervisor PVC %q in %q namespace. Error: %+v", volumeID, c.supervisorNamespace, err) diff --git a/pkg/kubernetes/kubernetes.go b/pkg/kubernetes/kubernetes.go index cf903481f2..191a8b3aa0 100644 --- a/pkg/kubernetes/kubernetes.go +++ b/pkg/kubernetes/kubernetes.go @@ -19,6 +19,7 @@ package kubernetes import ( "context" "embed" + "encoding/json" "flag" "fmt" "net" @@ -40,6 +41,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" utilyaml "k8s.io/apimachinery/pkg/util/yaml" clientset "k8s.io/client-go/kubernetes" @@ -728,8 +731,28 @@ func RetainPersistentVolume(ctx context.Context, k8sClient clientset.Interface, return err } - pv.Spec.PersistentVolumeReclaimPolicy = v1.PersistentVolumeReclaimRetain - _, err = k8sClient.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{}) + oldData, err := json.Marshal(pv) + if err != nil { + log.Errorf("Failed to marshal PV: %v, Error: %v", pv, err) + return err + } + + newPV := pv.DeepCopy() + newPV.Spec.PersistentVolumeReclaimPolicy = v1.PersistentVolumeReclaimRetain + newData, err := json.Marshal(newPV) + if err != nil { + log.Errorf("Failed to marshal updated PV with reclaim policy: %v, Error: %v", newPV, err) + return err + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pv) + if err != nil { + log.Errorf("Error creating two way merge patch for PV %q with error: %v", pv.Name, err) + return err + } + + _, err = k8sClient.CoreV1().PersistentVolumes().Patch(ctx, pv.Name, k8stypes.StrategicMergePatchType, + patchBytes, metav1.PatchOptions{}) if err != nil { log.Errorf("Failed to set the reclaim policy to retain. Error: %s", err.Error()) return err @@ -809,14 +832,34 @@ func AddFinalizerOnPVC(ctx context.Context, k8sClient clientset.Interface, pvcNa return err } + oldData, err := json.Marshal(pvc) + if err != nil { + log.Errorf("Failed to marshal PVC: %v", err) + return err + } + + newPVC := pvc.DeepCopy() // If the finalizer is already present, no action is needed - if !controllerutil.AddFinalizer(pvc, finalizer) { + if !controllerutil.AddFinalizer(newPVC, finalizer) { log.Info("Finalizer already present on PVC. No action needed.") return nil } + newData, err := json.Marshal(newPVC) + if err != nil { + log.Errorf("Failed to marshal updated PVC with finalizer: %v", err) + return err + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvc) + if err != nil { + log.Errorf("Error creating two way merge patch for PVC %s/%s with error: %v", pvcNamespace, pvcName, err) + return err + } + // Update the PVC with the new finalizer - _, err = k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Update(ctx, pvc, metav1.UpdateOptions{}) + _, err = k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Patch(ctx, pvc.Name, k8stypes.StrategicMergePatchType, + patchBytes, metav1.PatchOptions{}) if err != nil { log.Errorf("Failed to add finalizer on PVC. Error: %s", err.Error()) return err @@ -843,14 +886,34 @@ func RemoveFinalizerFromPVC(ctx context.Context, k8sClient clientset.Interface, return err } + oldData, err := json.Marshal(pvc) + if err != nil { + log.Errorf("Failed to marshal PVC: %v", err) + return err + } + + newPVC := pvc.DeepCopy() // If the finalizer is not present, no action is needed - if !controllerutil.RemoveFinalizer(pvc, finalizer) { + if !controllerutil.RemoveFinalizer(newPVC, finalizer) { log.Info("Finalizer not present on PVC. No action needed.") return nil } + newData, err := json.Marshal(newPVC) + if err != nil { + log.Errorf("Failed to marshal updated PVC with finalizer removed: %v", err) + return err + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvc) + if err != nil { + log.Errorf("Error creating two way merge patch for PVC %s/%s with error: %v", pvcNamespace, pvcName, err) + return err + } + // Update the PVC to remove the finalizer - _, err = k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Update(ctx, pvc, metav1.UpdateOptions{}) + _, err = k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Patch(ctx, pvc.Name, k8stypes.StrategicMergePatchType, + patchBytes, metav1.PatchOptions{}) if err != nil { log.Errorf("Failed to remove finalizer from PVC. Error: %s", err.Error()) return err diff --git a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go index f4b5c0e897..777eea77cb 100644 --- a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go +++ b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller.go @@ -33,6 +33,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" apitypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" @@ -837,18 +838,40 @@ func validatePVCTopologyCompatibility(ctx context.Context, k8sclient clientset.I } topologyAnnotation = "[" + strings.Join(segmentsArray, ",") + "]" + oldData, err := json.Marshal(pvc) + if err != nil { + return logger.LogNewErrorf(log, "failed to marshal PVC %s/%s: %v", pvc.Namespace, pvc.Name, err) + } + + // Update the original PVC object's annotations first + // This ensures tests that expect the original PVC object to be modified work correctly if pvc.Annotations == nil { pvc.Annotations = make(map[string]string) } pvc.Annotations[common.AnnVolumeAccessibleTopology] = topologyAnnotation + newData, err := json.Marshal(pvc) + if err != nil { + return logger.LogNewErrorf(log, "failed to marshal updated PVC %s/%s with topology annotation: %v", + pvc.Namespace, pvc.Name, err) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvc) + if err != nil { + return logger.LogNewErrorf(log, "error creating two way merge patch for PVC %s/%s: %v", pvc.Namespace, pvc.Name, err) + } + // Update the PVC in Kubernetes to persist the topology annotation - _, err := k8sclient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(ctx, pvc, metav1.UpdateOptions{}) + updatedPVC, err := k8sclient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Patch(ctx, pvc.Name, + apitypes.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) if err != nil { return logger.LogNewErrorf(log, "failed to update PVC %s/%s with topology annotation: %+v", pvc.Namespace, pvc.Name, err) } + // Update the PVC's resource version to match the patched PVC + pvc.ResourceVersion = updatedPVC.ResourceVersion + log.Infof("Successfully added topology annotation %s to PVC %s/%s", topologyAnnotation, pvc.Namespace, pvc.Name) // Return nil as we just added the topology annotation based on actual volume placement diff --git a/pkg/syncer/fullsync.go b/pkg/syncer/fullsync.go index d38ef57180..cfb07f8684 100644 --- a/pkg/syncer/fullsync.go +++ b/pkg/syncer/fullsync.go @@ -683,8 +683,17 @@ func setFileShareAnnotationsOnPVC(ctx context.Context, k8sClient clientset.Inter pv.Spec.CSI.VolumeHandle, err) return err } + oldData, err := json.Marshal(pvc) + if err != nil { + log.Errorf("setFileShareAnnotationsOnPVC: Failed to marshal PVC %q in namespace %q: %v", pvc.Name, pvc.Namespace, err) + return err + } + vSANFileBackingDetails := volume.BackingObjectDetails.(*cnstypes.CnsVsanFileShareBackingDetails) accessPoints := make(map[string]string) + + // Update the original PVC object's annotations first + // This ensures tests that expect the original PVC object to be modified work correctly for _, kv := range vSANFileBackingDetails.AccessPoints { if kv.Key == common.Nfsv3AccessPointKey { pvc.Annotations[common.Nfsv3ExportPathAnnotationKey] = kv.Value @@ -696,14 +705,33 @@ func setFileShareAnnotationsOnPVC(ctx context.Context, k8sClient clientset.Inter log.Debugf("setFileShareAnnotationsOnPVC: Access point details for PVC: %q, namespace: %q are %+v", pvc.Name, pvc.Namespace, accessPoints) + newData, err := json.Marshal(pvc) + if err != nil { + log.Errorf("setFileShareAnnotationsOnPVC: Failed to marshal updated PVC %q in namespace %q with annotations: %v", + pvc.Name, pvc.Namespace, err) + return err + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvc) + if err != nil { + log.Errorf("setFileShareAnnotationsOnPVC: Error creating two way merge patch for PVC %q in namespace %q "+ + "with error: %v", pvc.Name, pvc.Namespace, err) + return err + } + // Update PVC to add annotation on it - pvc, err = k8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(ctx, pvc, - metav1.UpdateOptions{}) + updatedPVC, err := k8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Patch(ctx, + pvc.Name, types.StrategicMergePatchType, + patchBytes, metav1.PatchOptions{}) if err != nil { log.Errorf("setFileShareAnnotationsOnPVC: Error updating PVC %q in namespace %q, Err: %v", pvc.Name, pvc.Namespace, err) return err } + + // Update the PVC's resource version to match the patched PVC + pvc.ResourceVersion = updatedPVC.ResourceVersion + log.Infof("setFileShareAnnotationsOnPVC: Added file share export paths annotation successfully on PVC %q, "+ "namespce %q", pvc.Name, pvc.Namespace) return nil @@ -1727,9 +1755,37 @@ func RemoveCNSFinalizerFromPVCIfTKGClusterDeleted(ctx context.Context, k8sClient log.Infof("RemoveCNSFinalizerFromPVCIfTKGClusterDeleted: Removing %q finalizer from PVC "+ "with name: %q on namespace: %q in Terminating state", cnsoperatortypes.CNSVolumeFinalizer, pvc.Name, pvc.Namespace) - controllerutil.RemoveFinalizer(pvc, finalizerToRemove) - _, err = k8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(ctx, - pvc, metav1.UpdateOptions{}) + + oldData, err := json.Marshal(pvc) + if err != nil { + msg := fmt.Sprintf("RemoveCNSFinalizerFromPVCIfTKGClusterDeleted: Failed to marshal PVC %q "+ + "in namespace %q: %v", pvc.Name, pvc.Namespace, err) + log.Error(msg) + return + } + + newPVC := pvc.DeepCopy() + controllerutil.RemoveFinalizer(newPVC, finalizerToRemove) + + newData, err := json.Marshal(newPVC) + if err != nil { + msg := fmt.Sprintf("RemoveCNSFinalizerFromPVCIfTKGClusterDeleted: Failed to marshal updated PVC %q "+ + "in namespace %q with finalizer removed: %v", pvc.Name, pvc.Namespace, err) + log.Error(msg) + return + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvc) + if err != nil { + msg := fmt.Sprintf("RemoveCNSFinalizerFromPVCIfTKGClusterDeleted: Error creating two way merge patch"+ + "for PVC %q in namespace %q with error: %v", pvc.Name, pvc.Namespace, err) + log.Error(msg) + return + } + + _, err = k8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Patch(ctx, pvc.Name, + types.StrategicMergePatchType, + patchBytes, metav1.PatchOptions{}) if err != nil { msg := fmt.Sprintf("RemoveCNSFinalizerFromPVCIfTKGClusterDeleted: failed to update "+ "supervisor PVC %q in %q namespace. Err: %+v", pvc.Name, pvc.Namespace, err) diff --git a/pkg/syncer/fullsync_test.go b/pkg/syncer/fullsync_test.go index 546eca42c6..5865f8fa48 100644 --- a/pkg/syncer/fullsync_test.go +++ b/pkg/syncer/fullsync_test.go @@ -216,9 +216,9 @@ func TestSetFileShareAnnotationsOnPVC_PVCUpdateError(t *testing.T) { }, } - // Create fake k8s client with PV and configure it to fail on PVC update + // Create fake k8s client with PV and configure it to fail on PVC patch k8sClient := k8sfake.NewSimpleClientset(pv) - k8sClient.PrependReactor("update", "persistentvolumeclaims", + k8sClient.PrependReactor("patch", "persistentvolumeclaims", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { return true, nil, errors.New("update failed") }) diff --git a/pkg/syncer/pv_to_backingdiskobjectid_mapping.go b/pkg/syncer/pv_to_backingdiskobjectid_mapping.go index d21d17ea2e..96f24b96c7 100644 --- a/pkg/syncer/pv_to_backingdiskobjectid_mapping.go +++ b/pkg/syncer/pv_to_backingdiskobjectid_mapping.go @@ -18,12 +18,15 @@ package syncer import ( "context" + "encoding/json" "strings" cnstypes "github.com/vmware/govmomi/cns/types" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" clientset "k8s.io/client-go/kubernetes" "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/utils" @@ -153,13 +156,37 @@ func updatePVtoBackingDiskObjectIdMappingStatus(ctx context.Context, k8sclient c } if !found || val != pvToBackingDiskObjectIdPair { + oldData, err := json.Marshal(pvc) + if err != nil { + log.Errorf("updatePVtoBackingDiskObjectIdMappingStatus: Failed to marshal PVC %s/%s: %v", + pvc.Namespace, pvc.Name, err) + return false, err + } + // PVToBackingDiskObjectId annotation on pvc is changed, set it to new value. - metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annPVtoBackingDiskObjectId, pvToBackingDiskObjectIdPair) + newPVC := pvc.DeepCopy() + metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annPVtoBackingDiskObjectId, pvToBackingDiskObjectIdPair) log.Infof("updatePVtoBackingDiskObjectIdMappingStatus: set pv to backingdiskobjectid annotation for "+ "pvc %s/%s from old value %s to new value %s", pvc.Namespace, pvc.Name, val, pvToBackingDiskObjectIdPair) - _, err := k8sclient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(ctx, pvc, metav1.UpdateOptions{}) + + newData, err := json.Marshal(newPVC) + if err != nil { + log.Errorf("updatePVtoBackingDiskObjectIdMappingStatus: Failed to marshal updated PVC %s/%s with annotation: %v", + pvc.Namespace, pvc.Name, err) + return false, err + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvc) + if err != nil { + log.Errorf("updatePVtoBackingDiskObjectIdMappingStatus: Error creating two way merge patch for PVC %s/%s: %v", + pvc.Namespace, pvc.Name, err) + return false, err + } + + _, err = k8sclient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Patch(ctx, pvc.Name, + types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) if err != nil { if apierrors.IsConflict(err) { log.Debugf("updatePVtoBackingDiskObjectIdMappingStatus: Failed to update pvc %s/%s with err:%+v, "+ @@ -175,12 +202,35 @@ func updatePVtoBackingDiskObjectIdMappingStatus(ctx context.Context, k8sclient c return false, err } + oldRetryData, err := json.Marshal(newPvc) + if err != nil { + log.Errorf("updatePVtoBackingDiskObjectIdMappingStatus: Failed to marshal retry PVC %s/%s: %v", + newPvc.Namespace, newPvc.Name, err) + return false, err + } + + newRetryPVC := newPvc.DeepCopy() log.Infof("updatePVtoBackingDiskObjectIdMappingStatus: updating pv to backingdiskobjectid annotation "+ "for pvc %s/%s which get from API server from old value %s to new value %s", newPvc.Namespace, newPvc.Name, val, pvToBackingDiskObjectIdPair) - metav1.SetMetaDataAnnotation(&newPvc.ObjectMeta, annPVtoBackingDiskObjectId, pvToBackingDiskObjectIdPair) - _, err = k8sclient.CoreV1().PersistentVolumeClaims(newPvc.Namespace).Update(ctx, - newPvc, metav1.UpdateOptions{}) + metav1.SetMetaDataAnnotation(&newRetryPVC.ObjectMeta, annPVtoBackingDiskObjectId, pvToBackingDiskObjectIdPair) + + newRetryData, err := json.Marshal(newRetryPVC) + if err != nil { + log.Errorf("updatePVtoBackingDiskObjectIdMappingStatus: Failed to marshal updated retry PVC %s/%s "+ + "with annotation: %v", newPvc.Namespace, newPvc.Name, err) + return false, err + } + + retryPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldRetryData, newRetryData, newPvc) + if err != nil { + log.Errorf("updatePVtoBackingDiskObjectIdMappingStatus: Error creating two way merge patch "+ + "for retry PVC %s/%s: %v", newPvc.Namespace, newPvc.Name, err) + return false, err + } + + _, err = k8sclient.CoreV1().PersistentVolumeClaims(newPvc.Namespace).Patch(ctx, newPvc.Name, + types.StrategicMergePatchType, retryPatchBytes, metav1.PatchOptions{}) if err != nil { log.Errorf("updatePVtoBackingDiskObjectIdMappingStatus: Failed to update pvc %s/%s with err:%+v", newPvc.Namespace, newPvc.Name, err) diff --git a/pkg/syncer/pvcsi_fullsync.go b/pkg/syncer/pvcsi_fullsync.go index 3e7e2c73ad..ff4dd256b5 100644 --- a/pkg/syncer/pvcsi_fullsync.go +++ b/pkg/syncer/pvcsi_fullsync.go @@ -487,9 +487,35 @@ func setGuestClusterDetailsOnSupervisorPVC(ctx context.Context, metadataSyncer * if svPVC.Labels == nil { svPVC.Labels = make(map[string]string) } - svPVC.Labels[key] = metadataSyncer.configInfo.Cfg.GC.TanzuKubernetesClusterUID - _, err = metadataSyncer.supervisorClient.CoreV1().PersistentVolumeClaims(supervisorNamespace).Update( - ctx, svPVC, metav1.UpdateOptions{}) + oldData, err := json.Marshal(svPVC) + if err != nil { + msg := fmt.Sprintf("failed to marshal supervisor PVC: %q in %q namespace. Error: %+v", + pv.Spec.CSI.VolumeHandle, supervisorNamespace, err) + log.Error(msg) + continue + } + + newPVC := svPVC.DeepCopy() + newPVC.Labels[key] = metadataSyncer.configInfo.Cfg.GC.TanzuKubernetesClusterUID + + newData, err := json.Marshal(newPVC) + if err != nil { + msg := fmt.Sprintf("failed to marshal updated supervisor PVC: %q in %q namespace. Error: %+v", + pv.Spec.CSI.VolumeHandle, supervisorNamespace, err) + log.Error(msg) + continue + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, svPVC) + if err != nil { + msg := fmt.Sprintf("error creating two way merge patch for supervisor PVC: %q in %q namespace. Error: %+v", + pv.Spec.CSI.VolumeHandle, supervisorNamespace, err) + log.Error(msg) + continue + } + + _, err = metadataSyncer.supervisorClient.CoreV1().PersistentVolumeClaims(supervisorNamespace).Patch( + ctx, svPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) if err != nil { msg := fmt.Sprintf("failed to update supervisor PVC: %q with guest cluster labels in %q namespace."+ "Error: %+v", pv.Spec.CSI.VolumeHandle, supervisorNamespace, err) @@ -503,11 +529,37 @@ func setGuestClusterDetailsOnSupervisorPVC(ctx context.Context, metadataSyncer * if metadataSyncer.coCommonInterface.IsFSSEnabled(ctx, common.SVPVCSnapshotProtectionFinalizer) { cnsFinalizerPresent := slices.Contains(svPVC.ObjectMeta.Finalizers, cnsoperatortypes.CNSVolumeFinalizer) if !cnsFinalizerPresent { - svPVC.ObjectMeta.Finalizers = append(svPVC.ObjectMeta.Finalizers, cnsoperatortypes.CNSVolumeFinalizer) - } - if !cnsFinalizerPresent { - _, err = metadataSyncer.supervisorClient.CoreV1().PersistentVolumeClaims(supervisorNamespace).Update( - ctx, svPVC, metav1.UpdateOptions{}) + oldFinalizerData, err := json.Marshal(svPVC) + if err != nil { + msg := fmt.Sprintf("failed to marshal supervisor PVC for finalizer: %q in %q namespace. Error: %+v", + pv.Spec.CSI.VolumeHandle, supervisorNamespace, err) + log.Error(msg) + continue + } + + newFinalizerPVC := svPVC.DeepCopy() + newFinalizerPVC.ObjectMeta.Finalizers = append(newFinalizerPVC.ObjectMeta.Finalizers, + cnsoperatortypes.CNSVolumeFinalizer) + + newFinalizerData, err := json.Marshal(newFinalizerPVC) + if err != nil { + msg := fmt.Sprintf("failed to marshal updated supervisor PVC with finalizer: %q in %q namespace. Error: %+v", + pv.Spec.CSI.VolumeHandle, supervisorNamespace, err) + log.Error(msg) + continue + } + + finalizerPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldFinalizerData, newFinalizerData, svPVC) + if err != nil { + msg := fmt.Sprintf("error creating two way merge patch for supervisor PVC finalizer: %q "+ + "in %q namespace. Error: %+v", + pv.Spec.CSI.VolumeHandle, supervisorNamespace, err) + log.Error(msg) + continue + } + + _, err = metadataSyncer.supervisorClient.CoreV1().PersistentVolumeClaims(supervisorNamespace).Patch( + ctx, svPVC.Name, types.StrategicMergePatchType, finalizerPatchBytes, metav1.PatchOptions{}) if err != nil { msg := fmt.Sprintf("failed to update supervisor PVC: %q with guest cluster labels in %q namespace."+ " Error: %+v", pv.Spec.CSI.VolumeHandle, supervisorNamespace, err) diff --git a/pkg/syncer/pvcsi_metadatasyncer.go b/pkg/syncer/pvcsi_metadatasyncer.go index 9c5aeb6de2..48d7005a53 100644 --- a/pkg/syncer/pvcsi_metadatasyncer.go +++ b/pkg/syncer/pvcsi_metadatasyncer.go @@ -18,6 +18,7 @@ package syncer import ( "context" + "encoding/json" "fmt" "github.com/davecgh/go-spew/spew" @@ -25,6 +26,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" cnsvolumemetadatav1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/cnsvolumemetadata/v1alpha1" cnsconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config" "sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/logger" @@ -103,9 +105,32 @@ func pvcsiVolumeDeleted(ctx context.Context, uID string, metadataSyncer *metadat key := fmt.Sprintf("%s/%s", metadataSyncer.configInfo.Cfg.GC.TanzuKubernetesClusterName, metadataSyncer.configInfo.Cfg.GC.ClusterDistribution) if _, ok := svPVC.Labels[key]; ok { - delete(svPVC.Labels, key) - _, err = metadataSyncer.supervisorClient.CoreV1().PersistentVolumeClaims(supervisorNamespace).Update( - ctx, svPVC, metav1.UpdateOptions{}) + oldData, err := json.Marshal(svPVC) + if err != nil { + log.Errorf("failed to marshal supervisor PVC: %q in %q namespace. Error: %+v", + pv.Spec.CSI.VolumeHandle, supervisorNamespace, err) + return + } + + newPVC := svPVC.DeepCopy() + delete(newPVC.Labels, key) + + newData, err := json.Marshal(newPVC) + if err != nil { + log.Errorf("failed to marshal updated supervisor PVC: %q in %q namespace. Error: %+v", + pv.Spec.CSI.VolumeHandle, supervisorNamespace, err) + return + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, svPVC) + if err != nil { + log.Errorf("error creating two way merge patch for supervisor PVC: %q in %q namespace. Error: %+v", + pv.Spec.CSI.VolumeHandle, supervisorNamespace, err) + return + } + + _, err = metadataSyncer.supervisorClient.CoreV1().PersistentVolumeClaims(supervisorNamespace).Patch( + ctx, svPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) if err != nil { log.Errorf("failed to update supervisor PVC: %q in %q namespace. Error: %+v", pv.Spec.CSI.VolumeHandle, supervisorNamespace, err) diff --git a/pkg/syncer/syncer_test.go b/pkg/syncer/syncer_test.go index d052b2d06d..290e06116a 100644 --- a/pkg/syncer/syncer_test.go +++ b/pkg/syncer/syncer_test.go @@ -18,6 +18,7 @@ package syncer import ( "context" + "encoding/json" "fmt" "os" "sync" @@ -33,6 +34,8 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" clientset "k8s.io/client-go/kubernetes" testclient "k8s.io/client-go/kubernetes/fake" @@ -748,11 +751,28 @@ func runTestFullSyncWorkflows(t *testing.T) { } // Update pvc with new label. + oldData, err := json.Marshal(pvc) + if err != nil { + t.Fatalf("Failed to marshal PVC: %v", err) + } + + newPVC := pvc.DeepCopy() newPVCLabel := make(map[string]string) newPVCLabel[testPVCLabelName] = newTestPVCLabelValue - pvc.Labels = newPVCLabel - if pvc, err = k8sclient.CoreV1().PersistentVolumeClaims(testNamespace).Update( - ctx, pvc, metav1.UpdateOptions{}); err != nil { + newPVC.Labels = newPVCLabel + + newData, err := json.Marshal(newPVC) + if err != nil { + t.Fatalf("Failed to marshal updated PVC with labels: %v", err) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvc) + if err != nil { + t.Fatalf("Error creating two way merge patch for PVC: %v", err) + } + + if pvc, err = k8sclient.CoreV1().PersistentVolumeClaims(testNamespace).Patch( + ctx, pvc.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil { t.Fatal(err) } waitForListerSync() diff --git a/pkg/syncer/volume_health.go b/pkg/syncer/volume_health.go index ce40f73abd..55f09f0fe2 100644 --- a/pkg/syncer/volume_health.go +++ b/pkg/syncer/volume_health.go @@ -18,6 +18,7 @@ package syncer import ( "context" + "encoding/json" "time" cnstypes "github.com/vmware/govmomi/cns/types" @@ -25,6 +26,8 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" clientset "k8s.io/client-go/kubernetes" "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/utils" @@ -134,14 +137,37 @@ func updateVolumeHealthStatus(ctx context.Context, k8sclient clientset.Interface val, found := pvc.Annotations[annVolumeHealth] _, foundAnnHealthTS := pvc.Annotations[annVolumeHealthTS] if !found || val != volHealthStatus || !foundAnnHealthTS { + oldData, err := json.Marshal(pvc) + if err != nil { + log.Errorf("updateVolumeHealthStatus: Failed to marshal PVC %s/%s: %v", pvc.Namespace, pvc.Name, err) + return + } + // VolumeHealth annotation on pvc is changed, set it to new value. - metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annVolumeHealth, volHealthStatus) + newPVC := pvc.DeepCopy() + metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annVolumeHealth, volHealthStatus) timeNow := time.Now().Format(time.UnixDate) - metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annVolumeHealthTS, timeNow) + metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annVolumeHealthTS, timeNow) log.Infof("updateVolumeHealthStatus: set volumehealth annotation for pvc %s/%s from old "+ "value %s to new value %s and volumehealthTS annotation to %s", pvc.Namespace, pvc.Name, val, volHealthStatus, timeNow) - _, err := k8sclient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(ctx, pvc, metav1.UpdateOptions{}) + + newData, err := json.Marshal(newPVC) + if err != nil { + log.Errorf("updateVolumeHealthStatus: Failed to marshal updated PVC %s/%s with health annotations: %v", + pvc.Namespace, pvc.Name, err) + return + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvc) + if err != nil { + log.Errorf("updateVolumeHealthStatus: Error creating two way merge patch for PVC %s/%s with error: %v", + pvc.Namespace, pvc.Name, err) + return + } + + _, err = k8sclient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Patch(ctx, pvc.Name, types.StrategicMergePatchType, + patchBytes, metav1.PatchOptions{}) if err != nil { if apierrors.IsConflict(err) { log.Debugf("updateVolumeHealthStatus: Failed to update pvc %s/%s with err:%+v, will retry the update", @@ -151,14 +177,36 @@ func updateVolumeHealthStatus(ctx context.Context, k8sclient clientset.Interface newPvc, err := k8sclient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get( ctx, pvc.Name, metav1.GetOptions{}) if err == nil { + oldRetryData, err := json.Marshal(newPvc) + if err != nil { + log.Errorf("updateVolumeHealthStatus: Failed to marshal retry PVC %s/%s: %v", newPvc.Namespace, newPvc.Name, err) + return + } + + newRetryPVC := newPvc.DeepCopy() timeUpdate := time.Now().Format(time.UnixDate) log.Infof("updateVolumeHealthStatus: updating volume health annotation for pvc %s/%s which "+ "get from API server from old value %s to new value %s and volumehealthTS annotation to %s", newPvc.Namespace, newPvc.Name, val, volHealthStatus, timeUpdate) - metav1.SetMetaDataAnnotation(&newPvc.ObjectMeta, annVolumeHealth, volHealthStatus) - metav1.SetMetaDataAnnotation(&newPvc.ObjectMeta, annVolumeHealthTS, timeUpdate) - _, err := k8sclient.CoreV1().PersistentVolumeClaims(newPvc.Namespace).Update(ctx, - newPvc, metav1.UpdateOptions{}) + metav1.SetMetaDataAnnotation(&newRetryPVC.ObjectMeta, annVolumeHealth, volHealthStatus) + metav1.SetMetaDataAnnotation(&newRetryPVC.ObjectMeta, annVolumeHealthTS, timeUpdate) + + newRetryData, err := json.Marshal(newRetryPVC) + if err != nil { + log.Errorf("updateVolumeHealthStatus: Failed to marshal updated retry PVC %s/%s with health annotations: %v", + newPvc.Namespace, newPvc.Name, err) + return + } + + retryPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldRetryData, newRetryData, newPvc) + if err != nil { + log.Errorf("updateVolumeHealthStatus: Error creating two way merge patch for retry PVC %s/%s "+ + "with error: %v", newPvc.Namespace, newPvc.Name, err) + return + } + + _, err = k8sclient.CoreV1().PersistentVolumeClaims(newPvc.Namespace).Patch(ctx, newPvc.Name, + types.StrategicMergePatchType, retryPatchBytes, metav1.PatchOptions{}) if err != nil { log.Errorf("updateVolumeHealthStatus: Failed to update pvc %s/%s with err:%+v", newPvc.Namespace, newPvc.Name, err)