From 961ec42bda490505fba89a3dbf084daf61cd3cf5 Mon Sep 17 00:00:00 2001 From: Satyanarayana Kolluri Date: Fri, 21 Nov 2025 21:33:17 +0530 Subject: [PATCH 1/8] Initial implementation of using informer caches for snapshots --- go.mod | 2 +- go.sum | 4 +- .../k8sorchestrator/k8sorchestrator.go | 76 +++++++++++++++++++ pkg/kubernetes/informers.go | 60 ++++++++------- pkg/kubernetes/types.go | 8 ++ 5 files changed, 121 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index b8cb9c48cd..5fd4d7e41e 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/go-version v1.6.0 github.com/kubernetes-csi/csi-proxy/v2 v2.0.0-alpha.1 - github.com/kubernetes-csi/external-snapshotter/client/v8 v8.2.0 + github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0 github.com/onsi/ginkgo/v2 v2.22.0 github.com/onsi/gomega v1.36.0 github.com/pkg/sftp v1.13.6 diff --git a/go.sum b/go.sum index 18e3eeb7bc..c0d7826e44 100644 --- a/go.sum +++ b/go.sum @@ -178,8 +178,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kubernetes-csi/csi-proxy/v2 v2.0.0-alpha.1 h1:tVPvlL5N5X598hrO3g9rhyoi6h0LP4RpSJlGHItsbEE= github.com/kubernetes-csi/csi-proxy/v2 v2.0.0-alpha.1/go.mod h1:pacx+PW7lLlu6kAvpr8Lgq/5fdiAsKxOtXXFHMaLMb8= -github.com/kubernetes-csi/external-snapshotter/client/v8 v8.2.0 h1:Q3jQ1NkFqv5o+F8dMmHd8SfEmlcwNeo1immFApntEwE= -github.com/kubernetes-csi/external-snapshotter/client/v8 v8.2.0/go.mod h1:E3vdYxHj2C2q6qo8/Da4g7P+IcwqRZyy3gJBzYybV9Y= +github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0 h1:bMqrb3UHgHbP+PW9VwiejfDJU1R0PpXVZNMdeH8WYKI= +github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0/go.mod h1:E3vdYxHj2C2q6qo8/Da4g7P+IcwqRZyy3gJBzYybV9Y= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod 
h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0= diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go index 752758eb41..34ae86861f 100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go @@ -29,6 +29,7 @@ import ( "sync/atomic" "time" + snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1" "google.golang.org/grpc/codes" "k8s.io/client-go/util/retry" @@ -239,6 +240,31 @@ func (m *volumeIDToNameMap) get(volumeID string) (string, bool) { return volumeName, found } +type namespacedName struct { + namespace, name string +} + +func (n namespacedName) String() string { + return fmt.Sprintf("%s/%s", n.namespace, n.name) +} + +type snapshotInfo struct { + snapshotName string + snapshotContentName string +} + +func (s snapshotInfo) String() string { + return fmt.Sprintf("name: %s, content name: %s", s.snapshotName, s.snapshotContentName) +} + +// pvcToSnapshotsMap maps a PVC to its snapshots. +// Key is the namespaced name of the PVC and value is a map. +// The key of the inner map is the namespaced name of the snapshot. +type pvcToSnapshotsMap struct { + *sync.RWMutex + items map[namespacedName]map[namespacedName]snapshotInfo +} + // K8sOrchestrator defines set of properties specific to K8s. 
type K8sOrchestrator struct { supervisorFSS FSSConfigMapInfo @@ -251,6 +277,7 @@ type K8sOrchestrator struct { nodeIDToNameMap *nodeIDToNameMap volumeNameToNodesMap *volumeNameToNodesMap // used when ListVolume FSS is enabled volumeIDToNameMap *volumeIDToNameMap // used when ListVolume FSS is enabled + pvcToSnapshotsMap *pvcToSnapshotsMap k8sClient clientset.Interface snapshotterClient snapshotterClientSet.Interface // pvcUIDCache maps PVC UID to its namespaced name (namespace/name). @@ -381,6 +408,12 @@ func Newk8sOrchestrator(ctx context.Context, controllerClusterFlavor cnstypes.Cn } } + // Initialize the map for pvc to snapshots + err := initPVCToSnapshotsMap(ctx, controllerClusterFlavor) + if err != nil { + return nil, fmt.Errorf("failed to create PVC to snapshots map. Error: %v", err) + } + k8sOrchestratorInstance.informerManager.Listen() atomic.StoreUint32(&k8sOrchestratorInstanceInitialized, 1) log.Info("k8sOrchestratorInstance initialized") @@ -2286,3 +2319,46 @@ func GetPVCDataSource(ctx context.Context, claim *v1.PersistentVolumeClaim) (*v1 } return &dataSource, nil } + +func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes.CnsClusterFlavor) error { + log := logger.GetLogger(ctx) + if controllerClusterFlavor != cnstypes.CnsClusterFlavorWorkload { + // PVC to VolumeSnapshot mapping is only required for WCP. 
+ return nil + } + + log.Debugf("Initializing pvc namespaced name to volumesnapshot names map") + k8sOrchestratorInstance.pvcToSnapshotsMap = &pvcToSnapshotsMap{ + RWMutex: &sync.RWMutex{}, + items: make(map[namespacedName]map[namespacedName]snapshotInfo), + } + + snapshotAdded := func(obj interface{}) { + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if !ok || snap == nil { + log.Warnf("snapshotAdded: unrecognized object %+v", obj) + return + } + + // TODO: implement + log.Infof("snapshotAdded: snapshot=%v", snap) + } + + err := k8sOrchestratorInstance.informerManager.AddSnapshotListener(ctx, + func(obj interface{}) { + snapshotAdded(obj) + }, + func(oldObj, newObj interface{}) { + // TODO: implement + log.Info("snapshotUpdated") + }, + func(obj interface{}) { + // TODO: implement + log.Info("snapshotDeleted") + }) + if err != nil { + return logger.LogNewErrorf(log, "failed to listen on volumesnapshots. Error: %v", err) + } + + return nil +} diff --git a/pkg/kubernetes/informers.go b/pkg/kubernetes/informers.go index 51b659a30e..b6a95224e6 100644 --- a/pkg/kubernetes/informers.go +++ b/pkg/kubernetes/informers.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/kubernetes-csi/external-snapshotter/client/v8/informers/externalversions" "k8s.io/client-go/informers" v1 "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" @@ -74,11 +75,19 @@ func NewInformer(ctx context.Context, client clientset.Interface, inClusterClnt informerInstance = supervisorInformerManagerInstance } + // TODO: check if callers can pass this + snapClient, err := NewSnapshotterClient(ctx) + if err != nil { + // TODO: handle error appropriately + log.Fatalf("unable to initialise snapshot client") + } + if informerInstance == nil { informerInstance = &InformerManager{ - client: client, - stopCh: signals.SetupSignalHandler().Done(), - informerFactory: informers.NewSharedInformerFactory(client, noResyncPeriodFunc()), + client: client, + stopCh: 
signals.SetupSignalHandler().Done(), + informerFactory: informers.NewSharedInformerFactory(client, noResyncPeriodFunc()), + snapshotInformerFactory: externalversions.NewSharedInformerFactory(snapClient, 0), } if inClusterClnt { @@ -256,6 +265,28 @@ func (im *InformerManager) AddVolumeAttachmentListener(ctx context.Context, add return nil } +// AddSnapshotListener hooks up add, update, delete callbacks. +func (im *InformerManager) AddSnapshotListener(ctx context.Context, add func(obj interface{}), + update func(oldObj, newObj interface{}), remove func(obj interface{})) error { + log := logger.GetLogger(ctx) + if im.snapshotInformer == nil { + im.snapshotInformer = im.snapshotInformerFactory. + Snapshot().V1().VolumeSnapshots().Informer() + } + + _, err := im.snapshotInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: add, + UpdateFunc: update, + DeleteFunc: remove, + }) + if err != nil { + return logger.LogNewErrorf( + log, "failed to add event handler on snapshot listener. Error: %v", err) + } + + return nil +} + // GetPVLister returns PV Lister for the calling informer manager. func (im *InformerManager) GetPVLister() corelisters.PersistentVolumeLister { return im.informerFactory.Core().V1().PersistentVolumes().Lister() @@ -287,26 +318,3 @@ func (im *InformerManager) Listen() (stopCh <-chan struct{}) { } return im.stopCh } - -// NewConfigMapListener creates a new configmap listener in the given namespace. -// NOTE: This creates a NewSharedIndexInformer everytime and does not use the informer factory. -// Only use this function when you need a configmap listener in a different namespace than the -// one already present in the informer factory. 
-func NewConfigMapListener(ctx context.Context, client clientset.Interface, namespace string, - add func(obj interface{}), update func(oldObj, newObj interface{}), remove func(obj interface{})) error { - log := logger.GetLogger(ctx) - configMapInformer := v1.NewFilteredConfigMapInformer(client, namespace, resyncPeriodConfigMapInformer, - cache.Indexers{}, nil) - - _, err := configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: add, - UpdateFunc: update, - DeleteFunc: remove, - }) - if err != nil { - return logger.LogNewErrorf(log, "failed to add event handler on configmap listener. Error: %v", err) - } - stopCh := make(chan struct{}) - go configMapInformer.Run(stopCh) - return nil -} diff --git a/pkg/kubernetes/types.go b/pkg/kubernetes/types.go index 3022317f70..c3aebe79e0 100644 --- a/pkg/kubernetes/types.go +++ b/pkg/kubernetes/types.go @@ -17,6 +17,7 @@ limitations under the License. package kubernetes import ( + "github.com/kubernetes-csi/external-snapshotter/client/v8/informers/externalversions" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -50,6 +51,10 @@ type InformerManager struct { client clientset.Interface // main shared informer factory informerFactory informers.SharedInformerFactory + + // snapshotInformerFactory - TODO: update this + snapshotInformerFactory externalversions.SharedInformerFactory + // main signal stopCh (<-chan struct{}) @@ -83,4 +88,7 @@ type InformerManager struct { // volume attachment informer volumeAttachmentInformer cache.SharedInformer + + // snapshot informer + snapshotInformer cache.SharedInformer } From 00fd7a312c07595235b15b5d9f111229e558fa3a Mon Sep 17 00:00:00 2001 From: Satyanarayana Kolluri Date: Sat, 22 Nov 2025 01:31:44 +0530 Subject: [PATCH 2/8] Updated Listen logic of informer manager to start snapshot sync --- pkg/kubernetes/informers.go | 12 ++++++++++++ pkg/kubernetes/types.go | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) 
diff --git a/pkg/kubernetes/informers.go b/pkg/kubernetes/informers.go index b6a95224e6..3a6032a267 100644 --- a/pkg/kubernetes/informers.go +++ b/pkg/kubernetes/informers.go @@ -18,6 +18,7 @@ package kubernetes import ( "context" + "fmt" "sync" "time" @@ -316,5 +317,16 @@ func (im *InformerManager) Listen() (stopCh <-chan struct{}) { } } + + go im.snapshotInformerFactory.Start(im.stopCh) + cacheSync := im.snapshotInformerFactory.WaitForCacheSync(im.stopCh) + // TODO: remove + fmt.Print(cacheSync) + for _, isSynced := range cacheSync { + if !isSynced { + return + } + } + return im.stopCh } diff --git a/pkg/kubernetes/types.go b/pkg/kubernetes/types.go index c3aebe79e0..cce4c08dc9 100644 --- a/pkg/kubernetes/types.go +++ b/pkg/kubernetes/types.go @@ -56,7 +56,7 @@ type InformerManager struct { snapshotInformerFactory externalversions.SharedInformerFactory // main signal - stopCh (<-chan struct{}) + stopCh <-chan struct{} // node informer nodeInformer cache.SharedInformer From 604c9fca5eb0e6c35c2a8292bcdac7196c015973 Mon Sep 17 00:00:00 2001 From: Satyanarayana Kolluri Date: Sat, 22 Nov 2025 02:24:06 +0530 Subject: [PATCH 3/8] Implemented handlers for snapshot create and delete events --- .../k8sorchestrator/k8sorchestrator.go | 84 ++++++++++++++----- 1 file changed, 63 insertions(+), 21 deletions(-) diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go index 34ae86861f..2fe990eeff 100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go @@ -248,21 +248,12 @@ func (n namespacedName) String() string { return fmt.Sprintf("%s/%s", n.namespace, n.name) } -type snapshotInfo struct { - snapshotName string - snapshotContentName string -} - -func (s snapshotInfo) String() string { - return fmt.Sprintf("name: %s, content name: %s", s.snapshotName, s.snapshotContentName) -} - // 
pvcToSnapshotsMap maps a PVC to its snapshots. // Key is the namespaced name of the PVC and value is a map. // The key of the inner map is the namespaced name of the snapshot. type pvcToSnapshotsMap struct { *sync.RWMutex - items map[namespacedName]map[namespacedName]snapshotInfo + items map[namespacedName]map[namespacedName]struct{} } // K8sOrchestrator defines set of properties specific to K8s. @@ -277,7 +268,7 @@ type K8sOrchestrator struct { nodeIDToNameMap *nodeIDToNameMap volumeNameToNodesMap *volumeNameToNodesMap // used when ListVolume FSS is enabled volumeIDToNameMap *volumeIDToNameMap // used when ListVolume FSS is enabled - pvcToSnapshotsMap *pvcToSnapshotsMap + pvcToSnapshotsMap pvcToSnapshotsMap k8sClient clientset.Interface snapshotterClient snapshotterClientSet.Interface // pvcUIDCache maps PVC UID to its namespaced name (namespace/name). @@ -2327,34 +2318,85 @@ func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes return nil } - log.Debugf("Initializing pvc namespaced name to volumesnapshot names map") - k8sOrchestratorInstance.pvcToSnapshotsMap = &pvcToSnapshotsMap{ + k8sOrchestratorInstance.pvcToSnapshotsMap = pvcToSnapshotsMap{ RWMutex: &sync.RWMutex{}, - items: make(map[namespacedName]map[namespacedName]snapshotInfo), + items: make(map[namespacedName]map[namespacedName]struct{}), } - snapshotAdded := func(obj interface{}) { + snapshotAdded := func(obj any) { snap, ok := obj.(*snapshotv1.VolumeSnapshot) if !ok || snap == nil { log.Warnf("snapshotAdded: unrecognized object %+v", obj) return } - // TODO: implement log.Infof("snapshotAdded: snapshot=%v", snap) + if snap.Spec.Source.PersistentVolumeClaimName == nil { + log.Warnf("snapshotAdded: snapshot is not associated with any PVC. 
Ignoring it...") + return + } + + k8sOrchestratorInstance.pvcToSnapshotsMap.Lock() + defer k8sOrchestratorInstance.pvcToSnapshotsMap.Unlock() + + pvcName := namespacedName{ + namespace: snap.Namespace, + name: *snap.Spec.Source.PersistentVolumeClaimName, + } + if _, ok := k8sOrchestratorInstance.pvcToSnapshotsMap.items[pvcName]; !ok { + k8sOrchestratorInstance.pvcToSnapshotsMap.items[pvcName] = make(map[namespacedName]struct{}) + } + snapName := namespacedName{ + namespace: snap.Namespace, + name: snap.Name, + } + k8sOrchestratorInstance.pvcToSnapshotsMap.items[pvcName][snapName] = struct{}{} + log.With("pvc", pvcName).With("snapshot", snapName).Debug("successfully added the snapshot to the cache") + } + + snapshotDeleted := func(obj any) { + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if !ok || snap == nil { + log.Warnf("snapshotDeleted: unrecognized object %+v", obj) + return + } + + log.Infof("snapshotDeleted: snapshot=%v", snap) + if snap.Spec.Source.PersistentVolumeClaimName == nil { + log.Warnf("snapshotDeleted: snapshot is not associated with any PVC. Ignoring it...") + return + } + + k8sOrchestratorInstance.pvcToSnapshotsMap.Lock() + defer k8sOrchestratorInstance.pvcToSnapshotsMap.Unlock() + + pvcName := namespacedName{ + namespace: snap.Namespace, + name: *snap.Spec.Source.PersistentVolumeClaimName, + } + if _, ok := k8sOrchestratorInstance.pvcToSnapshotsMap.items[pvcName]; !ok { + log.Infof("snapshotDeleted: associated PVC %s not found in the map. 
Ignoring the delete event...", pvcName) + return + } + + snapName := namespacedName{ + namespace: snap.Namespace, + name: snap.Name, + } + delete(k8sOrchestratorInstance.pvcToSnapshotsMap.items[pvcName], snapName) + log.With("pvc", pvcName).With("snapshot", snapName).Debug("successfully removed the snapshot from the cache") } err := k8sOrchestratorInstance.informerManager.AddSnapshotListener(ctx, - func(obj interface{}) { + func(obj any) { snapshotAdded(obj) }, - func(oldObj, newObj interface{}) { + func(oldObj, newObj any) { // TODO: implement log.Info("snapshotUpdated") }, - func(obj interface{}) { - // TODO: implement - log.Info("snapshotDeleted") + func(obj any) { + snapshotDeleted(obj) }) if err != nil { return logger.LogNewErrorf(log, "failed to listen on volumesnapshots. Error: %v", err) From eddf93b70de9c2582dae5a8272a6fedd8c1b2fce Mon Sep 17 00:00:00 2001 From: Satyanarayana Kolluri Date: Sat, 22 Nov 2025 02:33:07 +0530 Subject: [PATCH 4/8] Refactoring --- .../common/commonco/k8sorchestrator/k8sorchestrator.go | 4 ++-- pkg/kubernetes/types.go | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go index 2fe990eeff..ad8627bee6 100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go @@ -399,7 +399,6 @@ func Newk8sOrchestrator(ctx context.Context, controllerClusterFlavor cnstypes.Cn } } - // Initialize the map for pvc to snapshots err := initPVCToSnapshotsMap(ctx, controllerClusterFlavor) if err != nil { return nil, fmt.Errorf("failed to create PVC to snapshots map. 
Error: %v", err) @@ -2314,7 +2313,8 @@ func GetPVCDataSource(ctx context.Context, claim *v1.PersistentVolumeClaim) (*v1 func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes.CnsClusterFlavor) error { log := logger.GetLogger(ctx) if controllerClusterFlavor != cnstypes.CnsClusterFlavorWorkload { - // PVC to VolumeSnapshot mapping is only required for WCP. + // PVC to VolumeSnapshot cache is only required for WCP for now. + log.Info("non WCP cluster detected. skipping initialising PVC to Snapshot cache.") return nil } diff --git a/pkg/kubernetes/types.go b/pkg/kubernetes/types.go index cce4c08dc9..e8a522e002 100644 --- a/pkg/kubernetes/types.go +++ b/pkg/kubernetes/types.go @@ -51,8 +51,7 @@ type InformerManager struct { client clientset.Interface // main shared informer factory informerFactory informers.SharedInformerFactory - - // snapshotInformerFactory - TODO: update this + snapshotInformerFactory externalversions.SharedInformerFactory // main signal From e838277c16387e95f7a7b5f4be718b674ead990c Mon Sep 17 00:00:00 2001 From: Satyanarayana Kolluri Date: Sat, 22 Nov 2025 17:20:46 +0530 Subject: [PATCH 5/8] Added add, get and remove methods on the pvcToSnapshotsMap Added GetSnapshotsForPVC method on COCommonInterface interface and it's implementors Updated GetSnapshotsForPVC method of K8sOrchestrator to return the snapshots associated with the given PVC --- pkg/common/unittestcommon/utils.go | 5 + pkg/csi/service/common/commonco/coagnostic.go | 2 + .../k8sorchestrator/k8sorchestrator.go | 233 +++++++++++------- pkg/kubernetes/informers.go | 6 +- .../cnscsi_admissionhandler_test.go | 5 + .../cnsregistervolume_controller_test.go | 5 + 6 files changed, 159 insertions(+), 97 deletions(-) diff --git a/pkg/common/unittestcommon/utils.go b/pkg/common/unittestcommon/utils.go index 99c8b7cc19..c5e9a83863 100644 --- a/pkg/common/unittestcommon/utils.go +++ b/pkg/common/unittestcommon/utils.go @@ -516,6 +516,11 @@ func (c *FakeK8SOrchestrator) 
SetCSINodeTopologyInstances(instances []interface{ c.csiNodeTopologyInstances = instances } +func (c *FakeK8SOrchestrator) GetSnapshotsForPVC(ctx context.Context, pvcName, namespace string) []string { + //TODO implement me + panic("implement me") +} + // configFromVCSim starts a vcsim instance and returns config for use against the // vcsim instance. The vcsim instance is configured with an empty tls.Config. func configFromVCSim(vcsimParams VcsimParams, isTopologyEnv bool) (*config.Config, func()) { diff --git a/pkg/csi/service/common/commonco/coagnostic.go b/pkg/csi/service/common/commonco/coagnostic.go index 4ff1844d70..13e2d7700a 100644 --- a/pkg/csi/service/common/commonco/coagnostic.go +++ b/pkg/csi/service/common/commonco/coagnostic.go @@ -128,6 +128,8 @@ type COCommonInterface interface { // GetPVCNamespacedNameByUID returns the PVC's namespaced name (namespace/name) for the given UID. // If the PVC is not found in the cache, it returns an empty string and false. GetPVCNamespacedNameByUID(uid string) (k8stypes.NamespacedName, bool) + // GetSnapshotsForPVC returns the names of the snapshots for the given PVC + GetSnapshotsForPVC(ctx context.Context, pvcName, namespace string) []string } // GetContainerOrchestratorInterface returns orchestrator object for a given diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go index ad8627bee6..bb5ef68b52 100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go @@ -256,6 +256,71 @@ type pvcToSnapshotsMap struct { items map[namespacedName]map[namespacedName]struct{} } +func (m *pvcToSnapshotsMap) add(pvc, snapshot, namespace string) { + m.Lock() + defer m.Unlock() + + pvcKey := namespacedName{ + namespace: namespace, + name: pvc, + } + if _, ok := m.items[pvcKey]; !ok { + m.items[pvcKey] = make(map[namespacedName]struct{}) + } + 
snapKey := namespacedName{ + namespace: namespace, + name: snapshot, + } + m.items[pvcKey][snapKey] = struct{}{} +} + +func (m *pvcToSnapshotsMap) get(pvc, namespace string) []string { + m.RLock() + defer m.RUnlock() + + pvcKey := namespacedName{ + namespace: namespace, + name: pvc, + } + snapMap, ok := m.items[pvcKey] + if !ok { + return []string{} + } + + snaps := make([]string, 0, len(snapMap)) + for k := range snapMap { + snaps = append(snaps, k.name) + } + return snaps +} + +func (m *pvcToSnapshotsMap) delete(pvc, snapshot, namespace string) { + m.Lock() + defer m.Unlock() + + pvcKey := namespacedName{ + namespace: namespace, + name: pvc, + } + snapMap, ok := m.items[pvcKey] + if !ok { + return + } + + snapKey := namespacedName{ + namespace: namespace, + name: snapshot, + } + delete(snapMap, snapKey) + if len(snapMap) != 0 { + m.items[pvcKey] = snapMap + return + } + + // delete pvc entry if no snapshots are present + delete(m.items, pvcKey) +} + // K8sOrchestrator defines set of properties specific to K8s. type K8sOrchestrator struct { supervisorFSS FSSConfigMapInfo @@ -1968,6 +2033,74 @@ func nodeRemove(obj interface{}) { k8sOrchestratorInstance.nodeIDToNameMap.remove(nodeMoID) } +func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes.CnsClusterFlavor) error { + log := logger.GetLogger(ctx) + // TODO: check if we need to check the FSS as well + if controllerClusterFlavor != cnstypes.CnsClusterFlavorWorkload { + // PVC to VolumeSnapshot cache is only required for WCP for now. + log.Info("non WCP cluster detected. 
skipping initialising PVC to Snapshot cache.") + return nil + } + + k8sOrchestratorInstance.pvcToSnapshotsMap = pvcToSnapshotsMap{ + RWMutex: &sync.RWMutex{}, + items: make(map[namespacedName]map[namespacedName]struct{}), + } + snapshotAdded := func(obj any) { + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if !ok || snap == nil { + log.Warnf("unrecognized object %+v", obj) + return + } + + log.Infof("snapshotAdded: snapshot=%v", snap) + if snap.Spec.Source.PersistentVolumeClaimName == nil { + log.Warnf("snapshot is not associated with any PVC. Ignoring it...") + return + } + + k8sOrchestratorInstance.pvcToSnapshotsMap.add( + *snap.Spec.Source.PersistentVolumeClaimName, snap.Name, snap.Namespace) + log.With("pvc", *snap.Spec.Source.PersistentVolumeClaimName).With("snapshot", snap.Name). + With("namespace", snap.Namespace).Debug("successfully added the snapshot to the cache") + } + snapshotDeleted := func(obj any) { + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if !ok || snap == nil { + log.Warnf("unrecognized object %+v", obj) + return + } + + log.Infof("snapshotDeleted: snapshot=%v", snap) + if snap.Spec.Source.PersistentVolumeClaimName == nil { + log.Warnf("snapshot is not associated with any PVC. Ignoring it...") + return + } + + k8sOrchestratorInstance.pvcToSnapshotsMap.delete( + *snap.Spec.Source.PersistentVolumeClaimName, snap.Name, snap.Namespace) + log.With("pvc", *snap.Spec.Source.PersistentVolumeClaimName).With("snapshot", snap.Name). + With("namespace", snap.Namespace).Debug("successfully removed the snapshot from the cache") + } + + err := k8sOrchestratorInstance.informerManager.AddSnapshotListener(ctx, + func(obj any) { + snapshotAdded(obj) + }, + func(oldObj, newObj any) { + // TODO: implement or remove as we probably don't need to take any action + log.Info("snapshotUpdated") + }, + func(obj any) { + snapshotDeleted(obj) + }) + if err != nil { + return logger.LogNewErrorf(log, "failed to listen on volumesnapshots. 
Error: %v", err) + } + + return nil +} + // GetNodeIDtoNameMap returns a map containing the nodeID to node name func (c *K8sOrchestrator) GetNodeIDtoNameMap(ctx context.Context) map[string]string { return c.nodeIDToNameMap.items @@ -2282,6 +2415,11 @@ func (c *K8sOrchestrator) GetPVCNamespacedNameByUID(uid string) (k8stypes.Namesp return nsName, true } +// GetSnapshotsForPVC returns the names of the snapshots for the given PVC +func (c *K8sOrchestrator) GetSnapshotsForPVC(ctx context.Context, pvc, namespace string) []string { + return c.pvcToSnapshotsMap.get(pvc, namespace) +} + // GetPVCDataSource Retrieves the VolumeSnapshot source when a PVC from VolumeSnapshot is being created. func GetPVCDataSource(ctx context.Context, claim *v1.PersistentVolumeClaim) (*v1.ObjectReference, error) { var dataSource v1.ObjectReference @@ -2309,98 +2447,3 @@ func GetPVCDataSource(ctx context.Context, claim *v1.PersistentVolumeClaim) (*v1 } return &dataSource, nil } - -func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes.CnsClusterFlavor) error { - log := logger.GetLogger(ctx) - if controllerClusterFlavor != cnstypes.CnsClusterFlavorWorkload { - // PVC to VolumeSnapshot cache is only required for WCP for now. - log.Info("non WCP cluster detected. skipping initialising PVC to Snapshot cache.") - return nil - } - - k8sOrchestratorInstance.pvcToSnapshotsMap = pvcToSnapshotsMap{ - RWMutex: &sync.RWMutex{}, - items: make(map[namespacedName]map[namespacedName]struct{}), - } - - snapshotAdded := func(obj any) { - snap, ok := obj.(*snapshotv1.VolumeSnapshot) - if !ok || snap == nil { - log.Warnf("snapshotAdded: unrecognized object %+v", obj) - return - } - - log.Infof("snapshotAdded: snapshot=%v", snap) - if snap.Spec.Source.PersistentVolumeClaimName == nil { - log.Warnf("snapshotAdded: snapshot is not associated with any PVC. 
Ignoring it...") - return - } - - k8sOrchestratorInstance.pvcToSnapshotsMap.Lock() - defer k8sOrchestratorInstance.pvcToSnapshotsMap.Unlock() - - pvcName := namespacedName{ - namespace: snap.Namespace, - name: *snap.Spec.Source.PersistentVolumeClaimName, - } - if _, ok := k8sOrchestratorInstance.pvcToSnapshotsMap.items[pvcName]; !ok { - k8sOrchestratorInstance.pvcToSnapshotsMap.items[pvcName] = make(map[namespacedName]struct{}) - } - snapName := namespacedName{ - namespace: snap.Namespace, - name: snap.Name, - } - k8sOrchestratorInstance.pvcToSnapshotsMap.items[pvcName][snapName] = struct{}{} - log.With("pvc", pvcName).With("snapshot", snapName).Debug("successfully added the snapshot to the cache") - } - - snapshotDeleted := func(obj any) { - snap, ok := obj.(*snapshotv1.VolumeSnapshot) - if !ok || snap == nil { - log.Warnf("snapshotDeleted: unrecognized object %+v", obj) - return - } - - log.Infof("snapshotDeleted: snapshot=%v", snap) - if snap.Spec.Source.PersistentVolumeClaimName == nil { - log.Warnf("snapshotDeleted: snapshot is not associated with any PVC. Ignoring it...") - return - } - - k8sOrchestratorInstance.pvcToSnapshotsMap.Lock() - defer k8sOrchestratorInstance.pvcToSnapshotsMap.Unlock() - - pvcName := namespacedName{ - namespace: snap.Namespace, - name: *snap.Spec.Source.PersistentVolumeClaimName, - } - if _, ok := k8sOrchestratorInstance.pvcToSnapshotsMap.items[pvcName]; !ok { - log.Infof("snapshotDeleted: associated PVC %s not found in the map. 
Ignoring the delete event...", pvcName) - return - } - - snapName := namespacedName{ - namespace: snap.Namespace, - name: snap.Name, - } - delete(k8sOrchestratorInstance.pvcToSnapshotsMap.items[pvcName], snapName) - log.With("pvc", pvcName).With("snapshot", snapName).Debug("successfully removed the snapshot from the cache") - } - - err := k8sOrchestratorInstance.informerManager.AddSnapshotListener(ctx, - func(obj any) { - snapshotAdded(obj) - }, - func(oldObj, newObj any) { - // TODO: implement - log.Info("snapshotUpdated") - }, - func(obj any) { - snapshotDeleted(obj) - }) - if err != nil { - return logger.LogNewErrorf(log, "failed to listen on volumesnapshots. Error: %v", err) - } - - return nil -} diff --git a/pkg/kubernetes/informers.go b/pkg/kubernetes/informers.go index 3a6032a267..bbd2bfa3d2 100644 --- a/pkg/kubernetes/informers.go +++ b/pkg/kubernetes/informers.go @@ -267,8 +267,10 @@ func (im *InformerManager) AddVolumeAttachmentListener(ctx context.Context, add } // AddSnapshotListener hooks up add, update, delete callbacks. -func (im *InformerManager) AddSnapshotListener(ctx context.Context, add func(obj interface{}), - update func(oldObj, newObj interface{}), remove func(obj interface{})) error { +func (im *InformerManager) AddSnapshotListener(ctx context.Context, + add func(obj interface{}), + update func(oldObj, newObj interface{}), + remove func(obj interface{})) error { log := logger.GetLogger(ctx) if im.snapshotInformer == nil { im.snapshotInformer = im.snapshotInformerFactory. 
diff --git a/pkg/syncer/admissionhandler/cnscsi_admissionhandler_test.go b/pkg/syncer/admissionhandler/cnscsi_admissionhandler_test.go index 7e6b907193..8fe48907bf 100644 --- a/pkg/syncer/admissionhandler/cnscsi_admissionhandler_test.go +++ b/pkg/syncer/admissionhandler/cnscsi_admissionhandler_test.go @@ -47,6 +47,11 @@ type MockCOCommonInterface struct { mock.Mock } +func (m *MockCOCommonInterface) GetSnapshotsForPVC(ctx context.Context, pvcName, namespace string) []string { + //TODO implement me + panic("implement me") +} + func (m *MockCOCommonInterface) GetPVCNamespacedNameByUID(uid string) (apitypes.NamespacedName, bool) { //TODO implement me panic("implement me") diff --git a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go index 4240944767..937f1ab64a 100644 --- a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go +++ b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go @@ -223,6 +223,11 @@ func (m *mockVolumeManager) SyncVolume(ctx context.Context, type mockCOCommon struct{} +func (m *mockCOCommon) GetSnapshotsForPVC(ctx context.Context, pvcName, namespace string) []string { + //TODO implement me + panic("implement me") +} + func (m *mockCOCommon) GetPVCNamespacedNameByUID(uid string) (types.NamespacedName, bool) { //TODO implement me panic("implement me") From 205905dd8ed1479c155897ed0ea12e74f88a0513 Mon Sep 17 00:00:00 2001 From: Satyanarayana Kolluri Date: Sat, 22 Nov 2025 17:30:47 +0530 Subject: [PATCH 6/8] Removed update event handler as it's not required --- .../common/commonco/k8sorchestrator/k8sorchestrator.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go index bb5ef68b52..42293e3a25 
100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go @@ -2053,7 +2053,6 @@ func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes return } - log.Infof("snapshotAdded: snapshot=%v", snap) if snap.Spec.Source.PersistentVolumeClaimName == nil { log.Warnf("snapshot is not associated with any PVC. Ignoring it...") return @@ -2071,7 +2070,6 @@ func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes return } - log.Infof("snapshotDeleted: snapshot=%v", snap) if snap.Spec.Source.PersistentVolumeClaimName == nil { log.Warnf("snapshot is not associated with any PVC. Ignoring it...") return @@ -2087,10 +2085,7 @@ func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes func(obj any) { snapshotAdded(obj) }, - func(oldObj, newObj any) { - // TODO: implement or remove as we probably don't need to take any action - log.Info("snapshotUpdated") - }, + nil, func(obj any) { snapshotDeleted(obj) }) From b565b5b1db955c4b19c5043cae9ce6648597e5fb Mon Sep 17 00:00:00 2001 From: Satyanarayana Kolluri Date: Mon, 24 Nov 2025 13:15:12 +0530 Subject: [PATCH 7/8] Refactor snapshot retrieval to use ContainerOrchestratorUtility Simplify the getSnapshotsForPVC function by delegating snapshot retrieval to the ContainerOrchestratorUtility interface instead of directly using the snapshot client. This improves testability and maintains consistency with the existing architecture. 
Changes: - Remove direct dependency on external-snapshotter client in util.go - Update getSnapshotsForPVC to use ContainerOrchestratorUtility.GetSnapshotsForPVC - Remove rest.Config parameter from getSnapshotsForPVC function signature - Add comprehensive unit tests for getSnapshotsForPVC covering: * Uninitialized ContainerOrchestratorUtility scenario * PVC with no snapshots * PVC with multiple snapshots - Implement GetSnapshotsForPVC in FakeK8SOrchestrator for testing - Add TODO comment for future refactoring of pvcToSnapshotsMap key type Benefits: - Improved testability through dependency injection - Reduced coupling to external snapshot client - Consistent error handling - Better alignment with existing orchestrator abstraction pattern --- pkg/common/cns-lib/node/nodes.go | 11 ++- pkg/common/unittestcommon/utils.go | 7 +- .../k8sorchestrator/k8sorchestrator.go | 56 ++++++--------- .../featurestates/featurestates.go | 9 ++- pkg/kubernetes/informers.go | 71 ++++++++++--------- pkg/kubernetes/types.go | 2 +- .../controller/cnsunregistervolume/util.go | 36 +++------- .../cnsunregistervolume/util_test.go | 57 +++++++++++++++ pkg/syncer/metadatasyncer.go | 9 ++- pkg/syncer/storagepool/service.go | 10 ++- pkg/syncer/syncer_test.go | 29 ++++---- 11 files changed, 180 insertions(+), 117 deletions(-) create mode 100644 pkg/syncer/cnsoperator/controller/cnsunregistervolume/util_test.go diff --git a/pkg/common/cns-lib/node/nodes.go b/pkg/common/cns-lib/node/nodes.go index 2040b323dc..15cb8bafac 100644 --- a/pkg/common/cns-lib/node/nodes.go +++ b/pkg/common/cns-lib/node/nodes.go @@ -37,14 +37,23 @@ type Nodes struct { // Initialize helps initialize node manager and node informer manager. func (nodes *Nodes) Initialize(ctx context.Context) error { nodes.cnsNodeManager = GetManager(ctx) + k8sclient, err := k8s.NewClient(ctx) if err != nil { log := logger.GetLogger(ctx) log.Errorf("Creating Kubernetes client failed. 
Err: %v", err) return err } + + snapshotterClient, err := k8s.NewSnapshotterClient(ctx) + if err != nil { + log := logger.GetLogger(ctx) + log.Errorf("Creating Snapshotter client failed. Err: %v", err) + return err + } + nodes.cnsNodeManager.SetKubernetesClient(k8sclient) - nodes.informMgr = k8s.NewInformer(ctx, k8sclient, true) + nodes.informMgr = k8s.NewInformer(ctx, k8sclient, snapshotterClient) err = nodes.informMgr.AddCSINodeListener(ctx, nodes.csiNodeAdd, nodes.csiNodeUpdate, nodes.csiNodeDelete) if err != nil { diff --git a/pkg/common/unittestcommon/utils.go b/pkg/common/unittestcommon/utils.go index c5e9a83863..85b05c9b48 100644 --- a/pkg/common/unittestcommon/utils.go +++ b/pkg/common/unittestcommon/utils.go @@ -517,8 +517,11 @@ func (c *FakeK8SOrchestrator) SetCSINodeTopologyInstances(instances []interface{ } func (c *FakeK8SOrchestrator) GetSnapshotsForPVC(ctx context.Context, pvcName, namespace string) []string { - //TODO implement me - panic("implement me") + if strings.Contains(pvcName, "no-snap") { + return []string{} + } + + return []string{"snap1", "snap2", "snap3"} } // configFromVCSim starts a vcsim instance and returns config for use against the diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go index 42293e3a25..42fc8687c6 100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go @@ -240,47 +240,35 @@ func (m *volumeIDToNameMap) get(volumeID string) (string, bool) { return volumeName, found } -type namespacedName struct { - namespace, name string -} - -func (n namespacedName) String() string { - return fmt.Sprintf("%s/%s", n.namespace, n.name) -} - // pvcToSnapshotsMap maps a PVC to its snapshots. -// Key is the namespaced name of the PVC and value is a map. -// The key of the inner map is the namespaced name of the snapshot. 
+// The primary key is the namespaced name of the PVC and value is a map. +// The key of the inner map is the name of the snapshot. type pvcToSnapshotsMap struct { *sync.RWMutex - items map[namespacedName]map[namespacedName]struct{} + items map[k8stypes.NamespacedName]map[string]struct{} } func (m *pvcToSnapshotsMap) add(pvc, snapshot, namespace string) { m.Lock() defer m.Unlock() - pvcKey := namespacedName{ - namespace: namespace, - name: pvc, + pvcKey := k8stypes.NamespacedName{ + Namespace: namespace, + Name: pvc, } if _, ok := m.items[pvcKey]; !ok { - m.items[pvcKey] = make(map[namespacedName]struct{}) - } - snapKey := namespacedName{ - namespace: namespace, - name: snapshot, + m.items[pvcKey] = make(map[string]struct{}) } - m.items[pvcKey][snapKey] = struct{}{} + m.items[pvcKey][snapshot] = struct{}{} } func (m *pvcToSnapshotsMap) get(pvc, namespace string) []string { m.RLock() defer m.RUnlock() - pvcKey := namespacedName{ - namespace: namespace, - name: pvc, + pvcKey := k8stypes.NamespacedName{ + Namespace: namespace, + Name: pvc, } snapMap, ok := m.items[pvcKey] if !ok { @@ -288,8 +276,8 @@ func (m *pvcToSnapshotsMap) get(pvc, namespace string) []string { } snaps := make([]string, 0, len(snapMap)) - for k := range snapMap { - snaps = append(snaps, k.name) + for snap := range snapMap { + snaps = append(snaps, snap) } return snaps } @@ -298,20 +286,16 @@ func (m *pvcToSnapshotsMap) delete(pvc, snapshot, namespace string) { m.Lock() defer m.Unlock() - pvcKey := namespacedName{ - namespace: namespace, - name: pvc, + pvcKey := k8stypes.NamespacedName{ + Namespace: namespace, + Name: pvc, } snapMap, ok := m.items[pvcKey] if !ok { return } - snapKey := namespacedName{ - namespace: namespace, - name: snapshot, - } - delete(snapMap, snapKey) + delete(snapMap, snapshot) if len(snapMap) != 0 { m.items[pvcKey] = snapMap return @@ -402,7 +386,7 @@ func Newk8sOrchestrator(ctx context.Context, controllerClusterFlavor cnstypes.Cn k8sOrchestratorInstance.clusterFlavor = 
controllerClusterFlavor k8sOrchestratorInstance.k8sClient = k8sClient k8sOrchestratorInstance.snapshotterClient = snapshotterClient - k8sOrchestratorInstance.informerManager = k8s.NewInformer(ctx, k8sClient, true) + k8sOrchestratorInstance.informerManager = k8s.NewInformer(ctx, k8sClient, snapshotterClient) coInstanceErr = initFSS(ctx, k8sClient, controllerClusterFlavor, params) if coInstanceErr != nil { log.Errorf("Failed to initialize the orchestrator. Error: %v", coInstanceErr) @@ -2044,7 +2028,7 @@ func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes k8sOrchestratorInstance.pvcToSnapshotsMap = pvcToSnapshotsMap{ RWMutex: &sync.RWMutex{}, - items: make(map[namespacedName]map[namespacedName]struct{}), + items: make(map[k8stypes.NamespacedName]map[string]struct{}), } snapshotAdded := func(obj any) { snap, ok := obj.(*snapshotv1.VolumeSnapshot) @@ -2085,6 +2069,8 @@ func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes func(obj any) { snapshotAdded(obj) }, + // Since the name of PVC associated with a snapshot is immutable, + // update events do not have any impact on the state of the cache. nil, func(obj any) { snapshotDeleted(obj) diff --git a/pkg/internalapis/featurestates/featurestates.go b/pkg/internalapis/featurestates/featurestates.go index 255a7e32d8..3ee02e43f1 100644 --- a/pkg/internalapis/featurestates/featurestates.go +++ b/pkg/internalapis/featurestates/featurestates.go @@ -122,8 +122,15 @@ func StartSvFSSReplicationService(ctx context.Context, svFeatureStatConfigMapNam return err } + // Create the snapshotter client. + snapshotterClient, err := k8s.NewSnapshotterClient(ctx) + if err != nil { + log.Errorf("Creating Snapshotter client failed. Err: %v", err) + return err + } + // Create k8s Informer and watch on configmaps and namespaces. 
- informer := k8s.NewInformer(ctx, k8sClient, true) + informer := k8s.NewInformer(ctx, k8sClient, snapshotterClient) // Configmap informer to watch on SV featurestate config-map. err = informer.AddConfigMapListener( ctx, diff --git a/pkg/kubernetes/informers.go b/pkg/kubernetes/informers.go index bbd2bfa3d2..f8d3d14a47 100644 --- a/pkg/kubernetes/informers.go +++ b/pkg/kubernetes/informers.go @@ -18,10 +18,10 @@ package kubernetes import ( "context" - "fmt" "sync" "time" + snapclientset "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned" "github.com/kubernetes-csi/external-snapshotter/client/v8/informers/externalversions" "k8s.io/client-go/informers" v1 "k8s.io/client-go/informers/core/v1" @@ -46,8 +46,6 @@ const ( ) var ( - inClusterInformerManagerInstance *InformerManager = nil - inClusterInformerInstanceLock = &sync.Mutex{} supervisorInformerManagerInstance *InformerManager = nil supervisorInformerInstanceLock = &sync.Mutex{} ) @@ -56,48 +54,30 @@ func noResyncPeriodFunc() time.Duration { return 0 } -// NewInformer creates a new K8S client based on a service account. -// NOTE: This function expects caller function to pass appropriate client +// NewInformer creates a new K8S informer manager. +// NOTE: This function expects caller function to pass appropriate clients // as per config to be created Informer for. -// This function creates shared informer factory against the client provided. -func NewInformer(ctx context.Context, client clientset.Interface, inClusterClnt bool) *InformerManager { +// This function creates shared informer factories against the clients provided. 
+func NewInformer(ctx context.Context, + client clientset.Interface, + snapshotClient snapclientset.Interface) *InformerManager { var informerInstance *InformerManager log := logger.GetLogger(ctx) - if inClusterClnt { - inClusterInformerInstanceLock.Lock() - defer inClusterInformerInstanceLock.Unlock() - - informerInstance = inClusterInformerManagerInstance - } else { - supervisorInformerInstanceLock.Lock() - defer supervisorInformerInstanceLock.Unlock() - - informerInstance = supervisorInformerManagerInstance - } - - // TODO: check if callers can pass this - snapClient, err := NewSnapshotterClient(ctx) - if err != nil { - // TODO: handle error appropriately - log.Fatalf("unable to initialise snapshot client") - } + supervisorInformerInstanceLock.Lock() + defer supervisorInformerInstanceLock.Unlock() + informerInstance = supervisorInformerManagerInstance if informerInstance == nil { informerInstance = &InformerManager{ client: client, stopCh: signals.SetupSignalHandler().Done(), informerFactory: informers.NewSharedInformerFactory(client, noResyncPeriodFunc()), - snapshotInformerFactory: externalversions.NewSharedInformerFactory(snapClient, 0), + snapshotInformerFactory: externalversions.NewSharedInformerFactory(snapshotClient, 0), } - if inClusterClnt { - inClusterInformerManagerInstance = informerInstance - log.Info("Created new informer factory for in-cluster client") - } else { - supervisorInformerManagerInstance = informerInstance - log.Info("Created new informer factory for supervisor client") - } + supervisorInformerManagerInstance = informerInstance + log.Info("Created new informer factory for supervisor client") } return informerInstance @@ -322,8 +302,6 @@ func (im *InformerManager) Listen() (stopCh <-chan struct{}) { go im.snapshotInformerFactory.Start(im.stopCh) cacheSync := im.snapshotInformerFactory.WaitForCacheSync(im.stopCh) - // TODO: remove - fmt.Print(cacheSync) for _, isSynced := range cacheSync { if !isSynced { return @@ -332,3 +310,26 @@ func 
(im *InformerManager) Listen() (stopCh <-chan struct{}) { return im.stopCh } + +// NewConfigMapListener creates a new configmap listener in the given namespace. +// NOTE: This creates a NewSharedIndexInformer every time and does not use the informer factory. +// Only use this function when you need a configmap listener in a different namespace than the +// one already present in the informer factory. +func NewConfigMapListener(ctx context.Context, client clientset.Interface, namespace string, + add func(obj interface{}), update func(oldObj, newObj interface{}), remove func(obj interface{})) error { + log := logger.GetLogger(ctx) + configMapInformer := v1.NewFilteredConfigMapInformer(client, namespace, resyncPeriodConfigMapInformer, + cache.Indexers{}, nil) + + _, err := configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: add, + UpdateFunc: update, + DeleteFunc: remove, + }) + if err != nil { + return logger.LogNewErrorf(log, "failed to add event handler on configmap listener.
Error: %v", err) + } + stopCh := make(chan struct{}) + go configMapInformer.Run(stopCh) + return nil +} diff --git a/pkg/kubernetes/types.go b/pkg/kubernetes/types.go index e8a522e002..1e24cd75c3 100644 --- a/pkg/kubernetes/types.go +++ b/pkg/kubernetes/types.go @@ -51,7 +51,7 @@ type InformerManager struct { client clientset.Interface // main shared informer factory informerFactory informers.SharedInformerFactory - + // snapshot informer factory snapshotInformerFactory externalversions.SharedInformerFactory // main signal diff --git a/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util.go b/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util.go index 09180cf4e8..aa7d99190b 100644 --- a/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util.go +++ b/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util.go @@ -21,7 +21,6 @@ import ( "errors" "strings" - snapshotclient "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned" vmoperatortypes "github.com/vmware-tanzu/vm-operator/api/v1alpha2" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -111,7 +110,7 @@ func _getVolumeUsageInfo(ctx context.Context, pvcName string, pvcNamespace strin return &volumeUsageInfo, nil } - volumeUsageInfo.snapshots, volumeUsageInfo.isInUse, err = getSnapshotsForPVC(ctx, pvcName, pvcNamespace, *cfg) + volumeUsageInfo.snapshots, volumeUsageInfo.isInUse, err = getSnapshotsForPVC(ctx, pvcName, pvcNamespace) if err != nil { return nil, err } @@ -222,35 +221,16 @@ func getPodsForPVC(ctx context.Context, pvcName string, pvcNamespace string, } // getSnapshotsForPVC returns a list of snapshots that are created for the specified PVC. 
-func getSnapshotsForPVC(ctx context.Context, pvcName string, pvcNamespace string, - cfg rest.Config) ([]string, bool, error) { +func getSnapshotsForPVC(ctx context.Context, pvcName string, pvcNamespace string) ([]string, bool, error) { log := logger.GetLogger(ctx) - c, err := snapshotclient.NewForConfig(&cfg) - if err != nil { - log.Warnf("Failed to create snapshot client for PVC %q in namespace %q. Error: %q", - pvcName, pvcNamespace, err.Error()) - return nil, false, errors.New("failed to create snapshot client") - } - - // TODO: check if we can use informer cache - list, err := c.SnapshotV1().VolumeSnapshots(pvcNamespace).List(ctx, metav1.ListOptions{}) - if err != nil { - log.Warnf("Failed to list VolumeSnapshots in namespace %q for PVC %q. Error: %q", - pvcNamespace, pvcName, err.Error()) - return nil, false, errors.New("failed to list VolumeSnapshots") - } - - var snapshots []string - for _, snap := range list.Items { - if snap.Spec.Source.PersistentVolumeClaimName == nil || - *snap.Spec.Source.PersistentVolumeClaimName != pvcName { - continue - } - - snapshots = append(snapshots, snap.Name) + if commonco.ContainerOrchestratorUtility == nil { + err := errors.New("ContainerOrchestratorUtility is not initialized") + log.Warn(err) + return nil, false, err } - return snapshots, len(snapshots) > 0, nil + snaps := commonco.ContainerOrchestratorUtility.GetSnapshotsForPVC(ctx, pvcName, pvcNamespace) + return snaps, len(snaps) > 0, nil } // getGuestClustersForPVC returns a list of guest clusters that are using the specified PVC. 
diff --git a/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util_test.go b/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util_test.go new file mode 100644 index 0000000000..5582c58c75 --- /dev/null +++ b/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util_test.go @@ -0,0 +1,57 @@ +package cnsunregistervolume + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/unittestcommon" + "sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common/commonco" +) + +func TestGetSnapshotsForPVC(t *testing.T) { + containerOrchestratorUtilityOriginal := commonco.ContainerOrchestratorUtility + defer func() { + commonco.ContainerOrchestratorUtility = containerOrchestratorUtilityOriginal + }() + + t.Run("WhenContainerOrchestratorIsNotInitialised", func(tt *testing.T) { + // Setup + commonco.ContainerOrchestratorUtility = nil + expErr := errors.New("ContainerOrchestratorUtility is not initialized") + + // Execute + _, _, err := getSnapshotsForPVC(context.Background(), "", "") + + // Assert + assert.Equal(tt, expErr, err) + }) + + t.Run("WhenPVCHasNoSnapshots", func(tt *testing.T) { + // Setup + commonco.ContainerOrchestratorUtility = &unittestcommon.FakeK8SOrchestrator{} + + // Execute + snaps, inUse, err := getSnapshotsForPVC(context.Background(), "no-snaps", "") + + // Assert + assert.NoError(tt, err) + assert.False(tt, inUse) + assert.Empty(tt, snaps) + }) + + t.Run("WhenPVCHasSnapshots", func(tt *testing.T) { + // Setup + commonco.ContainerOrchestratorUtility = &unittestcommon.FakeK8SOrchestrator{} + expSnaps := []string{"snap1", "snap2", "snap3"} + + // Execute + snaps, inUse, err := getSnapshotsForPVC(context.Background(), "with-snaps", "") + + // Assert + assert.NoError(tt, err) + assert.True(tt, inUse) + assert.Equal(tt, expSnaps, snaps) + }) +} diff --git a/pkg/syncer/metadatasyncer.go b/pkg/syncer/metadatasyncer.go index 21f300db67..ac687b31d6 100644 --- 
a/pkg/syncer/metadatasyncer.go +++ b/pkg/syncer/metadatasyncer.go @@ -248,6 +248,13 @@ func InitMetadataSyncer(ctx context.Context, clusterFlavor cnstypes.CnsClusterFl return err } + // Create the snapshotter client. + snapshotterClient, err := k8s.NewSnapshotterClient(ctx) + if err != nil { + log.Errorf("Creating Snapshotter client failed. Err: %v", err) + return err + } + // Initialize the k8s orchestrator interface. metadataSyncer.coCommonInterface, err = commonco.GetContainerOrchestratorInterface(ctx, common.Kubernetes, clusterFlavor, COInitParams) @@ -633,7 +640,7 @@ func InitMetadataSyncer(ctx context.Context, clusterFlavor cnstypes.CnsClusterFl } // Set up kubernetes resource listeners for metadata syncer. - metadataSyncer.k8sInformerManager = k8s.NewInformer(ctx, k8sClient, true) + metadataSyncer.k8sInformerManager = k8s.NewInformer(ctx, k8sClient, snapshotterClient) err = metadataSyncer.k8sInformerManager.AddPVCListener( ctx, nil, // Add. diff --git a/pkg/syncer/storagepool/service.go b/pkg/syncer/storagepool/service.go index 49ec945acf..bc08c96b0c 100644 --- a/pkg/syncer/storagepool/service.go +++ b/pkg/syncer/storagepool/service.go @@ -120,7 +120,15 @@ func InitStoragePoolService(ctx context.Context, log.Errorf("Creating Kubernetes client failed. Err: %v", err) return } - k8sInformerManager := k8s.NewInformer(ctx, k8sClient, true) + + // Create the snapshotter client. + snapshotterClient, err := k8s.NewSnapshotterClient(ctx) + if err != nil { + log.Errorf("Creating Snapshotter client failed. Err: %v", err) + return + } + + k8sInformerManager := k8s.NewInformer(ctx, k8sClient, snapshotterClient) err = InitNodeAnnotationListener(ctx, k8sInformerManager, scWatchCntlr, spController) if err != nil { log.Errorf("InitNodeAnnotationListener failed. 
err: %v", err) diff --git a/pkg/syncer/syncer_test.go b/pkg/syncer/syncer_test.go index d052b2d06d..6865cc1986 100644 --- a/pkg/syncer/syncer_test.go +++ b/pkg/syncer/syncer_test.go @@ -26,6 +26,8 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/google/uuid" + "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned" + snapshotfake "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned/fake" cnstypes "github.com/vmware/govmomi/cns/types" "github.com/vmware/govmomi/find" "github.com/vmware/govmomi/vim25/mo" @@ -69,17 +71,18 @@ const ( ) var ( - csiConfig *cnsconfig.Config - ctx context.Context - cnsVCenterConfig *cnsvsphere.VirtualCenterConfig - err error - virtualCenter *cnsvsphere.VirtualCenter - metadataSyncer *metadataSyncInformer - k8sclient clientset.Interface - dc []*cnsvsphere.Datacenter - volumeManager cnsvolumes.Manager - dsList []vimtypes.ManagedObjectReference - cancel context.CancelFunc + csiConfig *cnsconfig.Config + ctx context.Context + cnsVCenterConfig *cnsvsphere.VirtualCenterConfig + err error + virtualCenter *cnsvsphere.VirtualCenter + metadataSyncer *metadataSyncInformer + k8sclient clientset.Interface + snapshotterClient versioned.Interface + dc []*cnsvsphere.Datacenter + volumeManager cnsvolumes.Manager + dsList []vimtypes.ManagedObjectReference + cancel context.CancelFunc ) func TestSyncerWorkflows(t *testing.T) { @@ -169,7 +172,9 @@ func TestSyncerWorkflows(t *testing.T) { // Here we should use a faked client to avoid test inteference with running // metadata syncer pod in real Kubernetes cluster. k8sclient = testclient.NewSimpleClientset() - metadataSyncer.k8sInformerManager = k8s.NewInformer(ctx, k8sclient, true) + // Create a fake snapshot client for testing. 
+ snapshotterClient = snapshotfake.NewSimpleClientset() + metadataSyncer.k8sInformerManager = k8s.NewInformer(ctx, k8sclient, snapshotterClient) metadataSyncer.k8sInformerManager.GetPodLister() metadataSyncer.pvLister = metadataSyncer.k8sInformerManager.GetPVLister() metadataSyncer.pvcLister = metadataSyncer.k8sInformerManager.GetPVCLister() From a3aefcd4851d32ef92ec87d649b724ebf98f50e1 Mon Sep 17 00:00:00 2001 From: Satyanarayana Kolluri Date: Wed, 26 Nov 2025 14:01:54 +0530 Subject: [PATCH 8/8] Removed pointer reference to RWMutex of pvcToSnapshotsMap Extracted the snapshot event handlers to improve the testability Added unit tests for K8s and Informers --- .../k8sorchestrator/k8sorchestrator.go | 80 +-- .../k8sorchestrator/k8sorchestrator_test.go | 497 +++++++++++++++++- pkg/kubernetes/informers.go | 18 +- pkg/kubernetes/informers_test.go | 246 +++++++++ 4 files changed, 791 insertions(+), 50 deletions(-) create mode 100644 pkg/kubernetes/informers_test.go diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go index 42fc8687c6..1b11383da2 100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go @@ -244,7 +244,7 @@ func (m *volumeIDToNameMap) get(volumeID string) (string, bool) { // The primary key is the namespaced name of the PVC and value is a map. // The key of the inner map is the name of the snapshot. type pvcToSnapshotsMap struct { - *sync.RWMutex + sync.RWMutex items map[k8stypes.NamespacedName]map[string]struct{} } @@ -2017,6 +2017,44 @@ func nodeRemove(obj interface{}) { k8sOrchestratorInstance.nodeIDToNameMap.remove(nodeMoID) } +// handleSnapshotAdded handles the snapshot add event by adding it to the pvcToSnapshotsMap cache.
+func handleSnapshotAdded(ctx context.Context, obj any, pvcMap *pvcToSnapshotsMap) { + log := logger.GetLogger(ctx) + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if !ok || snap == nil { + log.Warnf("unrecognized object %+v", obj) + return + } + + if snap.Spec.Source.PersistentVolumeClaimName == nil { + log.Warnf("snapshot is not associated with any PVC. Ignoring it...") + return + } + + pvcMap.add(*snap.Spec.Source.PersistentVolumeClaimName, snap.Name, snap.Namespace) + log.With("pvc", *snap.Spec.Source.PersistentVolumeClaimName).With("snapshot", snap.Name). + With("namespace", snap.Namespace).Debug("successfully added the snapshot to the cache") +} + +// handleSnapshotDeleted handles the snapshot delete event by removing it from the pvcToSnapshotsMap cache. +func handleSnapshotDeleted(ctx context.Context, obj any, pvcMap *pvcToSnapshotsMap) { + log := logger.GetLogger(ctx) + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if !ok || snap == nil { + log.Warnf("unrecognized object %+v", obj) + return + } + + if snap.Spec.Source.PersistentVolumeClaimName == nil { + log.Warnf("snapshot is not associated with any PVC. Ignoring it...") + return + } + + pvcMap.delete(*snap.Spec.Source.PersistentVolumeClaimName, snap.Name, snap.Namespace) + log.With("pvc", *snap.Spec.Source.PersistentVolumeClaimName).With("snapshot", snap.Name). 
+ With("namespace", snap.Namespace).Debug("successfully removed the snapshot from the cache") +} + func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes.CnsClusterFlavor) error { log := logger.GetLogger(ctx) // TODO: check if we need to check the FSS as well @@ -2027,53 +2065,19 @@ func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes } k8sOrchestratorInstance.pvcToSnapshotsMap = pvcToSnapshotsMap{ - RWMutex: &sync.RWMutex{}, + RWMutex: sync.RWMutex{}, items: make(map[k8stypes.NamespacedName]map[string]struct{}), } - snapshotAdded := func(obj any) { - snap, ok := obj.(*snapshotv1.VolumeSnapshot) - if !ok || snap == nil { - log.Warnf("unrecognized object %+v", obj) - return - } - - if snap.Spec.Source.PersistentVolumeClaimName == nil { - log.Warnf("snapshot is not associated with any PVC. Ignoring it...") - return - } - - k8sOrchestratorInstance.pvcToSnapshotsMap.add( - *snap.Spec.Source.PersistentVolumeClaimName, snap.Name, snap.Namespace) - log.With("pvc", *snap.Spec.Source.PersistentVolumeClaimName).With("snapshot", snap.Name). - With("namespace", snap.Namespace).Debug("successfully added the snapshot to the cache") - } - snapshotDeleted := func(obj any) { - snap, ok := obj.(*snapshotv1.VolumeSnapshot) - if !ok || snap == nil { - log.Warnf("unrecognized object %+v", obj) - return - } - - if snap.Spec.Source.PersistentVolumeClaimName == nil { - log.Warnf("snapshot is not associated with any PVC. Ignoring it...") - return - } - - k8sOrchestratorInstance.pvcToSnapshotsMap.delete( - *snap.Spec.Source.PersistentVolumeClaimName, snap.Name, snap.Namespace) - log.With("pvc", *snap.Spec.Source.PersistentVolumeClaimName).With("snapshot", snap.Name). 
- With("namespace", snap.Namespace).Debug("successfully removed the snapshot from the cache") - } err := k8sOrchestratorInstance.informerManager.AddSnapshotListener(ctx, func(obj any) { - snapshotAdded(obj) + handleSnapshotAdded(ctx, obj, &k8sOrchestratorInstance.pvcToSnapshotsMap) }, // Since the name of PVC associated with a snapshot is immutable, // update events do not have any impact on the state of the cache. nil, func(obj any) { - snapshotDeleted(obj) + handleSnapshotDeleted(ctx, obj, &k8sOrchestratorInstance.pvcToSnapshotsMap) }) if err != nil { return logger.LogNewErrorf(log, "failed to listen on volumesnapshots. Error: %v", err) diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator_test.go b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator_test.go index 55233a1fdc..09cc739bc8 100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator_test.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator_test.go @@ -23,13 +23,14 @@ import ( "sync" "testing" + snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1" + "github.com/stretchr/testify/assert" cnstypes "github.com/vmware/govmomi/cns/types" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - - "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client/fake" wcpcapv1alph1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/wcpcapabilities/v1alpha1" cnsconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config" @@ -368,3 +369,493 @@ func TestSetWcpCapabilitiesMap_Success(t *testing.T) { val, _ = WcpCapabilitiesMap.Load("CapabilityB") assert.Equal(t, false, val) } + +func TestK8sOrchestrator_GetSnapshotsForPVC(t *testing.T) { + t.Run("WhenNoSnapshotsExistForPVC", func(tt *testing.T) { + // Setup + orch := 
K8sOrchestrator{} + + // Execute + snaps := orch.GetSnapshotsForPVC(context.Background(), "", "") + + // Assert + assert.Empty(tt, snaps) + }) + t.Run("WhenSnapshotsExist", func(tt *testing.T) { + // Setup + pvc, ns := "test-pvc", "test-ns" + orch := K8sOrchestrator{ + pvcToSnapshotsMap: pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: ns, Name: pvc}: { + "snap1": struct{}{}, + "snap2": struct{}{}, + "snap3": struct{}{}, + }, + }, + }, + } + exp := []string{"snap1", "snap2", "snap3"} + + // Execute + snaps := orch.GetSnapshotsForPVC(context.Background(), pvc, ns) + + // Assert + assert.ElementsMatch(tt, exp, snaps) + }) +} + +func TestPvcToSnapshotsMap_Add(t *testing.T) { + t.Run("AddSnapshotToNewPVC", func(tt *testing.T) { + // Setup + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + pvc, ns, snap := "test-pvc", "test-ns", "snap1" + + // Execute + pvcMap.add(pvc, snap, ns) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 1) + assert.Contains(tt, snaps, snap) + }) + + t.Run("AddMultipleSnapshotsToSamePVC", func(tt *testing.T) { + // Setup + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + pvc, ns := "test-pvc", "test-ns" + snaps := []string{"snap1", "snap2", "snap3"} + + // Execute + for _, snap := range snaps { + pvcMap.add(pvc, snap, ns) + } + + // Assert + result := pvcMap.get(pvc, ns) + assert.Len(tt, result, 3) + for _, snap := range snaps { + assert.Contains(tt, result, snap) + } + }) + + t.Run("AddSameSnapshotTwice", func(tt *testing.T) { + // Setup + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + pvc, ns, snap := "test-pvc", "test-ns", "snap1" + + // Execute + pvcMap.add(pvc, snap, ns) + pvcMap.add(pvc, snap, ns) // Add same snapshot again + + // Assert 
+ snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 1, "duplicate snapshot should not be added") + assert.Contains(tt, snaps, snap) + }) +} + +func TestPvcToSnapshotsMap_Get(t *testing.T) { + t.Run("GetNonExistentPVC", func(tt *testing.T) { + // Setup + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: "test-ns", Name: "pvc1"}: { + "snap1": struct{}{}, + }, + }, + } + + // Execute + snaps := pvcMap.get("non-existent-pvc", "test-ns") + + // Assert + assert.Empty(tt, snaps) + }) + + t.Run("GetExistingPVCWithSnapshots", func(tt *testing.T) { + // Setup + pvc, ns := "test-pvc", "test-ns" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: ns, Name: pvc}: { + "snap1": struct{}{}, + "snap2": struct{}{}, + "snap3": struct{}{}, + }, + }, + } + exp := []string{"snap1", "snap2", "snap3"} + + // Execute + snaps := pvcMap.get(pvc, ns) + + // Assert + assert.ElementsMatch(tt, exp, snaps) + }) +} + +func TestPvcToSnapshotsMap_Delete(t *testing.T) { + t.Run("DeleteSnapshotFromPVC", func(tt *testing.T) { + // Setup + pvc, ns := "test-pvc", "test-ns" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: ns, Name: pvc}: { + "snap1": struct{}{}, + "snap2": struct{}{}, + }, + }, + } + + // Execute + pvcMap.delete(pvc, "snap1", ns) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 1) + assert.Contains(tt, snaps, "snap2") + assert.NotContains(tt, snaps, "snap1") + }) + + t.Run("DeleteLastSnapshotRemovesPVCEntry", func(tt *testing.T) { + // Setup + pvc, ns := "test-pvc", "test-ns" + pvcKey := types.NamespacedName{Namespace: ns, Name: pvc} + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + pvcKey: { + "snap1": struct{}{}, + }, + }, + } + + // Execute + pvcMap.delete(pvc, "snap1", ns) + 
+ // Assert + snaps := pvcMap.get(pvc, ns) + assert.Empty(tt, snaps) + // Verify PVC entry is removed from map + pvcMap.RLock() + _, exists := pvcMap.items[pvcKey] + pvcMap.RUnlock() + assert.False(tt, exists, "PVC entry should be removed when last snapshot is deleted") + }) + + t.Run("DeleteNonExistentSnapshot", func(tt *testing.T) { + // Setup + pvc, ns := "test-pvc", "test-ns" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: ns, Name: pvc}: { + "snap1": struct{}{}, + }, + }, + } + + // Execute + pvcMap.delete(pvc, "non-existent-snap", ns) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 1) + assert.Contains(tt, snaps, "snap1") + }) + + t.Run("DeleteFromNonExistentPVC", func(tt *testing.T) { + // Setup + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute + pvcMap.delete("non-existent-pvc", "snap1", "test-ns") + + // Assert + assert.Empty(tt, pvcMap.items) + }) + + t.Run("DeleteMultipleSnapshotsSequentially", func(tt *testing.T) { + // Setup + pvc, ns := "test-pvc", "test-ns" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: ns, Name: pvc}: { + "snap1": struct{}{}, + "snap2": struct{}{}, + "snap3": struct{}{}, + }, + }, + } + + // Execute + pvcMap.delete(pvc, "snap1", ns) + pvcMap.delete(pvc, "snap2", ns) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 1) + assert.Contains(tt, snaps, "snap3") + }) +} + +func TestInitPVCToSnapshotsMap(t *testing.T) { + t.Run("SkipForNonWorkloadCluster", func(tt *testing.T) { + // Setup + ctx := context.Background() + + // Execute + err := initPVCToSnapshotsMap(ctx, cnstypes.CnsClusterFlavorVanilla) + + // Assert + assert.NoError(tt, err) + }) + + // since `k8sOrchestratorInstance.informerManager` is a struct type, testing the behaviour of + // 
AddSnapshotListener is not ideal. Since TestPVCToSnapshotsMapEventHandlers tests + // the event handlers, we're good for now. + // TODO: Add tests to verify actual informer logic +} + +// Test the snapshot event handlers directly +func TestPVCToSnapshotsMapEventHandlers(t *testing.T) { + // Helper to create a VolumeSnapshot object + createSnapshot := func(name, namespace, pvcName string) *snapshotv1.VolumeSnapshot { + return &snapshotv1.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: snapshotv1.VolumeSnapshotSpec{ + Source: snapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: &pvcName, + }, + }, + } + } + + t.Run("HandleSnapshotAdded", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvc, ns, snapName := "test-pvc", "test-ns", "test-snap" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute + snap := createSnapshot(snapName, ns, pvc) + handleSnapshotAdded(ctx, snap, &pvcMap) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 1) + assert.Contains(tt, snaps, snapName) + }) + + t.Run("HandleSnapshotDeleted", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvc, ns, snapName := "test-pvc", "test-ns", "test-snap" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: ns, Name: pvc}: { + snapName: struct{}{}, + }, + }, + } + + // Execute + snap := createSnapshot(snapName, ns, pvc) + handleSnapshotDeleted(ctx, snap, &pvcMap) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Empty(tt, snaps) + }) + + t.Run("HandleSnapshotAddedWithNilPVCName", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute + snap := &snapshotv1.VolumeSnapshot{ + ObjectMeta: 
metav1.ObjectMeta{ + Name: "test-snap", + Namespace: "test-ns", + }, + Spec: snapshotv1.VolumeSnapshotSpec{ + Source: snapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: nil, + }, + }, + } + handleSnapshotAdded(ctx, snap, &pvcMap) + + // Assert - snapshot should not be added + assert.Empty(tt, pvcMap.items) + }) + + t.Run("HandleSnapshotAddedWithInvalidObject", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute + handleSnapshotAdded(ctx, "not-a-snapshot-object", &pvcMap) + + // Assert - snapshot should not be added + assert.Empty(tt, pvcMap.items) + }) + + t.Run("HandleSnapshotAddedWithNilSnapshot", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute + var nilSnap *snapshotv1.VolumeSnapshot + handleSnapshotAdded(ctx, nilSnap, &pvcMap) + + // Assert - snapshot should not be added + assert.Empty(tt, pvcMap.items) + }) + + t.Run("MultipleSnapshotsAddedAndDeleted", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvc, ns := "test-pvc", "test-ns" + snap1, snap2, snap3 := "snap1", "snap2", "snap3" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute - Add multiple snapshots + handleSnapshotAdded(ctx, createSnapshot(snap1, ns, pvc), &pvcMap) + handleSnapshotAdded(ctx, createSnapshot(snap2, ns, pvc), &pvcMap) + handleSnapshotAdded(ctx, createSnapshot(snap3, ns, pvc), &pvcMap) + + // Assert all added + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 3) + + // Execute - Delete one snapshot + handleSnapshotDeleted(ctx, createSnapshot(snap2, ns, pvc), &pvcMap) + + // Assert only 2 remain + snaps = pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 2) + assert.Contains(tt, 
snaps, snap1) + assert.Contains(tt, snaps, snap3) + assert.NotContains(tt, snaps, snap2) + }) + + t.Run("HandleSnapshotDeletedWithNilPVCName", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: "test-ns", Name: "test-pvc"}: { + "snap1": struct{}{}, + }, + }, + } + + // Execute + snap := &snapshotv1.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-snap", + Namespace: "test-ns", + }, + Spec: snapshotv1.VolumeSnapshotSpec{ + Source: snapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: nil, + }, + }, + } + handleSnapshotDeleted(ctx, snap, &pvcMap) + + // Assert - map should remain unchanged + assert.Len(tt, pvcMap.items, 1) + }) + + t.Run("HandleSnapshotDeletedWithInvalidObject", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: "test-ns", Name: "test-pvc"}: { + "snap1": struct{}{}, + }, + }, + } + + // Execute + handleSnapshotDeleted(ctx, "not-a-snapshot-object", &pvcMap) + + // Assert - map should remain unchanged + assert.Len(tt, pvcMap.items, 1) + }) + + t.Run("AddAndDeleteAcrossMultiplePVCsAndNamespaces", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute - Add snapshots for different PVCs and namespaces + handleSnapshotAdded(ctx, createSnapshot("snap1", "ns1", "pvc1"), &pvcMap) + handleSnapshotAdded(ctx, createSnapshot("snap2", "ns1", "pvc1"), &pvcMap) + handleSnapshotAdded(ctx, createSnapshot("snap3", "ns1", "pvc2"), &pvcMap) + handleSnapshotAdded(ctx, createSnapshot("snap4", "ns2", "pvc1"), &pvcMap) + + // Assert all added correctly + assert.Len(tt, pvcMap.get("pvc1", "ns1"), 2) + assert.Len(tt, pvcMap.get("pvc2", 
"ns1"), 1) + assert.Len(tt, pvcMap.get("pvc1", "ns2"), 1) + + // Execute - Delete snapshots + handleSnapshotDeleted(ctx, createSnapshot("snap1", "ns1", "pvc1"), &pvcMap) + handleSnapshotDeleted(ctx, createSnapshot("snap3", "ns1", "pvc2"), &pvcMap) + + // Assert correct deletions + assert.Len(tt, pvcMap.get("pvc1", "ns1"), 1) + assert.Contains(tt, pvcMap.get("pvc1", "ns1"), "snap2") + assert.Empty(tt, pvcMap.get("pvc2", "ns1")) + assert.Len(tt, pvcMap.get("pvc1", "ns2"), 1) + }) +} diff --git a/pkg/kubernetes/informers.go b/pkg/kubernetes/informers.go index f8d3d14a47..b273f3004d 100644 --- a/pkg/kubernetes/informers.go +++ b/pkg/kubernetes/informers.go @@ -46,8 +46,8 @@ const ( ) var ( - supervisorInformerManagerInstance *InformerManager = nil - supervisorInformerInstanceLock = &sync.Mutex{} + informerManagerInstance *InformerManager = nil + informerInstanceLock = &sync.Mutex{} ) func noResyncPeriodFunc() time.Duration { @@ -64,9 +64,9 @@ func NewInformer(ctx context.Context, var informerInstance *InformerManager log := logger.GetLogger(ctx) - supervisorInformerInstanceLock.Lock() - defer supervisorInformerInstanceLock.Unlock() - informerInstance = supervisorInformerManagerInstance + informerInstanceLock.Lock() + defer informerInstanceLock.Unlock() + informerInstance = informerManagerInstance if informerInstance == nil { informerInstance = &InformerManager{ @@ -76,7 +76,7 @@ func NewInformer(ctx context.Context, snapshotInformerFactory: externalversions.NewSharedInformerFactory(snapshotClient, 0), } - supervisorInformerManagerInstance = informerInstance + informerManagerInstance = informerInstance log.Info("Created new informer factory for supervisor client") } @@ -248,9 +248,9 @@ func (im *InformerManager) AddVolumeAttachmentListener(ctx context.Context, add // AddSnapshotListener hooks up add, update, delete callbacks. 
func (im *InformerManager) AddSnapshotListener(ctx context.Context, - add func(obj interface{}), - update func(oldObj, newObj interface{}), - remove func(obj interface{})) error { + add func(obj any), + update func(oldObj, newObj any), + remove func(obj any)) error { log := logger.GetLogger(ctx) if im.snapshotInformer == nil { im.snapshotInformer = im.snapshotInformerFactory. diff --git a/pkg/kubernetes/informers_test.go b/pkg/kubernetes/informers_test.go new file mode 100644 index 0000000000..0d70dad3ea --- /dev/null +++ b/pkg/kubernetes/informers_test.go @@ -0,0 +1,246 @@ +package kubernetes + +import ( + "context" + "sync" + "testing" + + snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1" + snapshotfake "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned/fake" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testclient "k8s.io/client-go/kubernetes/fake" +) + +func TestNewInformer(t *testing.T) { + t.Run("CreateNewInformerManager", func(tt *testing.T) { + // Setup - Reset global state + informerInstanceLock.Lock() + informerManagerInstance = nil + informerInstanceLock.Unlock() + + ctx := context.Background() + k8sClient := testclient.NewSimpleClientset() + snapshotClient := snapshotfake.NewSimpleClientset() + + // Execute + informerMgr := NewInformer(ctx, k8sClient, snapshotClient) + + // Assert + assert.NotNil(tt, informerMgr) + assert.NotNil(tt, informerMgr.client) + assert.NotNil(tt, informerMgr.informerFactory) + assert.NotNil(tt, informerMgr.snapshotInformerFactory) + assert.NotNil(tt, informerMgr.stopCh) + assert.Equal(tt, k8sClient, informerMgr.client) + }) + + t.Run("ReturnExistingInformerManagerOnSecondCall", func(tt *testing.T) { + // Note: We cannot reset the global state here because signals.SetupSignalHandler() + // can only be called once per process. 
This test verifies the singleton behavior + // by calling NewInformer twice without resetting. + + ctx := context.Background() + k8sClient1 := testclient.NewSimpleClientset() + snapshotClient1 := snapshotfake.NewSimpleClientset() + + // Execute - First call (may reuse existing instance from previous test) + informerMgr1 := NewInformer(ctx, k8sClient1, snapshotClient1) + + // Execute - Second call with different clients + k8sClient2 := testclient.NewSimpleClientset() + snapshotClient2 := snapshotfake.NewSimpleClientset() + informerMgr2 := NewInformer(ctx, k8sClient2, snapshotClient2) + + // Assert - Should return the same instance (singleton pattern) + assert.Equal(tt, informerMgr1, informerMgr2, "Should return the same singleton instance") + }) + + t.Run("ThreadSafeInitialization", func(tt *testing.T) { + // Note: We cannot reset the global state because signals.SetupSignalHandler() + // can only be called once. This test verifies thread-safety by calling + // NewInformer concurrently and ensuring all calls return the same instance. 
+ + ctx := context.Background() + k8sClient := testclient.NewSimpleClientset() + snapshotClient := snapshotfake.NewSimpleClientset() + + // Execute - Call NewInformer concurrently + const numGoroutines = 10 + results := make([]*InformerManager, numGoroutines) + done := make(chan bool, numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(index int) { + results[index] = NewInformer(ctx, k8sClient, snapshotClient) + done <- true + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < numGoroutines; i++ { + <-done + } + + // Assert - All should return the same instance + firstInstance := results[0] + assert.NotNil(tt, firstInstance) + for i := 1; i < numGoroutines; i++ { + assert.Equal(tt, firstInstance, results[i], + "All concurrent calls should return the same instance") + } + }) + + t.Run("VerifyInformerManagerFields", func(tt *testing.T) { + // This test verifies that the informer manager has all expected fields set + ctx := context.Background() + k8sClient := testclient.NewSimpleClientset() + snapshotClient := snapshotfake.NewSimpleClientset() + + // Execute + informerMgr := NewInformer(ctx, k8sClient, snapshotClient) + + // Assert - Verify all fields are properly initialized + assert.NotNil(tt, informerMgr) + assert.NotNil(tt, informerMgr.client, "client should be set") + assert.NotNil(tt, informerMgr.informerFactory, "informerFactory should be set") + assert.NotNil(tt, informerMgr.snapshotInformerFactory, "snapshotInformerFactory should be set") + assert.NotNil(tt, informerMgr.stopCh, "stopCh should be set") + + // Verify that the informer factories are usable + assert.NotNil(tt, informerMgr.informerFactory.Core(), "Core informer factory should be accessible") + assert.NotNil(tt, informerMgr.snapshotInformerFactory.Snapshot(), + "Snapshot informer factory should be accessible") + }) +} + +func TestInformerManager_AddSnapshotListener(t *testing.T) { + // Helper to create a VolumeSnapshot object + createSnapshot := func(name, 
namespace, pvcName string) *snapshotv1.VolumeSnapshot { + return &snapshotv1.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: snapshotv1.VolumeSnapshotSpec{ + Source: snapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: &pvcName, + }, + }, + } + } + + t.Run("AddSnapshotListenerWithNilHandlers", func(tt *testing.T) { + // Setup + ctx := context.Background() + k8sClient := testclient.NewSimpleClientset() + snapshotClient := snapshotfake.NewSimpleClientset() + informerMgr := NewInformer(ctx, k8sClient, snapshotClient) + + // Execute - Pass nil handlers (should be allowed) + err := informerMgr.AddSnapshotListener(ctx, nil, nil, nil) + + // Assert + assert.NoError(tt, err) + assert.NotNil(tt, informerMgr.snapshotInformer, "snapshotInformer should be initialized") + }) + + t.Run("HandlersIgnoreInvalidObjectTypes", func(tt *testing.T) { + // Setup + ctx := context.Background() + k8sClient := testclient.NewSimpleClientset() + snapshotClient := snapshotfake.NewSimpleClientset() + informerMgr := NewInformer(ctx, k8sClient, snapshotClient) + + processedCount := 0 + addFunc := func(obj any) { + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if ok && snap != nil { + processedCount++ + } + } + + // Execute + err := informerMgr.AddSnapshotListener(ctx, addFunc, nil, nil) + assert.NoError(tt, err) + + // Simulate events with invalid types + addFunc("not-a-snapshot") + addFunc(123) + addFunc(nil) + + // Assert - Invalid objects should be ignored + assert.Equal(tt, 0, processedCount, "Handler should ignore invalid object types") + + // Now send a valid snapshot + snap := createSnapshot("snap1", "default", "pvc1") + addFunc(snap) + assert.Equal(tt, 1, processedCount, "Handler should process valid snapshot") + }) + + t.Run("EventHandlersTrackSnapshots", func(tt *testing.T) { + // Setup + ctx := context.Background() + k8sClient := testclient.NewSimpleClientset() + snapshotClient := snapshotfake.NewSimpleClientset() + 
informerMgr := NewInformer(ctx, k8sClient, snapshotClient) + + // Track events with dummy maps + addedSnapshots := make(map[string]*snapshotv1.VolumeSnapshot) + updatedSnapshots := make(map[string]*snapshotv1.VolumeSnapshot) + deletedSnapshots := make(map[string]*snapshotv1.VolumeSnapshot) + var mu sync.Mutex + + addFunc := func(obj any) { + mu.Lock() + defer mu.Unlock() + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if ok && snap != nil { + addedSnapshots[snap.Name] = snap + } + } + + updateFunc := func(oldObj, newObj any) { + mu.Lock() + defer mu.Unlock() + snap, ok := newObj.(*snapshotv1.VolumeSnapshot) + if ok && snap != nil { + updatedSnapshots[snap.Name] = snap + } + } + + deleteFunc := func(obj any) { + mu.Lock() + defer mu.Unlock() + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if ok && snap != nil { + deletedSnapshots[snap.Name] = snap + } + } + + // Execute + err := informerMgr.AddSnapshotListener(ctx, addFunc, updateFunc, deleteFunc) + + // Assert + assert.NoError(tt, err) + assert.NotNil(tt, informerMgr.snapshotInformer) + + // Verify handlers are properly set up by simulating events + snap1 := createSnapshot("snap1", "default", "pvc1") + addFunc(snap1) + mu.Lock() + assert.Contains(tt, addedSnapshots, "snap1", "Add handler should track snapshot") + mu.Unlock() + + snap2 := createSnapshot("snap2", "default", "pvc2") + updateFunc(nil, snap2) + mu.Lock() + assert.Contains(tt, updatedSnapshots, "snap2", "Update handler should track snapshot") + mu.Unlock() + + deleteFunc(snap1) + mu.Lock() + assert.Contains(tt, deletedSnapshots, "snap1", "Delete handler should track snapshot") + mu.Unlock() + }) +}