fix uncertain cache #512

Open
wants to merge 1 commit into base: master

2 changes: 1 addition & 1 deletion go.mod
@@ -18,6 +18,7 @@ require (
k8s.io/component-base v0.33.1
k8s.io/csi-translation-lib v0.33.0
k8s.io/klog/v2 v2.130.1
k8s.io/utils v0.0.0-20241210054802-24370beab758
)

require (
@@ -69,7 +70,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect
k8s.io/utils v0.0.0-20241210054802-24370beab758 // indirect
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
@@ -291,7 +291,7 @@ func (ctrl *resizeController) Run(workers int, ctx context.Context) {
go ctrl.slowSet.Run(stopCh)
}

for i := 0; i < workers; i++ {
for range workers {
go wait.Until(ctrl.syncPVCs, 0, stopCh)
}

13 changes: 5 additions & 8 deletions pkg/modifycontroller/controller.go
@@ -19,6 +19,7 @@ package modifycontroller
import (
"context"
"fmt"
"sync"
"time"

"github.com/kubernetes-csi/external-resizer/pkg/util"
@@ -61,7 +62,7 @@ type modifyController struct {
vacListerSynced cache.InformerSynced
extraModifyMetadata bool
// the key of the map is {PVC_NAMESPACE}/{PVC_NAME}
uncertainPVCs map[string]v1.PersistentVolumeClaim
uncertainPVCs sync.Map
// slowSet tracks PVCs for which modification failed with infeasible error and should be retried at slower rate.
slowSet *slowset.SlowSet
}
@@ -121,7 +122,6 @@ func NewModifyController(
}

func (ctrl *modifyController) initUncertainPVCs() error {
ctrl.uncertainPVCs = make(map[string]v1.PersistentVolumeClaim)
allPVCs, err := ctrl.pvcLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list pvcs when init uncertain pvcs: %v", err)
@@ -133,7 +133,7 @@ func (ctrl *modifyController) initUncertainPVCs() error {
if err != nil {
return err
}
ctrl.uncertainPVCs[pvcKey] = *pvc.DeepCopy()
ctrl.uncertainPVCs.Store(pvcKey, pvc)
}
}

@@ -187,10 +187,7 @@ func (ctrl *modifyController) deletePVC(obj interface{}) {
}

func (ctrl *modifyController) init(ctx context.Context) bool {
informersSyncd := []cache.InformerSynced{ctrl.pvListerSynced, ctrl.pvcListerSynced}
informersSyncd = append(informersSyncd, ctrl.vacListerSynced)

if !cache.WaitForCacheSync(ctx.Done(), informersSyncd...) {
if !cache.WaitForCacheSync(ctx.Done(), ctrl.pvListerSynced, ctrl.pvcListerSynced, ctrl.vacListerSynced) {
klog.ErrorS(nil, "Cannot sync pod, pv, pvc or vac caches")
return false
}
@@ -220,7 +217,7 @@ func (ctrl *modifyController) Run(
// Starts go-routine that deletes expired slowSet entries.
go ctrl.slowSet.Run(stopCh)

for i := 0; i < workers; i++ {
for range workers {
go wait.Until(ctrl.sync, 0, stopCh)
}

89 changes: 64 additions & 25 deletions pkg/modifycontroller/controller_test.go
@@ -1,7 +1,6 @@
package modifycontroller

import (
"context"
"errors"
"fmt"
"testing"
@@ -14,7 +13,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -321,6 +319,65 @@ func TestInfeasibleRetry(t *testing.T) {
}
}

// Intended to catch any race conditions in the controller
func TestConcurrentSync(t *testing.T) {
cases := []struct {
name string
waitCount int
err error
}{
// TODO: This case is flaky because the fake client lacks resourceVersion support.
// {
// name: "success",
// waitCount: 10,
// },
Comment on lines +329 to +333
@huww98 (Contributor, Author) commented on Jul 20, 2025:

After a PVC is marked as completed, it is re-queued. In the next cycle, we may still read out the PVC with InProgress status, and we should then get a conflict when trying to mark it InProgress again. However, the fake client does not support resourceVersion, so when we pass it an empty patch, it returns the latest PVC with ModifyVolumeStatus == nil, which makes markControllerModifyVolumeCompleted panic.

Should we somehow make metadata.generation work for PVCs? Then we could cache the already-synced generation+VACName, know when the PVC in the cache is outdated, and simply skip that cycle.

A contributor replied:

I am trying to understand how this would work: the metadata.generation field of a Kubernetes object is a sequence number that is incremented only when the object's spec changes.

But ModifyVolumeStatus lives in Status.ModifyVolumeStatus, so wouldn't caching the generation number be ineffective? It would not tell you whether the status has been updated, so you would still be working with stale data?

Also, Hemant raised an issue (#514) that we will fix before GA.

@huww98 (Contributor, Author) replied:

My plan is: we maintain a global sync.Map that maps from object UID to the generation that has been fully reconciled. We add to this map when ControllerModifyVolume finishes with OK and specVacName matches targetVACName, and we clear the entry when the object is deleted. When the next cycle starts, we simply skip if the generation matches. This should ensure we never sync the same VAC at the same generation twice.

Anyway, I think this is minor and should not block this PR.

> Also Hemant raised an issue: #514 that we will fix before GA.

Yes, I think this PR should fix that one.
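
A rough sketch of that idea, in case it helps the discussion. All names here (reconciledGenerations, markReconciled, shouldSkip, forget) are illustrative only and not part of this PR:

```go
// Sketch only: track the last fully reconciled generation per PVC UID.
package modifycontroller

import (
	"sync"

	v1 "k8s.io/api/core/v1"
)

// reconciledGenerations maps a PVC's UID to the metadata.generation that has
// been fully reconciled (ControllerModifyVolume returned OK and the spec VAC
// matched the target VAC).
var reconciledGenerations sync.Map

// markReconciled records that the PVC's current generation is fully handled.
func markReconciled(pvc *v1.PersistentVolumeClaim) {
	reconciledGenerations.Store(pvc.UID, pvc.Generation)
}

// shouldSkip reports whether the cached PVC is at a generation we have already
// reconciled, in which case the current sync cycle can be skipped.
func shouldSkip(pvc *v1.PersistentVolumeClaim) bool {
	if gen, ok := reconciledGenerations.Load(pvc.UID); ok {
		return pvc.Generation <= gen.(int64)
	}
	return false
}

// forget clears the entry when the PVC is deleted.
func forget(pvc *v1.PersistentVolumeClaim) {
	reconciledGenerations.Delete(pvc.UID)
}
```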

{
name: "uncertain",
waitCount: 30,
err: nonFinalErr,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
client := csi.NewMockClient(testDriverName, true, true, true, true, true, false)
client.SetModifyError(tc.err)

initialObjects := []runtime.Object{testVacObject, targetVacObject}
for i := range 10 {
initialObjects = append(initialObjects,
&v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i), Namespace: pvcNamespace},
Spec: v1.PersistentVolumeClaimSpec{
VolumeAttributesClassName: &testVac,
VolumeName: fmt.Sprintf("testPV-%d", i),
},
Status: v1.PersistentVolumeClaimStatus{
Phase: v1.ClaimBound,
},
},
&v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("testPV-%d", i)},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
Driver: testDriverName,
VolumeHandle: fmt.Sprintf("foo-%d", i),
},
},
},
},
)
}
ctrlInstance := setupFakeK8sEnvironment(t, client, initialObjects)
go ctrlInstance.Run(3, t.Context())

for client.GetModifyCount() < tc.waitCount {
time.Sleep(20 * time.Millisecond)
}
})
}
}

// setupFakeK8sEnvironment creates fake K8s environment and starts Informers and ModifyController
func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObjects []runtime.Object) *modifyController {
t.Helper()
@@ -329,11 +386,9 @@ func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObject

/* Create fake kubeClient, Informers, and ModifyController */
kubeClient, informerFactory := fakeK8s(initialObjects)
pvInformer := informerFactory.Core().V1().PersistentVolumes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses()

driverName, _ := client.GetDriverName(context.TODO())
ctx := t.Context()
driverName, _ := client.GetDriverName(ctx)

csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName)
if err != nil {
Expand All @@ -346,26 +401,10 @@ func setupFakeK8sEnvironment(t *testing.T, client *csi.MockClient, initialObject
workqueue.DefaultTypedControllerRateLimiter[string]())

/* Start informers and ModifyController*/
stopCh := make(chan struct{})
informerFactory.Start(stopCh)

go controller.Run(1, t.Context())

/* Add initial objects to informer caches */
@huww98 (Contributor, Author) commented:

We don't need to add them manually, as the informer will read them from kubeClient; ctrl.init(ctx) will wait for cache sync.
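
A minimal sketch of that pattern, assuming a standalone fake clientset and informer factory (the object names are illustrative): once the factory is started and the caches have synced, objects seeded into the fake client show up in the informer stores without manual Add calls.

```go
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	ctx := context.Background()

	// Seed the fake clientset with the initial objects.
	kubeClient := fake.NewSimpleClientset(&v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Name: "foo-0", Namespace: "default"},
	})

	factory := informers.NewSharedInformerFactory(kubeClient, 0)
	pvcs := factory.Core().V1().PersistentVolumeClaims()
	informer := pvcs.Informer() // register the informer before Start so the factory runs it

	// Starting the factory lists objects from kubeClient; waiting for cache
	// sync guarantees they are present in the informer store afterwards.
	factory.Start(ctx.Done())
	cache.WaitForCacheSync(ctx.Done(), informer.HasSynced)

	// The PVC is now available from the lister without manual store.Add calls.
	pvc, err := pvcs.Lister().PersistentVolumeClaims("default").Get("foo-0")
	fmt.Println(pvc.Name, err)
}
```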

for _, obj := range initialObjects {
switch obj.(type) {
case *v1.PersistentVolume:
pvInformer.Informer().GetStore().Add(obj)
case *v1.PersistentVolumeClaim:
pvcInformer.Informer().GetStore().Add(obj)
case *storagev1beta1.VolumeAttributesClass:
vacInformer.Informer().GetStore().Add(obj)
default:
t.Fatalf("Test %s: Unknown initalObject type: %+v", t.Name(), obj)
}
}
informerFactory.Start(ctx.Done())

ctrlInstance, _ := controller.(*modifyController)
ctrlInstance := controller.(*modifyController)
ctrlInstance.init(ctx)

return ctrlInstance
}
23 changes: 0 additions & 23 deletions pkg/modifycontroller/modify_status.go
@@ -22,7 +22,6 @@ import (
"github.com/kubernetes-csi/external-resizer/pkg/util"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"
)

@@ -61,16 +60,6 @@ func (ctrl *modifyController) markControllerModifyVolumeStatus(
if err != nil {
return pvc, fmt.Errorf("mark PVC %q as modify volume failed, errored with: %v", pvc.Name, err)
}
// Remove this PVC from the uncertain cache since the status is known now
if modifyVolumeStatus == v1.PersistentVolumeClaimModifyVolumeInfeasible {
pvcKey, err := cache.MetaNamespaceKeyFunc(pvc)
if err != nil {
return pvc, err
}

ctrl.removePVCFromModifyVolumeUncertainCache(pvcKey)
ctrl.markForSlowRetry(pvc, pvcKey)
Comment on lines -71 to -72
@huww98 (Contributor, Author) commented:

Already done by caller, no need to do it again.

}
return updatedPVC, nil
}

@@ -144,15 +133,3 @@ func clearModifyVolumeConditions(conditions []v1.PersistentVolumeClaimCondition)
}
return knownConditions
}

// removePVCFromModifyVolumeUncertainCache removes the pvc from the uncertain cache
func (ctrl *modifyController) removePVCFromModifyVolumeUncertainCache(pvcKey string) {
if ctrl.uncertainPVCs == nil {
return
}
// Format of the key of the uncertainPVCs is NAMESPACE/NAME of the pvc
_, ok := ctrl.uncertainPVCs[pvcKey]
if ok {
delete(ctrl.uncertainPVCs, pvcKey)
}
}
107 changes: 1 addition & 106 deletions pkg/modifycontroller/modify_status_test.go
@@ -14,13 +14,11 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
@@ -38,6 +36,7 @@ var (
testDriverName = "mock"
infeasibleErr = status.Errorf(codes.InvalidArgument, "Parameters in VolumeAttributesClass is invalid")
finalErr = status.Errorf(codes.Internal, "Final error")
nonFinalErr = status.Errorf(codes.Aborted, "Non-final error")
pvcConditionInProgress = v1.PersistentVolumeClaimCondition{
Type: v1.PersistentVolumeClaimVolumeModifyingVolume,
Status: v1.ConditionTrue,
@@ -273,110 +272,6 @@ func TestMarkControllerModifyVolumeCompleted(t *testing.T) {
}
}

func TestRemovePVCFromModifyVolumeUncertainCache(t *testing.T) {
basePVC := testutil.MakeTestPVC([]v1.PersistentVolumeClaimCondition{})
basePVC.WithModifyVolumeStatus(v1.PersistentVolumeClaimModifyVolumeInProgress)
secondPVC := testutil.GetTestPVC("test-vol0", "2G", "1G", "", "")
secondPVC.Status.Phase = v1.ClaimBound
secondPVC.Status.ModifyVolumeStatus = &v1.ModifyVolumeStatus{}
secondPVC.Status.ModifyVolumeStatus.Status = v1.PersistentVolumeClaimModifyVolumeInfeasible

tests := []struct {
name string
pvc *v1.PersistentVolumeClaim
}{
{
name: "should delete the target pvc but keep the others in the cache",
pvc: basePVC.Get(),
},
}

for _, test := range tests {
tc := test
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeAttributesClass, true)
client := csi.NewMockClient("foo", true, true, true, true, true, false)
driverName, _ := client.GetDriverName(context.TODO())

var initialObjects []runtime.Object
initialObjects = append(initialObjects, test.pvc)
initialObjects = append(initialObjects, secondPVC)

kubeClient, informerFactory := fakeK8s(initialObjects)
pvInformer := informerFactory.Core().V1().PersistentVolumes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
podInformer := informerFactory.Core().V1().Pods()
vacInformer := informerFactory.Storage().V1beta1().VolumeAttributesClasses()

csiModifier, err := modifier.NewModifierFromClient(client, 15*time.Second, kubeClient, informerFactory, false, driverName)
if err != nil {
t.Fatalf("Test %s: Unable to create modifier: %v", test.name, err)
}
controller := NewModifyController(driverName,
csiModifier, kubeClient,
time.Second, 2*time.Minute, false, informerFactory,
workqueue.DefaultTypedControllerRateLimiter[string]())

ctrlInstance, _ := controller.(*modifyController)

stopCh := make(chan struct{})
informerFactory.Start(stopCh)

success := ctrlInstance.init(t.Context())
if !success {
t.Fatal("failed to init controller")
}

for _, obj := range initialObjects {
switch obj.(type) {
case *v1.PersistentVolume:
pvInformer.Informer().GetStore().Add(obj)
case *v1.PersistentVolumeClaim:
pvcInformer.Informer().GetStore().Add(obj)
case *v1.Pod:
podInformer.Informer().GetStore().Add(obj)
case *storagev1beta1.VolumeAttributesClass:
vacInformer.Informer().GetStore().Add(obj)
default:
t.Fatalf("Test %s: Unknown initalObject type: %+v", test.name, obj)
}
}

time.Sleep(time.Second * 2)

pvcKey, err := cache.MetaNamespaceKeyFunc(tc.pvc)
if err != nil {
t.Errorf("failed to extract pvc key from pvc %v", tc.pvc)
}
ctrlInstance.removePVCFromModifyVolumeUncertainCache(pvcKey)

deletedPVCKey, err := cache.MetaNamespaceKeyFunc(tc.pvc)
if err != nil {
t.Errorf("failed to extract pvc key from pvc %v", tc.pvc)
}
_, ok := ctrlInstance.uncertainPVCs[deletedPVCKey]
if ok {
t.Errorf("pvc %v should be deleted but it is still in the uncertainPVCs cache", tc.pvc)
}
if err != nil {
t.Errorf("err get pvc %v from uncertainPVCs: %v", tc.pvc, err)
}

notDeletedPVCKey, err := cache.MetaNamespaceKeyFunc(secondPVC)
if err != nil {
t.Errorf("failed to extract pvc key from secondPVC %v", secondPVC)
}
_, ok = ctrlInstance.uncertainPVCs[notDeletedPVCKey]
if !ok {
t.Errorf("pvc %v should not be deleted, uncertainPVCs list %v", secondPVC, ctrlInstance.uncertainPVCs)
}
if err != nil {
t.Errorf("err get pvc %v from uncertainPVCs: %v", secondPVC, err)
}
})
}
}

func createTestPV(capacityGB int, pvcName, pvcNamespace string, pvcUID types.UID, volumeMode *v1.PersistentVolumeMode, vacName string) *v1.PersistentVolume {
capacity := testutil.QuantityGB(capacityGB)
