Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -534,6 +535,40 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context,
return reconcile.Result{RequeueAfter: timeout}, nil
}

// If existing PVC has DataSourceRef and volumeMode set, handle volumeMode validation and inheritance
volumeModeInherited := false
if pvc != nil && pvc.Spec.DataSourceRef != nil && pvc.Spec.VolumeMode != nil {
if instance.Spec.VolumeMode == "" {
// Inherit the volumeMode from the existing PVC
log.Infof("Existing PVC %s in namespace %s has DataSourceRef and volumeMode set to %s. "+
"CnsRegisterVolume does not have volumeMode set. Inheriting volumeMode from existing PVC.",
pvc.Name, pvc.Namespace, *pvc.Spec.VolumeMode)
instance.Spec.VolumeMode = *pvc.Spec.VolumeMode
volumeModeInherited = true
} else if instance.Spec.VolumeMode != *pvc.Spec.VolumeMode {
// Both are set but don't match - this is an error
msg := fmt.Sprintf("VolumeMode mismatch: existing PVC %s in namespace %s has volumeMode %s, "+
"but CnsRegisterVolume specifies volumeMode %s",
pvc.Name, pvc.Namespace, *pvc.Spec.VolumeMode, instance.Spec.VolumeMode)
log.Error(msg)
setInstanceError(ctx, r, instance, msg)
return reconcile.Result{RequeueAfter: timeout}, nil
}
}

// Persist the inherited volumeMode to the CnsRegisterVolume CR
if volumeModeInherited {
err = updateCnsRegisterVolume(ctx, r.client, instance)
if err != nil {
msg := fmt.Sprintf("Failed to update CnsRegisterVolume with inherited volumeMode. Error: %+v", err)
log.Error(msg)
setInstanceError(ctx, r, instance, msg)
return reconcile.Result{RequeueAfter: timeout}, nil
}
log.Infof("Successfully updated CnsRegisterVolume %s with inherited volumeMode: %s",
instance.Name, instance.Spec.VolumeMode)
}

// Do this check before creating a PV. Otherwise, PVC will be bound to PV after PV
// is created even if validation fails
if pvc != nil {
Expand Down Expand Up @@ -618,6 +653,13 @@ func (r *ReconcileCnsRegisterVolume) Reconcile(ctx context.Context,
setInstanceError(ctx, r, instance, msg)
return reconcile.Result{RequeueAfter: timeout}, nil
}
} else {
// PV exists - check if volumeMode needs correction
pv, err = validateAndFixPVVolumeMode(ctx, k8sclient, r, instance, pv, pvName, volumeID,
capacityInMb, accessMode, storageClassName, pvNodeAffinity, timeout)
if err != nil {
return reconcile.Result{RequeueAfter: timeout}, nil
}
}
// If PV is already bound to a different PVC at this point, then its a
// duplicate request.
Expand Down Expand Up @@ -1029,6 +1071,103 @@ func isBlockVolumeRegisterRequest(ctx context.Context, instance *cnsregistervolu
return false
}

// validateAndFixPVVolumeMode checks if an existing PV has the correct volumeMode.
// If the volumeMode doesn't match what's expected, it untags the CNS volume, deletes
// and recreates the PV with the correct volumeMode since volumeMode is immutable on PVs.
func validateAndFixPVVolumeMode(ctx context.Context, k8sclient clientset.Interface,
r *ReconcileCnsRegisterVolume, instance *cnsregistervolumev1alpha1.CnsRegisterVolume,
pv *v1.PersistentVolume, pvName, volumeID string, capacityInMb int64,
accessMode v1.PersistentVolumeAccessMode, storageClassName string,
pvNodeAffinity *v1.VolumeNodeAffinity, timeout time.Duration) (*v1.PersistentVolume, error) {
log := logger.GetLogger(ctx)

// Determine expected volumeMode
expectedVolumeMode := instance.Spec.VolumeMode
if expectedVolumeMode == "" {
expectedVolumeMode = v1.PersistentVolumeFilesystem
}

// Get actual volumeMode from PV
pvVolumeMode := v1.PersistentVolumeFilesystem
if pv.Spec.VolumeMode != nil {
pvVolumeMode = *pv.Spec.VolumeMode
}

// Check if volumeMode matches
if expectedVolumeMode != pvVolumeMode {
log.Warnf("PV %s exists but has incorrect volumeMode. Expected: %s, Actual: %s. "+
"Untagging CNS volume and recreating PV with correct volumeMode.", pvName, expectedVolumeMode, pvVolumeMode)

// Untag the CNS volume before deleting PV to prevent underlying volume deletion.
// deleteDisk=false ensures the underlying vSphere disk is preserved.
log.Infof("Untagging CNS volume %s to preserve underlying disk before PV deletion", volumeID)
_, err := common.DeleteVolumeUtil(ctx, r.volumeManager, volumeID, false)
if err != nil {
msg := fmt.Sprintf("Failed to untag CNS volume %s. Error: %+v", volumeID, err)
log.Error(msg)
setInstanceError(ctx, r, instance, msg)
return nil, err
}
log.Infof("Successfully untagged CNS volume %s", volumeID)

// Delete the existing PV (underlying volume is safe due to CNS untag with deleteDisk=false)
err = k8sclient.CoreV1().PersistentVolumes().Delete(ctx, pvName, *metav1.NewDeleteOptions(0))
if err != nil {
msg := fmt.Sprintf("Failed to delete PV %s with incorrect volumeMode. Error: %+v", pvName, err)
log.Error(msg)
setInstanceError(ctx, r, instance, msg)
return nil, err
}
log.Infof("Successfully deleted PV %s with incorrect volumeMode", pvName)

// Wait for PV to be fully deleted before recreating
log.Infof("Waiting for PV %s to be fully deleted", pvName)
waitErr := wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
_, err := k8sclient.CoreV1().PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
// PV is fully deleted
return true, nil
}
// Unexpected error
log.Warnf("Error checking PV deletion status: %+v", err)
return false, err
}
// PV still exists, continue waiting
return false, nil
})
if waitErr != nil {
msg := fmt.Sprintf("Timeout waiting for PV %s to be deleted. Error: %+v", pvName, waitErr)
log.Error(msg)
setInstanceError(ctx, r, instance, msg)
return nil, waitErr
}
log.Infof("PV %s has been fully deleted", pvName)

// Recreate PV with correct volumeMode
claimRef := &v1.ObjectReference{
Kind: "PersistentVolumeClaim",
APIVersion: "v1",
Namespace: instance.Namespace,
Name: instance.Spec.PvcName,
}
pvSpec := getPersistentVolumeSpec(pvName, volumeID, capacityInMb,
accessMode, instance.Spec.VolumeMode, storageClassName, claimRef)
pvSpec.Spec.NodeAffinity = pvNodeAffinity
log.Debugf("Recreating PV with spec: %+v", pvSpec)
pv, err = k8sclient.CoreV1().PersistentVolumes().Create(ctx, pvSpec, metav1.CreateOptions{})
if err != nil {
log.Errorf("Failed to recreate PV with spec: %+v. Error: %+v", pvSpec, err)
setInstanceError(ctx, r, instance,
fmt.Sprintf("Failed to recreate PV: %s with correct volumeMode. Error: %+v", pvName, err))
return nil, err
}
log.Infof("Successfully recreated PV %s with correct volumeMode: %s", pvName, expectedVolumeMode)
}

return pv, nil
}

// setInstanceError sets error and records an event on the CnsRegisterVolume
// instance.
func setInstanceError(ctx context.Context, r *ReconcileCnsRegisterVolume,
Expand Down
Loading