Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 57 additions & 9 deletions pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package k8sorchestrator
import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"os"
Expand All @@ -44,6 +45,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
Expand Down Expand Up @@ -2173,15 +2175,36 @@ func (c *K8sOrchestrator) PreLinkedCloneCreateAction(ctx context.Context, pvcNam
return err
}

if linkedClonePVC.Labels == nil {
linkedClonePVC.Labels = make(map[string]string)
oldData, err := json.Marshal(linkedClonePVC)
if err != nil {
log.Errorf("Failed to marshal PVC %s/%s: %v", pvcNamespace, pvcName, err)
return err
}

newPVC := linkedClonePVC.DeepCopy()
if newPVC.Labels == nil {
newPVC.Labels = make(map[string]string)
}
// Add label
if _, ok := linkedClonePVC.Labels[common.AnnKeyLinkedClone]; !ok {
linkedClonePVC.Labels[common.LinkedClonePVCLabel] = linkedClonePVC.Annotations[common.AttributeIsLinkedClone]
if _, ok := newPVC.Labels[common.AnnKeyLinkedClone]; !ok {
newPVC.Labels[common.LinkedClonePVCLabel] = newPVC.Annotations[common.AttributeIsLinkedClone]
}

newData, err := json.Marshal(newPVC)
if err != nil {
log.Errorf("Failed to marshal updated PVC %s/%s with labels: %v", pvcNamespace, pvcName, err)
return err
}

_, err = c.k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Update(ctx, linkedClonePVC, metav1.UpdateOptions{})
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, linkedClonePVC)
if err != nil {
log.Errorf("Error creating two way merge patch for PVC %s/%s with error: %v", pvcNamespace, pvcName, err)
return err
}

_, err = c.k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Patch(ctx, linkedClonePVC.Name,
k8stypes.StrategicMergePatchType,
patchBytes, metav1.PatchOptions{})
if err != nil {
log.Errorf("failed to add linked clone label for PVC %s/%s. Error: %+v, retrying...",
pvcNamespace, pvcName, err)
Expand Down Expand Up @@ -2229,11 +2252,36 @@ func (c *K8sOrchestrator) UpdatePersistentVolumeLabel(ctx context.Context,
if err != nil {
return fmt.Errorf("error getting PV %s from API server: %w", pvName, err)
}
if pv.Labels == nil {
pv.Labels = make(map[string]string)

oldData, err := json.Marshal(pv)
if err != nil {
errMsg := fmt.Sprintf("Failed to marshal PV %s: %v", pvName, err)
log.Error(errMsg)
return err
}

newPV := pv.DeepCopy()
if newPV.Labels == nil {
newPV.Labels = make(map[string]string)
}
newPV.Labels[key] = value

newData, err := json.Marshal(newPV)
if err != nil {
errMsg := fmt.Sprintf("Failed to marshal updated PV %s with labels: %v", pvName, err)
log.Error(errMsg)
return err
}
pv.Labels[key] = value
_, err = c.k8sClient.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{})

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pv)
if err != nil {
errMsg := fmt.Sprintf("Error creating two way merge patch for PV %s with error: %v", pvName, err)
log.Error(errMsg)
return err
}

_, err = c.k8sClient.CoreV1().PersistentVolumes().Patch(ctx, pv.Name, k8stypes.StrategicMergePatchType,
patchBytes, metav1.PatchOptions{})
if err != nil {
errMsg := fmt.Sprintf("error updating PV %s with labels %s/%s. Error: %v", pvName, key, value, err)
log.Error(errMsg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -83,17 +84,38 @@ func (c *K8sOrchestrator) updatePVCAnnotations(ctx context.Context,
return err
}

oldData, err := json.Marshal(pvcObj)
if err != nil {
log.Errorf("failed to marshal PVC %s/%s: %v", pvcNamespace, pvcName, err)
return err
}

newPVC := pvcObj.DeepCopy()
for key, val := range annotations {
// If value is not set, remove the annotation.
if val == "" {
delete(pvcObj.ObjectMeta.Annotations, key)
delete(newPVC.ObjectMeta.Annotations, key)
log.Debugf("Removing annotation %s on pvc %s/%s", key, pvcNamespace, pvcName)
} else {
metav1.SetMetaDataAnnotation(&pvcObj.ObjectMeta, key, val)
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, key, val)
log.Debugf("Updating annotation %s on pvc %s/%s to value: %s", key, pvcNamespace, pvcName, val)
}
}
_, err = c.k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Update(ctx, pvcObj, metav1.UpdateOptions{})

newData, err := json.Marshal(newPVC)
if err != nil {
log.Errorf("failed to marshal updated PVC %s/%s with annotations: %v", pvcNamespace, pvcName, err)
return err
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvcObj)
if err != nil {
log.Errorf("error creating two way merge patch for PVC %s/%s: %v", pvcNamespace, pvcName, err)
return err
}

_, err = c.k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Patch(ctx, pvcObj.Name,
k8stypes.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
log.Errorf("failed to update pvc annotations %s/%s with err:%+v", pvcNamespace, pvcName, err)
return err
Expand Down
67 changes: 60 additions & 7 deletions pkg/csi/service/wcpguest/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package wcpguest

import (
"context"
"encoding/json"
"fmt"
"math"
"net/http"
Expand Down Expand Up @@ -48,6 +49,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -622,10 +624,37 @@ func (c *controller) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequ
if finalizer == cnsoperatortypes.CNSVolumeFinalizer {
log.Infof("Removing %q finalizer from PersistentVolumeClaim with name: %q on namespace: %q",
cnsoperatortypes.CNSVolumeFinalizer, svPVC.Name, svPVC.Namespace)
svPVC.ObjectMeta.Finalizers = slices.Delete(svPVC.ObjectMeta.Finalizers, i, i+1)

oldData, err := json.Marshal(svPVC)
if err != nil {
msg := fmt.Sprintf("failed to marshal supervisor PVC %q in %q namespace. Error: %+v",
req.VolumeId, c.supervisorNamespace, err)
log.Error(msg)
return nil, csifault.CSIInternalFault, status.Error(codes.Internal, msg)
}

newPVC := svPVC.DeepCopy()
newPVC.ObjectMeta.Finalizers = slices.Delete(newPVC.ObjectMeta.Finalizers, i, i+1)

newData, err := json.Marshal(newPVC)
if err != nil {
msg := fmt.Sprintf("failed to marshal updated supervisor PVC %q in %q namespace. Error: %+v",
req.VolumeId, c.supervisorNamespace, err)
log.Error(msg)
return nil, csifault.CSIInternalFault, status.Error(codes.Internal, msg)
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, svPVC)
if err != nil {
msg := fmt.Sprintf("error creating two way merge patch for supervisor PVC %q in %q namespace. Error: %+v",
req.VolumeId, c.supervisorNamespace, err)
log.Error(msg)
return nil, csifault.CSIInternalFault, status.Error(codes.Internal, msg)
}

// Update the instance after removing finalizer
_, err := c.supervisorClient.CoreV1().PersistentVolumeClaims(c.supervisorNamespace).Update(ctx, svPVC,
metav1.UpdateOptions{})
_, err = c.supervisorClient.CoreV1().PersistentVolumeClaims(c.supervisorNamespace).Patch(ctx,
svPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
msg := fmt.Sprintf("failed to update supervisor PVC %q in %q namespace. Error: %+v",
req.VolumeId, c.supervisorNamespace, err)
Expand Down Expand Up @@ -1466,14 +1495,38 @@ func (c *controller) ControllerExpandVolume(ctx context.Context, req *csi.Contro
switch (gcPvcRequestSize).Cmp(svPvcRequestSize) {
case 1:
// Update requested storage in SV PVC spec
svPvcClone := svPVC.DeepCopy()
svPvcClone.Spec.Resources.Requests[corev1.ResourceName(corev1.ResourceStorage)] = *gcPvcRequestSize
oldExpandData, err := json.Marshal(svPVC)
if err != nil {
msg := fmt.Sprintf("failed to marshal supervisor PVC for expansion %q in %q namespace. Error: %+v",
volumeID, c.supervisorNamespace, err)
log.Error(msg)
return nil, csifault.CSIInternalFault, status.Error(codes.Internal, msg)
}

newExpandPVC := svPVC.DeepCopy()
newExpandPVC.Spec.Resources.Requests[corev1.ResourceName(corev1.ResourceStorage)] = *gcPvcRequestSize

newExpandData, err := json.Marshal(newExpandPVC)
if err != nil {
msg := fmt.Sprintf("failed to marshal updated supervisor PVC for expansion %q in %q namespace. Error: %+v",
volumeID, c.supervisorNamespace, err)
log.Error(msg)
return nil, csifault.CSIInternalFault, status.Error(codes.Internal, msg)
}

expandPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldExpandData, newExpandData, svPVC)
if err != nil {
msg := fmt.Sprintf("error creating two way merge patch for supervisor PVC expansion %q in %q namespace. Error: %+v",
volumeID, c.supervisorNamespace, err)
log.Error(msg)
return nil, csifault.CSIInternalFault, status.Error(codes.Internal, msg)
}

// Make an update call to SV API server
log.Infof("Increasing the size of supervisor PVC %s in namespace %s to %s",
volumeID, c.supervisorNamespace, gcPvcRequestSize.String())
svPVC, err = c.supervisorClient.CoreV1().PersistentVolumeClaims(c.supervisorNamespace).Update(
ctx, svPvcClone, metav1.UpdateOptions{})
svPVC, err = c.supervisorClient.CoreV1().PersistentVolumeClaims(c.supervisorNamespace).Patch(
ctx, svPVC.Name, types.StrategicMergePatchType, expandPatchBytes, metav1.PatchOptions{})
if err != nil {
msg := fmt.Sprintf("failed to update supervisor PVC %q in %q namespace. Error: %+v",
volumeID, c.supervisorNamespace, err)
Expand Down
75 changes: 69 additions & 6 deletions pkg/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kubernetes
import (
"context"
"embed"
"encoding/json"
"flag"
"fmt"
"net"
Expand All @@ -40,6 +41,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
utilyaml "k8s.io/apimachinery/pkg/util/yaml"
clientset "k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -728,8 +731,28 @@ func RetainPersistentVolume(ctx context.Context, k8sClient clientset.Interface,
return err
}

pv.Spec.PersistentVolumeReclaimPolicy = v1.PersistentVolumeReclaimRetain
_, err = k8sClient.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{})
oldData, err := json.Marshal(pv)
if err != nil {
log.Errorf("Failed to marshal PV: %v, Error: %v", pv, err)
return err
}

newPV := pv.DeepCopy()
newPV.Spec.PersistentVolumeReclaimPolicy = v1.PersistentVolumeReclaimRetain
newData, err := json.Marshal(newPV)
if err != nil {
log.Errorf("Failed to marshal updated PV with reclaim policy: %v, Error: %v", newPV, err)
return err
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pv)
if err != nil {
log.Errorf("Error creating two way merge patch for PV %q with error: %v", pv.Name, err)
return err
}

_, err = k8sClient.CoreV1().PersistentVolumes().Patch(ctx, pv.Name, k8stypes.StrategicMergePatchType,
patchBytes, metav1.PatchOptions{})
if err != nil {
log.Errorf("Failed to set the reclaim policy to retain. Error: %s", err.Error())
return err
Expand Down Expand Up @@ -809,14 +832,34 @@ func AddFinalizerOnPVC(ctx context.Context, k8sClient clientset.Interface, pvcNa
return err
}

oldData, err := json.Marshal(pvc)
if err != nil {
log.Errorf("Failed to marshal PVC: %v", err)
return err
}

newPVC := pvc.DeepCopy()
// If the finalizer is already present, no action is needed
if !controllerutil.AddFinalizer(pvc, finalizer) {
if !controllerutil.AddFinalizer(newPVC, finalizer) {
log.Info("Finalizer already present on PVC. No action needed.")
return nil
}

newData, err := json.Marshal(newPVC)
if err != nil {
log.Errorf("Failed to marshal updated PVC with finalizer: %v", err)
return err
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvc)
if err != nil {
log.Errorf("Error creating two way merge patch for PVC %s/%s with error: %v", pvcNamespace, pvcName, err)
return err
}

// Update the PVC with the new finalizer
_, err = k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Update(ctx, pvc, metav1.UpdateOptions{})
_, err = k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Patch(ctx, pvc.Name, k8stypes.StrategicMergePatchType,
patchBytes, metav1.PatchOptions{})
if err != nil {
log.Errorf("Failed to add finalizer on PVC. Error: %s", err.Error())
return err
Expand All @@ -843,14 +886,34 @@ func RemoveFinalizerFromPVC(ctx context.Context, k8sClient clientset.Interface,
return err
}

oldData, err := json.Marshal(pvc)
if err != nil {
log.Errorf("Failed to marshal PVC: %v", err)
return err
}

newPVC := pvc.DeepCopy()
// If the finalizer is not present, no action is needed
if !controllerutil.RemoveFinalizer(pvc, finalizer) {
if !controllerutil.RemoveFinalizer(newPVC, finalizer) {
log.Info("Finalizer not present on PVC. No action needed.")
return nil
}

newData, err := json.Marshal(newPVC)
if err != nil {
log.Errorf("Failed to marshal updated PVC with finalizer removed: %v", err)
return err
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, pvc)
if err != nil {
log.Errorf("Error creating two way merge patch for PVC %s/%s with error: %v", pvcNamespace, pvcName, err)
return err
}

// Update the PVC to remove the finalizer
_, err = k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Update(ctx, pvc, metav1.UpdateOptions{})
_, err = k8sClient.CoreV1().PersistentVolumeClaims(pvcNamespace).Patch(ctx, pvc.Name, k8stypes.StrategicMergePatchType,
patchBytes, metav1.PatchOptions{})
if err != nil {
log.Errorf("Failed to remove finalizer from PVC. Error: %s", err.Error())
return err
Expand Down
Loading