From 9b8cfbceda71bebc5f599cbb44f945306a720d86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Sun, 20 Jul 2025 00:25:25 +0800 Subject: [PATCH] fix uncertain cache fix wrong error is used when checking final error. map is not safe for concurrent read/write from multiple worker. Also fixed that. Added related tests. --- go.mod | 2 +- pkg/controller/controller.go | 2 +- pkg/modifycontroller/controller.go | 13 +-- pkg/modifycontroller/controller_test.go | 89 ++++++++++++----- pkg/modifycontroller/modify_status.go | 23 ----- pkg/modifycontroller/modify_status_test.go | 107 +-------------------- pkg/modifycontroller/modify_volume.go | 12 +-- pkg/modifycontroller/modify_volume_test.go | 40 ++++++++ 8 files changed, 118 insertions(+), 170 deletions(-) diff --git a/go.mod b/go.mod index 9342b08ac..43e22abba 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( k8s.io/component-base v0.33.1 k8s.io/csi-translation-lib v0.33.0 k8s.io/klog/v2 v2.130.1 + k8s.io/utils v0.0.0-20241210054802-24370beab758 ) require ( @@ -69,7 +70,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect - k8s.io/utils v0.0.0-20241210054802-24370beab758 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index c3d1b39d0..4fba3ccf1 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -291,7 +291,7 @@ func (ctrl *resizeController) Run(workers int, ctx context.Context) { go ctrl.slowSet.Run(stopCh) } - for i := 0; i < workers; i++ { + for range workers { go wait.Until(ctrl.syncPVCs, 0, stopCh) } diff --git a/pkg/modifycontroller/controller.go b/pkg/modifycontroller/controller.go index 1895a3553..2b41ee00f 100644 --- a/pkg/modifycontroller/controller.go +++ b/pkg/modifycontroller/controller.go @@ -19,6 +19,7 @@ package modifycontroller import ( "context" "fmt" + "sync" "time" "github.com/kubernetes-csi/external-resizer/pkg/util" @@ -61,7 +62,7 @@ type modifyController struct { vacListerSynced cache.InformerSynced extraModifyMetadata bool // the key of the map is {PVC_NAMESPACE}/{PVC_NAME} - uncertainPVCs map[string]v1.PersistentVolumeClaim + uncertainPVCs sync.Map // slowSet tracks PVCs for which modification failed with infeasible error and should be retried at slower rate. slowSet *slowset.SlowSet } @@ -121,7 +122,6 @@ func NewModifyController( } func (ctrl *modifyController) initUncertainPVCs() error { - ctrl.uncertainPVCs = make(map[string]v1.PersistentVolumeClaim) allPVCs, err := ctrl.pvcLister.List(labels.Everything()) if err != nil { klog.Errorf("Failed to list pvcs when init uncertain pvcs: %v", err) @@ -133,7 +133,7 @@ func (ctrl *modifyController) initUncertainPVCs() error { if err != nil { return err } - ctrl.uncertainPVCs[pvcKey] = *pvc.DeepCopy() + ctrl.uncertainPVCs.Store(pvcKey, pvc) } } @@ -187,10 +187,7 @@ func (ctrl *modifyController) deletePVC(obj interface{}) { } func (ctrl *modifyController) init(ctx context.Context) bool { - informersSyncd := []cache.InformerSynced{ctrl.pvListerSynced, ctrl.pvcListerSynced} - informersSyncd = append(informersSyncd, ctrl.vacListerSynced) - - if !cache.WaitForCacheSync(ctx.Done(), informersSyncd...) { + if !cache.WaitForCacheSync(ctx.Done(), ctrl.pvListerSynced, ctrl.pvcListerSynced, ctrl.vacListerSynced) { klog.ErrorS(nil, "Cannot sync pod, pv, pvc or vac caches") return false } @@ -220,7 +217,7 @@ func (ctrl *modifyController) Run( // Starts go-routine that deletes expired slowSet entries. go ctrl.slowSet.Run(stopCh) - for i := 0; i < workers; i++ { + for range workers { go wait.Until(ctrl.sync, 0, stopCh) } diff --git a/pkg/modifycontroller/controller_test.go b/pkg/modifycontroller/controller_test.go index 3acd842e1..2eb4de4f6 100644 --- a/pkg/modifycontroller/controller_test.go +++ b/pkg/modifycontroller/controller_test.go @@ -1,7 +1,6 @@ package modifycontroller import ( - "context" "errors" "fmt" "testing" @@ -14,7 +13,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" - storagev1beta1 "k8s.io/api/storage/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -321,6 +319,65 @@ func TestInfeasibleRetry(t *testing.T) { } } +// Intended to catch any race conditions in the controller +func TestConcurrentSync(t *testing.T) { + cases := []struct { + name string + waitCount int + err error + }{ + // TODO: This case is flaky due to fake client lacks resourceVersion support. + // { + // name: "success", + // waitCount: 10, + // }, + { + name: "uncertain", + waitCount: 30, + err: nonFinalErr, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + client.SetModifyError(tc.err) + + initialObjects := []runtime.Object{testVacObject, targetVacObject} + for i := range 10 { + initialObjects = append(initialObjects, + &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i), Namespace: pvcNamespace}, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeAttributesClassName: &testVac, + VolumeName: fmt.Sprintf("testPV-%d", i), + }, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimBound, + }, + }, + &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("testPV-%d", i)}, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + CSI: &v1.CSIPersistentVolumeSource{ + Driver: testDriverName, + VolumeHandle: fmt.Sprintf("foo-%d", i), + }, + }, + }, + }, + ) + } + ctrlInstance := setupFakeK8sEnvironment(t, client, initialObjects) + go ctrlInstance.Run(3, t.Context()) + + for client.GetModifyCount() < tc.waitCount { + time.Sleep(20 * time.Millisecond) + } + }) + } +} + // setupFakeK8sEnvironment creates fake K8s environment and starts Informers and ModifyController func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObjects []runtime.Object) *modifyController { t.Helper() @@ -329,11 +386,9 @@ func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObject /* Create fake kubeClient, Informers, and ModifyController */ kubeClient, informerFactory := fakeK8s(initialObjects) - pvInformer := informerFactory.Core().V1().PersistentVolumes() - pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() - vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses() - driverName, _ := client.GetDriverName(context.TODO()) + ctx := t.Context() + driverName, _ := client.GetDriverName(ctx) csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName) if err != nil { @@ -346,26 +401,10 @@ func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObject workqueue.DefaultTypedControllerRateLimiter[string]()) /* Start informers and ModifyController*/ - stopCh := make(chan struct{}) - informerFactory.Start(stopCh) - - go controller.Run(1, t.Context()) - - /* Add initial objects to informer caches */ - for _, obj := range initialObjects { - switch obj.(type) { - case *v1.PersistentVolume: - pvInformer.Informer().GetStore().Add(obj) - case *v1.PersistentVolumeClaim: - pvcInformer.Informer().GetStore().Add(obj) - case *storagev1beta1.VolumeAttributesClass: - vacInformer.Informer().GetStore().Add(obj) - default: - t.Fatalf("Test %s: Unknown initalObject type: %+v", t.Name(), obj) - } - } + informerFactory.Start(ctx.Done()) - ctrlInstance, _ := controller.(*modifyController) + ctrlInstance := controller.(*modifyController) + ctrlInstance.init(ctx) return ctrlInstance } diff --git a/pkg/modifycontroller/modify_status.go b/pkg/modifycontroller/modify_status.go index eb7c70320..f39c08419 100644 --- a/pkg/modifycontroller/modify_status.go +++ b/pkg/modifycontroller/modify_status.go @@ -22,7 +22,6 @@ import ( "github.com/kubernetes-csi/external-resizer/pkg/util" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/cache" "k8s.io/utils/ptr" ) @@ -61,16 +60,6 @@ func (ctrl *modifyController) markControllerModifyVolumeStatus( if err != nil { return pvc, fmt.Errorf("mark PVC %q as modify volume failed, errored with: %v", pvc.Name, err) } - // Remove this PVC from the uncertain cache since the status is known now - if modifyVolumeStatus == v1.PersistentVolumeClaimModifyVolumeInfeasible { - pvcKey, err := cache.MetaNamespaceKeyFunc(pvc) - if err != nil { - return pvc, err - } - - ctrl.removePVCFromModifyVolumeUncertainCache(pvcKey) - ctrl.markForSlowRetry(pvc, pvcKey) - } return updatedPVC, nil } @@ -144,15 +133,3 @@ func clearModifyVolumeConditions(conditions []v1.PersistentVolumeClaimCondition) } return knownConditions } - -// removePVCFromModifyVolumeUncertainCache removes the pvc from the uncertain cache -func (ctrl *modifyController) removePVCFromModifyVolumeUncertainCache(pvcKey string) { - if ctrl.uncertainPVCs == nil { - return - } - // Format of the key of the uncertainPVCs is NAMESPACE/NAME of the pvc - _, ok := ctrl.uncertainPVCs[pvcKey] - if ok { - delete(ctrl.uncertainPVCs, pvcKey) - } -} diff --git a/pkg/modifycontroller/modify_status_test.go b/pkg/modifycontroller/modify_status_test.go index 50c7de27a..431befc7a 100644 --- a/pkg/modifycontroller/modify_status_test.go +++ b/pkg/modifycontroller/modify_status_test.go @@ -14,13 +14,11 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" - storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" featuregatetesting "k8s.io/component-base/featuregate/testing" ) @@ -38,6 +36,7 @@ var ( testDriverName = "mock" infeasibleErr = status.Errorf(codes.InvalidArgument, "Parameters in VolumeAttributesClass is invalid") finalErr = status.Errorf(codes.Internal, "Final error") + nonFinalErr = status.Errorf(codes.Aborted, "Non-final error") pvcConditionInProgress = v1.PersistentVolumeClaimCondition{ Type: v1.PersistentVolumeClaimVolumeModifyingVolume, Status: v1.ConditionTrue, @@ -273,110 +272,6 @@ func TestMarkControllerModifyVolumeCompleted(t *testing.T) { } } -func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) { - basePVC := testutil.MakeTestPVC([]v1.PersistentVolumeClaimCondition{}) - basePVC.WithModifyVolumeStatus(v1.PersistentVolumeClaimModifyVolumeInProgress) - secondPVC := testutil.GetTestPVC("test-vol0", "2G", "1G", "", "") - secondPVC.Status.Phase = v1.ClaimBound - secondPVC.Status.ModifyVolumeStatus = &v1.ModifyVolumeStatus{} - secondPVC.Status.ModifyVolumeStatus.Status = v1.PersistentVolumeClaimModifyVolumeInfeasible - - tests := []struct { - name string - pvc *v1.PersistentVolumeClaim - }{ - { - name: "should delete the target pvc but keep the others in the cache", - pvc: basePVC.Get(), - }, - } - - for _, test := range tests { - tc := test - t.Run(tc.name, func(t *testing.T) { - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true) - client := csi.NewMockClient("foo", true, true, true, true, true, false) - driverName, _ := client.GetDriverName(context.TODO()) - - var initialObjects []runtime.Object - initialObjects = append(initialObjects, test.pvc) - initialObjects = append(initialObjects, secondPVC) - - kubeClient, informerFactory := fakeK8s(initialObjects) - pvInformer := informerFactory.Core().V1().PersistentVolumes() - pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() - podInformer := informerFactory.Core().V1().Pods() - vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses() - - csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName) - if err != nil { - t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err) - } - controller := NewModifyController(driverName, - csiModifier, kubeClient, - time.Second, 2*time.Minute, false, informerFactory, - workqueue.DefaultTypedControllerRateLimiter[string]()) - - ctrlInstance, _ := controller.(*modifyController) - - stopCh := make(chan struct{}) - informerFactory.Start(stopCh) - - success := ctrlInstance.init(t.Context()) - if !success { - t.Fatal("failed to init controller") - } - - for _, obj := range initialObjects { - switch obj.(type) { - case *v1.PersistentVolume: - pvInformer.Informer().GetStore().Add(obj) - case *v1.PersistentVolumeClaim: - pvcInformer.Informer().GetStore().Add(obj) - case *v1.Pod: - podInformer.Informer().GetStore().Add(obj) - case *storagev1beta1.VolumeAttributesClass: - vacInformer.Informer().GetStore().Add(obj) - default: - t.Fatalf("Test %s: Unknown initalObject type: %+v", test.name, obj) - } - } - - time.Sleep(time.Second * 2) - - pvcKey, err := cache.MetaNamespaceKeyFunc(tc.pvc) - if err != nil { - t.Errorf("failed to extract pvc key from pvc %v", tc.pvc) - } - ctrlInstance.removePVCFromModifyVolumeUncertainCache(pvcKey) - - deletedPVCKey, err := cache.MetaNamespaceKeyFunc(tc.pvc) - if err != nil { - t.Errorf("failed to extract pvc key from pvc %v", tc.pvc) - } - _, ok := ctrlInstance.uncertainPVCs[deletedPVCKey] - if ok { - t.Errorf("pvc %v should be deleted but it is still in the uncertainPVCs cache", tc.pvc) - } - if err != nil { - t.Errorf("err get pvc %v from uncertainPVCs: %v", tc.pvc, err) - } - - notDeletedPVCKey, err := cache.MetaNamespaceKeyFunc(secondPVC) - if err != nil { - t.Errorf("failed to extract pvc key from secondPVC %v", secondPVC) - } - _, ok = ctrlInstance.uncertainPVCs[notDeletedPVCKey] - if !ok { - t.Errorf("pvc %v should not be deleted, uncertainPVCs list %v", secondPVC, ctrlInstance.uncertainPVCs) - } - if err != nil { - t.Errorf("err get pvc %v from uncertainPVCs: %v", secondPVC, err) - } - }) - } -} - func createTestPV(capacityGB int, pvcName, pvcNamespace string, pvcUID types.UID, volumeMode *v1.PersistentVolumeMode, vacName string) *v1.PersistentVolume { capacity := testutil.QuantityGB(capacityGB) diff --git a/pkg/modifycontroller/modify_volume.go b/pkg/modifycontroller/modify_volume.go index 7523ab624..763ec02b7 100644 --- a/pkg/modifycontroller/modify_volume.go +++ b/pkg/modifycontroller/modify_volume.go @@ -57,7 +57,7 @@ func (ctrl *modifyController) modify(pvc *v1.PersistentVolumeClaim, pv *v1.Persi return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) } else if pvcSpecVacName != nil && curVacName != nil && *pvcSpecVacName != *curVacName { // Check if PVC in uncertain state - _, inUncertainState := ctrl.uncertainPVCs[pvcKey] + _, inUncertainState := ctrl.uncertainPVCs.Load(pvcKey) if !inUncertainState { klog.V(3).InfoS("previous operation on the PVC failed with a final error, retrying") return ctrl.validateVACAndModifyVolumeWithTarget(pvc, pv) @@ -119,7 +119,7 @@ func (ctrl *modifyController) controllerModifyVolumeWithTarget( if err == nil { klog.V(4).Infof("Update volumeAttributesClass of PV %q to %s succeeded", pv.Name, *pvcSpecVacName) // Record an event to indicate that modify operation is successful. - ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal, util.VolumeModifySuccess, fmt.Sprintf("external resizer modified volume %s with vac %s successfully ", pvc.Name, vacObj.Name)) + ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal, util.VolumeModifySuccess, fmt.Sprintf("external resizer modified volume %s with vac %s successfully", pvc.Name, vacObj.Name)) return pvc, pv, nil, true } else { errStatus, ok := status.FromError(err) @@ -132,9 +132,9 @@ func (ctrl *modifyController) controllerModifyVolumeWithTarget( if keyErr != nil { return pvc, pv, keyErr, false } - if !util.IsFinalError(keyErr) { + if !util.IsFinalError(err) { // update conditions and cache pvc as uncertain - ctrl.uncertainPVCs[pvcKey] = *pvc + ctrl.uncertainPVCs.Store(pvcKey, pvc) } else { // Only InvalidArgument can be set to Infeasible state // Final errors other than InvalidArgument will still be in InProgress state @@ -146,10 +146,10 @@ func (ctrl *modifyController) controllerModifyVolumeWithTarget( } ctrl.markForSlowRetry(pvc, pvcKey) } - ctrl.removePVCFromModifyVolumeUncertainCache(pvcKey) + ctrl.uncertainPVCs.Delete(pvcKey) } } else { - return pvc, pv, fmt.Errorf("cannot get error status from modify volume err: %v ", err), false + return pvc, pv, fmt.Errorf("cannot get error status from modify volume err: %v", err), false } // Record an event to indicate that modify operation is failed. ctrl.eventRecorder.Eventf(pvc, v1.EventTypeWarning, util.VolumeModifyFailed, err.Error()) diff --git a/pkg/modifycontroller/modify_volume_test.go b/pkg/modifycontroller/modify_volume_test.go index 97490c478..e35b78d5f 100644 --- a/pkg/modifycontroller/modify_volume_test.go +++ b/pkg/modifycontroller/modify_volume_test.go @@ -1,6 +1,8 @@ package modifycontroller import ( + "errors" + "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -153,6 +155,44 @@ func TestModify(t *testing.T) { } } +func TestModifyUncertain(t *testing.T) { + basePVC := createTestPVC(pvcName, targetVac /*vacName*/, testVac /*curVacName*/, targetVac /*targetVacName*/) + basePVC.Status.ModifyVolumeStatus.Status = v1.PersistentVolumeClaimModifyVolumeInProgress + basePV := createTestPV(1, pvcName, pvcNamespace, "foobaz" /*pvcUID*/, &fsVolumeMode, testVac) + + client := csi.NewMockClient(testDriverName, true, true, true, true, true, false) + initialObjects := []runtime.Object{testVacObject, targetVacObject, basePVC, basePV} + ctrlInstance := setupFakeK8sEnvironment(t, client, initialObjects) + + pvcKey := fmt.Sprintf("%s/%s", pvcNamespace, pvcName) + assertUncertain := func(uncertain bool) { + t.Helper() + _, ok := ctrlInstance.uncertainPVCs.Load(pvcKey) + if ok != uncertain { + t.Fatalf("expected uncertain state to be %v, got %v", uncertain, ok) + } + } + + // initialized to uncertain + assertUncertain(true) + + client.SetModifyError(finalErr) + pvc, pv, err, _ := ctrlInstance.modify(basePVC, basePV) + if !errors.Is(err, finalErr) { + t.Fatalf("expected error to be %v, got %v", finalErr, err) + } + // should clear uncertain state + assertUncertain(false) + + client.SetModifyError(nonFinalErr) + _, _, err, _ = ctrlInstance.modify(pvc, pv) + if !errors.Is(err, nonFinalErr) { + t.Fatalf("expected error to be %v, got %v", nonFinalErr, err) + } + // should enter uncertain state again + assertUncertain(true) +} + func createTestPVC(pvcName string, vacName string, curVacName string, targetVacName string) *v1.PersistentVolumeClaim { pvc := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: pvcNamespace},