Skip to content

Commit 5bfedda

Browse files
committed
fix uncertain cache
Fix the wrong error being used when checking for a final error. The map is not safe to read and write concurrently from multiple workers; that is fixed as well. Added related tests.
1 parent 8a94520 commit 5bfedda

File tree

7 files changed

+118
-170
lines changed

7 files changed

+118
-170
lines changed

pkg/controller/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func (ctrl *resizeController) Run(workers int, ctx context.Context) {
291291
go ctrl.slowSet.Run(stopCh)
292292
}
293293

294-
for i := 0; i < workers; i++ {
294+
for range workers {
295295
go wait.Until(ctrl.syncPVCs, 0, stopCh)
296296
}
297297

pkg/modifycontroller/controller.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package modifycontroller
1919
import (
2020
"context"
2121
"fmt"
22+
"sync"
2223
"time"
2324

2425
"github.com/kubernetes-csi/external-resizer/pkg/util"
@@ -61,7 +62,7 @@ type modifyController struct {
6162
vacListerSynced cache.InformerSynced
6263
extraModifyMetadata bool
6364
// the key of the map is {PVC_NAMESPACE}/{PVC_NAME}
64-
uncertainPVCs map[string]v1.PersistentVolumeClaim
65+
uncertainPVCs sync.Map
6566
// slowSet tracks PVCs for which modification failed with infeasible error and should be retried at slower rate.
6667
slowSet *slowset.SlowSet
6768
}
@@ -121,7 +122,6 @@ func NewModifyController(
121122
}
122123

123124
func (ctrl *modifyController) initUncertainPVCs() error {
124-
ctrl.uncertainPVCs = make(map[string]v1.PersistentVolumeClaim)
125125
allPVCs, err := ctrl.pvcLister.List(labels.Everything())
126126
if err != nil {
127127
klog.Errorf("Failed to list pvcs when init uncertain pvcs: %v", err)
@@ -133,7 +133,7 @@ func (ctrl *modifyController) initUncertainPVCs() error {
133133
if err != nil {
134134
return err
135135
}
136-
ctrl.uncertainPVCs[pvcKey] = *pvc.DeepCopy()
136+
ctrl.uncertainPVCs.Store(pvcKey, pvc)
137137
}
138138
}
139139

@@ -187,10 +187,7 @@ func (ctrl *modifyController) deletePVC(obj interface{}) {
187187
}
188188

189189
func (ctrl *modifyController) init(ctx context.Context) bool {
190-
informersSyncd := []cache.InformerSynced{ctrl.pvListerSynced, ctrl.pvcListerSynced}
191-
informersSyncd = append(informersSyncd, ctrl.vacListerSynced)
192-
193-
if !cache.WaitForCacheSync(ctx.Done(), informersSyncd...) {
190+
if !cache.WaitForCacheSync(ctx.Done(), ctrl.pvListerSynced, ctrl.pvcListerSynced, ctrl.vacListerSynced) {
194191
klog.ErrorS(nil, "Cannot sync pod, pv, pvc or vac caches")
195192
return false
196193
}
@@ -220,7 +217,7 @@ func (ctrl *modifyController) Run(
220217
// Starts go-routine that deletes expired slowSet entries.
221218
go ctrl.slowSet.Run(stopCh)
222219

223-
for i := 0; i < workers; i++ {
220+
for range workers {
224221
go wait.Until(ctrl.sync, 0, stopCh)
225222
}
226223

pkg/modifycontroller/controller_test.go

Lines changed: 65 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package modifycontroller
22

33
import (
4-
"context"
54
"errors"
65
"fmt"
76
"testing"
@@ -14,7 +13,6 @@ import (
1413
"google.golang.org/grpc/codes"
1514
"google.golang.org/grpc/status"
1615
v1 "k8s.io/api/core/v1"
17-
storagev1beta1 "k8s.io/api/storage/v1beta1"
1816
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1917
"k8s.io/apimachinery/pkg/runtime"
2018
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -321,6 +319,65 @@ func TestInfeasibleRetry(t *testing.T) {
321319
}
322320
}
323321

322+
// Intended to catch any race conditions in the controller
323+
func TestConcurrentSync(t *testing.T) {
324+
cases := []struct {
325+
name string
326+
waitCount int
327+
err error
328+
}{
329+
// TODO: This case is flaky due to fake client lacks resourceVersion support.
330+
// {
331+
// name: "success",
332+
// waitCount: 10,
333+
// },
334+
{
335+
name: "uncertain",
336+
waitCount: 30,
337+
err: nonFinalErr,
338+
},
339+
}
340+
for _, tc := range cases {
341+
t.Run(tc.name, func(t *testing.T) {
342+
client := csi.NewMockClient(testDriverName, true, true, true, true, true)
343+
client.SetModifyError(tc.err)
344+
345+
initialObjects := []runtime.Object{testVacObject, targetVacObject}
346+
for i := range 10 {
347+
initialObjects = append(initialObjects,
348+
&v1.PersistentVolumeClaim{
349+
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i), Namespace: pvcNamespace},
350+
Spec: v1.PersistentVolumeClaimSpec{
351+
VolumeAttributesClassName: &testVac,
352+
VolumeName: fmt.Sprintf("testPV-%d", i),
353+
},
354+
Status: v1.PersistentVolumeClaimStatus{
355+
Phase: v1.ClaimBound,
356+
},
357+
},
358+
&v1.PersistentVolume{
359+
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("testPV-%d", i)},
360+
Spec: v1.PersistentVolumeSpec{
361+
PersistentVolumeSource: v1.PersistentVolumeSource{
362+
CSI: &v1.CSIPersistentVolumeSource{
363+
Driver: testDriverName,
364+
VolumeHandle: fmt.Sprintf("foo-%d", i),
365+
},
366+
},
367+
},
368+
},
369+
)
370+
}
371+
ctrlInstance := setupFakeK8sEnvironment(t, client, initialObjects)
372+
go ctrlInstance.Run(3, t.Context())
373+
374+
for client.GetModifyCount() < tc.waitCount {
375+
time.Sleep(20 * time.Millisecond)
376+
}
377+
})
378+
}
379+
}
380+
324381
// setupFakeK8sEnvironment creates fake K8s environment and starts Informers and ModifyController
325382
func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObjects []runtime.Object) *modifyController {
326383
t.Helper()
@@ -329,11 +386,9 @@ func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObject
329386

330387
/* Create fake kubeClient, Informers, and ModifyController */
331388
kubeClient, informerFactory := fakeK8s(initialObjects)
332-
pvInformer := informerFactory.Core().V1().PersistentVolumes()
333-
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
334-
vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses()
335389

336-
driverName, _ := client.GetDriverName(context.TODO())
390+
ctx := t.Context()
391+
driverName, _ := client.GetDriverName(ctx)
337392

338393
csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName)
339394
if err != nil {
@@ -346,26 +401,10 @@ func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObject
346401
workqueue.DefaultTypedControllerRateLimiter[string]())
347402

348403
/* Start informers and ModifyController*/
349-
stopCh := make(chan struct{})
350-
informerFactory.Start(stopCh)
351-
352-
go controller.Run(1, t.Context())
353-
354-
/* Add initial objects to informer caches */
355-
for _, obj := range initialObjects {
356-
switch obj.(type) {
357-
case *v1.PersistentVolume:
358-
pvInformer.Informer().GetStore().Add(obj)
359-
case *v1.PersistentVolumeClaim:
360-
pvcInformer.Informer().GetStore().Add(obj)
361-
case *storagev1beta1.VolumeAttributesClass:
362-
vacInformer.Informer().GetStore().Add(obj)
363-
default:
364-
t.Fatalf("Test %s: Unknown initalObject type: %+v", t.Name(), obj)
365-
}
366-
}
404+
informerFactory.Start(ctx.Done())
367405

368-
ctrlInstance, _ := controller.(*modifyController)
406+
ctrl := controller.(*modifyController)
407+
ctrl.init(ctx)
369408

370-
return ctrlInstance
409+
return controller.(*modifyController)
371410
}

pkg/modifycontroller/modify_status.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"github.com/kubernetes-csi/external-resizer/pkg/util"
2323
v1 "k8s.io/api/core/v1"
2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25-
"k8s.io/client-go/tools/cache"
2625
"k8s.io/utils/ptr"
2726
)
2827

@@ -61,16 +60,6 @@ func (ctrl *modifyController) markControllerModifyVolumeStatus(
6160
if err != nil {
6261
return pvc, fmt.Errorf("mark PVC %q as modify volume failed, errored with: %v", pvc.Name, err)
6362
}
64-
// Remove this PVC from the uncertain cache since the status is known now
65-
if modifyVolumeStatus == v1.PersistentVolumeClaimModifyVolumeInfeasible {
66-
pvcKey, err := cache.MetaNamespaceKeyFunc(pvc)
67-
if err != nil {
68-
return pvc, err
69-
}
70-
71-
ctrl.removePVCFromModifyVolumeUncertainCache(pvcKey)
72-
ctrl.markForSlowRetry(pvc, pvcKey)
73-
}
7463
return updatedPVC, nil
7564
}
7665

@@ -144,15 +133,3 @@ func clearModifyVolumeConditions(conditions []v1.PersistentVolumeClaimCondition)
144133
}
145134
return knownConditions
146135
}
147-
148-
// removePVCFromModifyVolumeUncertainCache removes the pvc from the uncertain cache
149-
func (ctrl *modifyController) removePVCFromModifyVolumeUncertainCache(pvcKey string) {
150-
if ctrl.uncertainPVCs == nil {
151-
return
152-
}
153-
// Format of the key of the uncertainPVCs is NAMESPACE/NAME of the pvc
154-
_, ok := ctrl.uncertainPVCs[pvcKey]
155-
if ok {
156-
delete(ctrl.uncertainPVCs, pvcKey)
157-
}
158-
}

pkg/modifycontroller/modify_status_test.go

Lines changed: 1 addition & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,11 @@ import (
1414
"google.golang.org/grpc/codes"
1515
"google.golang.org/grpc/status"
1616
v1 "k8s.io/api/core/v1"
17-
storagev1beta1 "k8s.io/api/storage/v1beta1"
1817
"k8s.io/apimachinery/pkg/api/resource"
1918
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2019
"k8s.io/apimachinery/pkg/runtime"
2120
"k8s.io/apimachinery/pkg/types"
2221
utilfeature "k8s.io/apiserver/pkg/util/feature"
23-
"k8s.io/client-go/tools/cache"
2422
"k8s.io/client-go/util/workqueue"
2523
featuregatetesting "k8s.io/component-base/featuregate/testing"
2624
)
@@ -38,6 +36,7 @@ var (
3836
testDriverName = "mock"
3937
infeasibleErr = status.Errorf(codes.InvalidArgument, "Parameters in VolumeAttributesClass is invalid")
4038
finalErr = status.Errorf(codes.Internal, "Final error")
39+
nonFinalErr = status.Errorf(codes.Aborted, "Non-final error")
4140
pvcConditionInProgress = v1.PersistentVolumeClaimCondition{
4241
Type: v1.PersistentVolumeClaimVolumeModifyingVolume,
4342
Status: v1.ConditionTrue,
@@ -273,110 +272,6 @@ func TestMarkControllerModifyVolumeCompleted(t *testing.T) {
273272
}
274273
}
275274

276-
func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) {
277-
basePVC := testutil.MakeTestPVC([]v1.PersistentVolumeClaimCondition{})
278-
basePVC.WithModifyVolumeStatus(v1.PersistentVolumeClaimModifyVolumeInProgress)
279-
secondPVC := testutil.GetTestPVC("test-vol0", "2G", "1G", "", "")
280-
secondPVC.Status.Phase = v1.ClaimBound
281-
secondPVC.Status.ModifyVolumeStatus = &v1.ModifyVolumeStatus{}
282-
secondPVC.Status.ModifyVolumeStatus.Status = v1.PersistentVolumeClaimModifyVolumeInfeasible
283-
284-
tests := []struct {
285-
name string
286-
pvc *v1.PersistentVolumeClaim
287-
}{
288-
{
289-
name: "should delete the target pvc but keep the others in the cache",
290-
pvc: basePVC.Get(),
291-
},
292-
}
293-
294-
for _, test := range tests {
295-
tc := test
296-
t.Run(tc.name, func(t *testing.T) {
297-
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true)
298-
client := csi.NewMockClient("foo", true, true, true, true, true)
299-
driverName, _ := client.GetDriverName(context.TODO())
300-
301-
var initialObjects []runtime.Object
302-
initialObjects = append(initialObjects, test.pvc)
303-
initialObjects = append(initialObjects, secondPVC)
304-
305-
kubeClient, informerFactory := fakeK8s(initialObjects)
306-
pvInformer := informerFactory.Core().V1().PersistentVolumes()
307-
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
308-
podInformer := informerFactory.Core().V1().Pods()
309-
vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses()
310-
311-
csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName)
312-
if err != nil {
313-
t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err)
314-
}
315-
controller := NewModifyController(driverName,
316-
csiModifier, kubeClient,
317-
time.Second, 2*time.Minute, false, informerFactory,
318-
workqueue.DefaultTypedControllerRateLimiter[string]())
319-
320-
ctrlInstance, _ := controller.(*modifyController)
321-
322-
stopCh := make(chan struct{})
323-
informerFactory.Start(stopCh)
324-
325-
success := ctrlInstance.init(t.Context())
326-
if !success {
327-
t.Fatal("failed to init controller")
328-
}
329-
330-
for _, obj := range initialObjects {
331-
switch obj.(type) {
332-
case *v1.PersistentVolume:
333-
pvInformer.Informer().GetStore().Add(obj)
334-
case *v1.PersistentVolumeClaim:
335-
pvcInformer.Informer().GetStore().Add(obj)
336-
case *v1.Pod:
337-
podInformer.Informer().GetStore().Add(obj)
338-
case *storagev1beta1.VolumeAttributesClass:
339-
vacInformer.Informer().GetStore().Add(obj)
340-
default:
341-
t.Fatalf("Test %s: Unknown initalObject type: %+v", test.name, obj)
342-
}
343-
}
344-
345-
time.Sleep(time.Second * 2)
346-
347-
pvcKey, err := cache.MetaNamespaceKeyFunc(tc.pvc)
348-
if err != nil {
349-
t.Errorf("failed to extract pvc key from pvc %v", tc.pvc)
350-
}
351-
ctrlInstance.removePVCFromModifyVolumeUncertainCache(pvcKey)
352-
353-
deletedPVCKey, err := cache.MetaNamespaceKeyFunc(tc.pvc)
354-
if err != nil {
355-
t.Errorf("failed to extract pvc key from pvc %v", tc.pvc)
356-
}
357-
_, ok := ctrlInstance.uncertainPVCs[deletedPVCKey]
358-
if ok {
359-
t.Errorf("pvc %v should be deleted but it is still in the uncertainPVCs cache", tc.pvc)
360-
}
361-
if err != nil {
362-
t.Errorf("err get pvc %v from uncertainPVCs: %v", tc.pvc, err)
363-
}
364-
365-
notDeletedPVCKey, err := cache.MetaNamespaceKeyFunc(secondPVC)
366-
if err != nil {
367-
t.Errorf("failed to extract pvc key from secondPVC %v", secondPVC)
368-
}
369-
_, ok = ctrlInstance.uncertainPVCs[notDeletedPVCKey]
370-
if !ok {
371-
t.Errorf("pvc %v should not be deleted, uncertainPVCs list %v", secondPVC, ctrlInstance.uncertainPVCs)
372-
}
373-
if err != nil {
374-
t.Errorf("err get pvc %v from uncertainPVCs: %v", secondPVC, err)
375-
}
376-
})
377-
}
378-
}
379-
380275
func createTestPV(capacityGB int, pvcName, pvcNamespace string, pvcUID types.UID, volumeMode *v1.PersistentVolumeMode, vacName string) *v1.PersistentVolume {
381276
capacity := testutil.QuantityGB(capacityGB)
382277

0 commit comments

Comments
 (0)