diff --git a/go.mod b/go.mod index b8cb9c48cd..5fd4d7e41e 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/go-version v1.6.0 github.com/kubernetes-csi/csi-proxy/v2 v2.0.0-alpha.1 - github.com/kubernetes-csi/external-snapshotter/client/v8 v8.2.0 + github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0 github.com/onsi/ginkgo/v2 v2.22.0 github.com/onsi/gomega v1.36.0 github.com/pkg/sftp v1.13.6 diff --git a/go.sum b/go.sum index 18e3eeb7bc..c0d7826e44 100644 --- a/go.sum +++ b/go.sum @@ -178,8 +178,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kubernetes-csi/csi-proxy/v2 v2.0.0-alpha.1 h1:tVPvlL5N5X598hrO3g9rhyoi6h0LP4RpSJlGHItsbEE= github.com/kubernetes-csi/csi-proxy/v2 v2.0.0-alpha.1/go.mod h1:pacx+PW7lLlu6kAvpr8Lgq/5fdiAsKxOtXXFHMaLMb8= -github.com/kubernetes-csi/external-snapshotter/client/v8 v8.2.0 h1:Q3jQ1NkFqv5o+F8dMmHd8SfEmlcwNeo1immFApntEwE= -github.com/kubernetes-csi/external-snapshotter/client/v8 v8.2.0/go.mod h1:E3vdYxHj2C2q6qo8/Da4g7P+IcwqRZyy3gJBzYybV9Y= +github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0 h1:bMqrb3UHgHbP+PW9VwiejfDJU1R0PpXVZNMdeH8WYKI= +github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0/go.mod h1:E3vdYxHj2C2q6qo8/Da4g7P+IcwqRZyy3gJBzYybV9Y= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0= diff --git a/pkg/common/cns-lib/node/nodes.go b/pkg/common/cns-lib/node/nodes.go index 2040b323dc..15cb8bafac 100644 --- a/pkg/common/cns-lib/node/nodes.go +++ b/pkg/common/cns-lib/node/nodes.go @@ -37,14 +37,23 @@ type Nodes struct { // Initialize helps initialize node manager and node informer manager. func (nodes *Nodes) Initialize(ctx context.Context) error { nodes.cnsNodeManager = GetManager(ctx) + k8sclient, err := k8s.NewClient(ctx) if err != nil { log := logger.GetLogger(ctx) log.Errorf("Creating Kubernetes client failed. Err: %v", err) return err } + + snapshotterClient, err := k8s.NewSnapshotterClient(ctx) + if err != nil { + log := logger.GetLogger(ctx) + log.Errorf("Creating Snapshotter client failed. Err: %v", err) + return err + } + nodes.cnsNodeManager.SetKubernetesClient(k8sclient) - nodes.informMgr = k8s.NewInformer(ctx, k8sclient, true) + nodes.informMgr = k8s.NewInformer(ctx, k8sclient, snapshotterClient) err = nodes.informMgr.AddCSINodeListener(ctx, nodes.csiNodeAdd, nodes.csiNodeUpdate, nodes.csiNodeDelete) if err != nil { diff --git a/pkg/common/unittestcommon/utils.go b/pkg/common/unittestcommon/utils.go index 99c8b7cc19..85b05c9b48 100644 --- a/pkg/common/unittestcommon/utils.go +++ b/pkg/common/unittestcommon/utils.go @@ -516,6 +516,14 @@ func (c *FakeK8SOrchestrator) SetCSINodeTopologyInstances(instances []interface{ c.csiNodeTopologyInstances = instances } +func (c *FakeK8SOrchestrator) GetSnapshotsForPVC(ctx context.Context, pvcName, namespace string) []string { + if strings.Contains(pvcName, "no-snap") { + return []string{} + } + + return []string{"snap1", "snap2", "snap3"} +} + // configFromVCSim starts a vcsim instance and returns config for use against the // vcsim instance. The vcsim instance is configured with an empty tls.Config. 
func configFromVCSim(vcsimParams VcsimParams, isTopologyEnv bool) (*config.Config, func()) { diff --git a/pkg/csi/service/common/commonco/coagnostic.go b/pkg/csi/service/common/commonco/coagnostic.go index 4ff1844d70..13e2d7700a 100644 --- a/pkg/csi/service/common/commonco/coagnostic.go +++ b/pkg/csi/service/common/commonco/coagnostic.go @@ -128,6 +128,8 @@ type COCommonInterface interface { // GetPVCNamespacedNameByUID returns the PVC's namespaced name (namespace/name) for the given UID. // If the PVC is not found in the cache, it returns an empty string and false. GetPVCNamespacedNameByUID(uid string) (k8stypes.NamespacedName, bool) + // GetSnapshotsForPVC returns the names of the snapshots for the given PVC + GetSnapshotsForPVC(ctx context.Context, pvcName, namespace string) []string } // GetContainerOrchestratorInterface returns orchestrator object for a given diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go index 752758eb41..1b11383da2 100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go @@ -29,6 +29,7 @@ import ( "sync/atomic" "time" + snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1" "google.golang.org/grpc/codes" "k8s.io/client-go/util/retry" @@ -239,6 +240,71 @@ func (m *volumeIDToNameMap) get(volumeID string) (string, bool) { return volumeName, found } +// pvcToSnapshotsMap maps a PVC to its snapshots. +// The primary key is the namespaced name of the PVC and value is a map. +// The key of the inner map is the name of the snapshot. +type pvcToSnapshotsMap struct { + sync.RWMutex + items map[k8stypes.NamespacedName]map[string]struct{} +} + +func (m *pvcToSnapshotsMap) add(pvc, snapshot, namespace string) { + m.Lock() + defer m.Unlock() + + pvcKey := k8stypes.NamespacedName{ + Namespace: namespace, + Name: pvc, + } + if _, ok := m.items[pvcKey]; !ok { + m.items[pvcKey] = make(map[string]struct{}) + } + m.items[pvcKey][snapshot] = struct{}{} +} + +func (m *pvcToSnapshotsMap) get(pvc, namespace string) []string { + m.RLock() + defer m.RUnlock() + + pvcKey := k8stypes.NamespacedName{ + Namespace: namespace, + Name: pvc, + } + snapMap, ok := m.items[pvcKey] + if !ok { + return []string{} + } + + snaps := make([]string, 0, len(snapMap)) + for snap := range snapMap { + snaps = append(snaps, snap) + } + return snaps +} + +func (m *pvcToSnapshotsMap) delete(pvc, snapshot, namespace string) { + m.Lock() + defer m.Unlock() + + pvcKey := k8stypes.NamespacedName{ + Namespace: namespace, + Name: pvc, + } + snapMap, ok := m.items[pvcKey] + if !ok { + return + } + + delete(snapMap, snapshot) + if len(snapMap) != 0 { + m.items[pvcKey] = snapMap + return + } + + // delete pvc entry if no snapshots are present + delete(m.items, pvcKey) +} + // K8sOrchestrator defines set of properties specific to K8s. type K8sOrchestrator struct { supervisorFSS FSSConfigMapInfo @@ -251,6 +317,7 @@ type K8sOrchestrator struct { nodeIDToNameMap *nodeIDToNameMap volumeNameToNodesMap *volumeNameToNodesMap // used when ListVolume FSS is enabled volumeIDToNameMap *volumeIDToNameMap // used when ListVolume FSS is enabled + pvcToSnapshotsMap pvcToSnapshotsMap k8sClient clientset.Interface snapshotterClient snapshotterClientSet.Interface // pvcUIDCache maps PVC UID to its namespaced name (namespace/name). 
@@ -319,7 +386,7 @@ func Newk8sOrchestrator(ctx context.Context, controllerClusterFlavor cnstypes.Cn k8sOrchestratorInstance.clusterFlavor = controllerClusterFlavor k8sOrchestratorInstance.k8sClient = k8sClient k8sOrchestratorInstance.snapshotterClient = snapshotterClient - k8sOrchestratorInstance.informerManager = k8s.NewInformer(ctx, k8sClient, true) + k8sOrchestratorInstance.informerManager = k8s.NewInformer(ctx, k8sClient, snapshotterClient) coInstanceErr = initFSS(ctx, k8sClient, controllerClusterFlavor, params) if coInstanceErr != nil { log.Errorf("Failed to initialize the orchestrator. Error: %v", coInstanceErr) @@ -381,6 +448,11 @@ func Newk8sOrchestrator(ctx context.Context, controllerClusterFlavor cnstypes.Cn } } + err := initPVCToSnapshotsMap(ctx, controllerClusterFlavor) + if err != nil { + return nil, fmt.Errorf("failed to create PVC to snapshots map. Error: %v", err) + } + k8sOrchestratorInstance.informerManager.Listen() atomic.StoreUint32(&k8sOrchestratorInstanceInitialized, 1) log.Info("k8sOrchestratorInstance initialized") @@ -1945,6 +2017,75 @@ func nodeRemove(obj interface{}) { k8sOrchestratorInstance.nodeIDToNameMap.remove(nodeMoID) } +// handleSnapshotAdded handles the snapshot add event by adding it to the pvcToSnapshotsMap cache. +func handleSnapshotAdded(ctx context.Context, obj any, pvcMap *pvcToSnapshotsMap) { + log := logger.GetLogger(ctx) + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if !ok || snap == nil { + log.Warnf("unrecognized object %+v", obj) + return + } + + if snap.Spec.Source.PersistentVolumeClaimName == nil { + log.Warnf("snapshot is not associated with any PVC. Ignoring it...") + return + } + + pvcMap.add(*snap.Spec.Source.PersistentVolumeClaimName, snap.Name, snap.Namespace) + log.With("pvc", *snap.Spec.Source.PersistentVolumeClaimName).With("snapshot", snap.Name). + With("namespace", snap.Namespace).Debug("successfully added the snapshot to the cache") +} + +// handleSnapshotDeleted handles the snapshot delete event by removing it from the pvcToSnapshotsMap cache. +func handleSnapshotDeleted(ctx context.Context, obj any, pvcMap *pvcToSnapshotsMap) { + log := logger.GetLogger(ctx) + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if !ok || snap == nil { + log.Warnf("unrecognized object %+v", obj) + return + } + + if snap.Spec.Source.PersistentVolumeClaimName == nil { + log.Warnf("snapshot is not associated with any PVC. Ignoring it...") + return + } + + pvcMap.delete(*snap.Spec.Source.PersistentVolumeClaimName, snap.Name, snap.Namespace) + log.With("pvc", *snap.Spec.Source.PersistentVolumeClaimName).With("snapshot", snap.Name). + With("namespace", snap.Namespace).Debug("successfully removed the snapshot from the cache") +} + +func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes.CnsClusterFlavor) error { + log := logger.GetLogger(ctx) + // TODO: check if we need to check the FSS as well + if controllerClusterFlavor != cnstypes.CnsClusterFlavorWorkload { + // PVC to VolumeSnapshot cache is only required for WCP for now. + log.Info("non WCP cluster detected. 
skipping initialising PVC to Snapshot cache.") + return nil + } + + k8sOrchestratorInstance.pvcToSnapshotsMap = pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[k8stypes.NamespacedName]map[string]struct{}), + } + + err := k8sOrchestratorInstance.informerManager.AddSnapshotListener(ctx, + func(obj any) { + handleSnapshotAdded(ctx, obj, &k8sOrchestratorInstance.pvcToSnapshotsMap) + }, + // Since the name of PVC associated with a snapshot is immutable, + // update events do not have any impact on the state of the cache. + nil, + func(obj any) { + handleSnapshotDeleted(ctx, obj, &k8sOrchestratorInstance.pvcToSnapshotsMap) + }) + if err != nil { + return logger.LogNewErrorf(log, "failed to listen on volumesnapshots. Error: %v", err) + } + + return nil +} + // GetNodeIDtoNameMap returns a map containing the nodeID to node name func (c *K8sOrchestrator) GetNodeIDtoNameMap(ctx context.Context) map[string]string { return c.nodeIDToNameMap.items @@ -2259,6 +2400,11 @@ func (c *K8sOrchestrator) GetPVCNamespacedNameByUID(uid string) (k8stypes.Namesp return nsName, true } +// GetSnapshotsForPVC returns the names of the snapshots for the given PVC +func (c *K8sOrchestrator) GetSnapshotsForPVC(ctx context.Context, pvc, namespace string) []string { + return c.pvcToSnapshotsMap.get(pvc, namespace) +} + // GetPVCDataSource Retrieves the VolumeSnapshot source when a PVC from VolumeSnapshot is being created. func GetPVCDataSource(ctx context.Context, claim *v1.PersistentVolumeClaim) (*v1.ObjectReference, error) { var dataSource v1.ObjectReference diff --git a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator_test.go b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator_test.go index 55233a1fdc..09cc739bc8 100644 --- a/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator_test.go +++ b/pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator_test.go @@ -23,13 +23,14 @@ import ( "sync" "testing" + snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1" + "github.com/stretchr/testify/assert" cnstypes "github.com/vmware/govmomi/cns/types" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - - "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client/fake" wcpcapv1alph1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/wcpcapabilities/v1alpha1" cnsconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config" @@ -368,3 +369,493 @@ func TestSetWcpCapabilitiesMap_Success(t *testing.T) { val, _ = WcpCapabilitiesMap.Load("CapabilityB") assert.Equal(t, false, val) } + +func TestK8sOrchestrator_GetSnapshotsForPVC(t *testing.T) { + t.Run("WhenNoSnapshotsExistForPVC", func(tt *testing.T) { + // Setup + orch := K8sOrchestrator{} + + // Execute + snaps := orch.GetSnapshotsForPVC(context.Background(), "", "") + + // Assert + assert.Empty(tt, snaps) + }) + t.Run("WhenSnapshotsExist", func(tt *testing.T) { + // Setup + pvc, ns := "test-pvc", "test-ns" + orch := K8sOrchestrator{ + pvcToSnapshotsMap: pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: ns, Name: pvc}: { + "snap1": struct{}{}, + "snap2": struct{}{}, + "snap3": struct{}{}, + }, + }, + }, + } + exp := []string{"snap1", "snap2", "snap3"} + + // Execute + snaps := orch.GetSnapshotsForPVC(context.Background(), pvc, ns) + + // 
Assert + assert.ElementsMatch(tt, exp, snaps) + }) +} + +func TestPvcToSnapshotsMap_Add(t *testing.T) { + t.Run("AddSnapshotToNewPVC", func(tt *testing.T) { + // Setup + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + pvc, ns, snap := "test-pvc", "test-ns", "snap1" + + // Execute + pvcMap.add(pvc, snap, ns) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 1) + assert.Contains(tt, snaps, snap) + }) + + t.Run("AddMultipleSnapshotsToSamePVC", func(tt *testing.T) { + // Setup + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + pvc, ns := "test-pvc", "test-ns" + snaps := []string{"snap1", "snap2", "snap3"} + + // Execute + for _, snap := range snaps { + pvcMap.add(pvc, snap, ns) + } + + // Assert + result := pvcMap.get(pvc, ns) + assert.Len(tt, result, 3) + for _, snap := range snaps { + assert.Contains(tt, result, snap) + } + }) + + t.Run("AddSameSnapshotTwice", func(tt *testing.T) { + // Setup + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + pvc, ns, snap := "test-pvc", "test-ns", "snap1" + + // Execute + pvcMap.add(pvc, snap, ns) + pvcMap.add(pvc, snap, ns) // Add same snapshot again + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 1, "duplicate snapshot should not be added") + assert.Contains(tt, snaps, snap) + }) +} + +func TestPvcToSnapshotsMap_Get(t *testing.T) { + t.Run("GetNonExistentPVC", func(tt *testing.T) { + // Setup + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: "test-ns", Name: "pvc1"}: { + "snap1": struct{}{}, + }, + }, + } + + // Execute + snaps := pvcMap.get("non-existent-pvc", "test-ns") + + // Assert + assert.Empty(tt, snaps) + }) + + t.Run("GetExistingPVCWithSnapshots", func(tt *testing.T) { + // Setup + pvc, ns := "test-pvc", "test-ns" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: ns, Name: pvc}: { + "snap1": struct{}{}, + "snap2": struct{}{}, + "snap3": struct{}{}, + }, + }, + } + exp := []string{"snap1", "snap2", "snap3"} + + // Execute + snaps := pvcMap.get(pvc, ns) + + // Assert + assert.ElementsMatch(tt, exp, snaps) + }) +} + +func TestPvcToSnapshotsMap_Delete(t *testing.T) { + t.Run("DeleteSnapshotFromPVC", func(tt *testing.T) { + // Setup + pvc, ns := "test-pvc", "test-ns" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: ns, Name: pvc}: { + "snap1": struct{}{}, + "snap2": struct{}{}, + }, + }, + } + + // Execute + pvcMap.delete(pvc, "snap1", ns) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 1) + assert.Contains(tt, snaps, "snap2") + assert.NotContains(tt, snaps, "snap1") + }) + + t.Run("DeleteLastSnapshotRemovesPVCEntry", func(tt *testing.T) { + // Setup + pvc, ns := "test-pvc", "test-ns" + pvcKey := types.NamespacedName{Namespace: ns, Name: pvc} + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + pvcKey: { + "snap1": struct{}{}, + }, + }, + } + + // Execute + pvcMap.delete(pvc, "snap1", ns) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Empty(tt, snaps) + // Verify PVC entry is removed from map + pvcMap.RLock() + _, exists := pvcMap.items[pvcKey] + pvcMap.RUnlock() + 
assert.False(tt, exists, "PVC entry should be removed when last snapshot is deleted") + }) + + t.Run("DeleteNonExistentSnapshot", func(tt *testing.T) { + // Setup + pvc, ns := "test-pvc", "test-ns" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: ns, Name: pvc}: { + "snap1": struct{}{}, + }, + }, + } + + // Execute + pvcMap.delete(pvc, "non-existent-snap", ns) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 1) + assert.Contains(tt, snaps, "snap1") + }) + + t.Run("DeleteFromNonExistentPVC", func(tt *testing.T) { + // Setup + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute + pvcMap.delete("non-existent-pvc", "snap1", "test-ns") + + // Assert + assert.Empty(tt, pvcMap.items) + }) + + t.Run("DeleteMultipleSnapshotsSequentially", func(tt *testing.T) { + // Setup + pvc, ns := "test-pvc", "test-ns" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: ns, Name: pvc}: { + "snap1": struct{}{}, + "snap2": struct{}{}, + "snap3": struct{}{}, + }, + }, + } + + // Execute + pvcMap.delete(pvc, "snap1", ns) + pvcMap.delete(pvc, "snap2", ns) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 1) + assert.Contains(tt, snaps, "snap3") + }) +} + +func TestInitPVCToSnapshotsMap(t *testing.T) { + t.Run("SkipForNonWorkloadCluster", func(tt *testing.T) { + // Setup + ctx := context.Background() + + // Execute + err := initPVCToSnapshotsMap(ctx, cnstypes.CnsClusterFlavorVanilla) + + // Assert + assert.NoError(tt, err) + }) + + // since `k8sOrchestratorInstance.informerManager` is a struct type, testing the behaviour of + // AddSnapshotListener is not ideal. Since TestPVCToSnapshotsMapEventHandlers tests + // the event handlers, we're good for now. 
+ // TODO: Add tests to verify actual informer logic +} + +// Test the snapshot event handlers directly +func TestPVCToSnapshotsMapEventHandlers(t *testing.T) { + // Helper to create a VolumeSnapshot object + createSnapshot := func(name, namespace, pvcName string) *snapshotv1.VolumeSnapshot { + return &snapshotv1.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: snapshotv1.VolumeSnapshotSpec{ + Source: snapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: &pvcName, + }, + }, + } + } + + t.Run("HandleSnapshotAdded", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvc, ns, snapName := "test-pvc", "test-ns", "test-snap" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute + snap := createSnapshot(snapName, ns, pvc) + handleSnapshotAdded(ctx, snap, &pvcMap) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 1) + assert.Contains(tt, snaps, snapName) + }) + + t.Run("HandleSnapshotDeleted", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvc, ns, snapName := "test-pvc", "test-ns", "test-snap" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: ns, Name: pvc}: { + snapName: struct{}{}, + }, + }, + } + + // Execute + snap := createSnapshot(snapName, ns, pvc) + handleSnapshotDeleted(ctx, snap, &pvcMap) + + // Assert + snaps := pvcMap.get(pvc, ns) + assert.Empty(tt, snaps) + }) + + t.Run("HandleSnapshotAddedWithNilPVCName", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute + snap := &snapshotv1.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-snap", + Namespace: "test-ns", + }, + Spec: snapshotv1.VolumeSnapshotSpec{ + Source: snapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: nil, + }, + }, + } + handleSnapshotAdded(ctx, snap, &pvcMap) + + // Assert - snapshot should not be added + assert.Empty(tt, pvcMap.items) + }) + + t.Run("HandleSnapshotAddedWithInvalidObject", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute + handleSnapshotAdded(ctx, "not-a-snapshot-object", &pvcMap) + + // Assert - snapshot should not be added + assert.Empty(tt, pvcMap.items) + }) + + t.Run("HandleSnapshotAddedWithNilSnapshot", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute + var nilSnap *snapshotv1.VolumeSnapshot + handleSnapshotAdded(ctx, nilSnap, &pvcMap) + + // Assert - snapshot should not be added + assert.Empty(tt, pvcMap.items) + }) + + t.Run("MultipleSnapshotsAddedAndDeleted", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvc, ns := "test-pvc", "test-ns" + snap1, snap2, snap3 := "snap1", "snap2", "snap3" + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute - Add multiple snapshots + handleSnapshotAdded(ctx, createSnapshot(snap1, ns, pvc), &pvcMap) + handleSnapshotAdded(ctx, createSnapshot(snap2, ns, pvc), &pvcMap) + handleSnapshotAdded(ctx, createSnapshot(snap3, ns, pvc), 
&pvcMap) + + // Assert all added + snaps := pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 3) + + // Execute - Delete one snapshot + handleSnapshotDeleted(ctx, createSnapshot(snap2, ns, pvc), &pvcMap) + + // Assert only 2 remain + snaps = pvcMap.get(pvc, ns) + assert.Len(tt, snaps, 2) + assert.Contains(tt, snaps, snap1) + assert.Contains(tt, snaps, snap3) + assert.NotContains(tt, snaps, snap2) + }) + + t.Run("HandleSnapshotDeletedWithNilPVCName", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: "test-ns", Name: "test-pvc"}: { + "snap1": struct{}{}, + }, + }, + } + + // Execute + snap := &snapshotv1.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-snap", + Namespace: "test-ns", + }, + Spec: snapshotv1.VolumeSnapshotSpec{ + Source: snapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: nil, + }, + }, + } + handleSnapshotDeleted(ctx, snap, &pvcMap) + + // Assert - map should remain unchanged + assert.Len(tt, pvcMap.items, 1) + }) + + t.Run("HandleSnapshotDeletedWithInvalidObject", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: map[types.NamespacedName]map[string]struct{}{ + {Namespace: "test-ns", Name: "test-pvc"}: { + "snap1": struct{}{}, + }, + }, + } + + // Execute + handleSnapshotDeleted(ctx, "not-a-snapshot-object", &pvcMap) + + // Assert - map should remain unchanged + assert.Len(tt, pvcMap.items, 1) + }) + + t.Run("AddAndDeleteAcrossMultiplePVCsAndNamespaces", func(tt *testing.T) { + // Setup + ctx := context.Background() + pvcMap := pvcToSnapshotsMap{ + RWMutex: sync.RWMutex{}, + items: make(map[types.NamespacedName]map[string]struct{}), + } + + // Execute - Add snapshots for different PVCs and namespaces + handleSnapshotAdded(ctx, createSnapshot("snap1", "ns1", "pvc1"), &pvcMap) + handleSnapshotAdded(ctx, createSnapshot("snap2", "ns1", "pvc1"), &pvcMap) + handleSnapshotAdded(ctx, createSnapshot("snap3", "ns1", "pvc2"), &pvcMap) + handleSnapshotAdded(ctx, createSnapshot("snap4", "ns2", "pvc1"), &pvcMap) + + // Assert all added correctly + assert.Len(tt, pvcMap.get("pvc1", "ns1"), 2) + assert.Len(tt, pvcMap.get("pvc2", "ns1"), 1) + assert.Len(tt, pvcMap.get("pvc1", "ns2"), 1) + + // Execute - Delete snapshots + handleSnapshotDeleted(ctx, createSnapshot("snap1", "ns1", "pvc1"), &pvcMap) + handleSnapshotDeleted(ctx, createSnapshot("snap3", "ns1", "pvc2"), &pvcMap) + + // Assert correct deletions + assert.Len(tt, pvcMap.get("pvc1", "ns1"), 1) + assert.Contains(tt, pvcMap.get("pvc1", "ns1"), "snap2") + assert.Empty(tt, pvcMap.get("pvc2", "ns1")) + assert.Len(tt, pvcMap.get("pvc1", "ns2"), 1) + }) +} diff --git a/pkg/internalapis/featurestates/featurestates.go b/pkg/internalapis/featurestates/featurestates.go index 255a7e32d8..3ee02e43f1 100644 --- a/pkg/internalapis/featurestates/featurestates.go +++ b/pkg/internalapis/featurestates/featurestates.go @@ -122,8 +122,15 @@ func StartSvFSSReplicationService(ctx context.Context, svFeatureStatConfigMapNam return err } + // Create the snapshotter client. + snapshotterClient, err := k8s.NewSnapshotterClient(ctx) + if err != nil { + log.Errorf("Creating Snapshotter client failed. Err: %v", err) + return err + } + // Create k8s Informer and watch on configmaps and namespaces. 
- informer := k8s.NewInformer(ctx, k8sClient, true) + informer := k8s.NewInformer(ctx, k8sClient, snapshotterClient) // Configmap informer to watch on SV featurestate config-map. err = informer.AddConfigMapListener( ctx, diff --git a/pkg/kubernetes/informers.go b/pkg/kubernetes/informers.go index 51b659a30e..b273f3004d 100644 --- a/pkg/kubernetes/informers.go +++ b/pkg/kubernetes/informers.go @@ -21,6 +21,8 @@ import ( "sync" "time" + snapclientset "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned" + "github.com/kubernetes-csi/external-snapshotter/client/v8/informers/externalversions" "k8s.io/client-go/informers" v1 "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" @@ -44,50 +46,38 @@ const ( ) var ( - inClusterInformerManagerInstance *InformerManager = nil - inClusterInformerInstanceLock = &sync.Mutex{} - supervisorInformerManagerInstance *InformerManager = nil - supervisorInformerInstanceLock = &sync.Mutex{} + informerManagerInstance *InformerManager = nil + informerInstanceLock = &sync.Mutex{} ) func noResyncPeriodFunc() time.Duration { return 0 } -// NewInformer creates a new K8S client based on a service account. -// NOTE: This function expects caller function to pass appropriate client +// NewInformer creates a new K8S informer manager. +// NOTE: This function expects caller function to pass appropriate clients // as per config to be created Informer for. -// This function creates shared informer factory against the client provided. -func NewInformer(ctx context.Context, client clientset.Interface, inClusterClnt bool) *InformerManager { +// This function creates shared informer factories against the clients provided. +func NewInformer(ctx context.Context, + client clientset.Interface, + snapshotClient snapclientset.Interface) *InformerManager { var informerInstance *InformerManager log := logger.GetLogger(ctx) - if inClusterClnt { - inClusterInformerInstanceLock.Lock() - defer inClusterInformerInstanceLock.Unlock() - - informerInstance = inClusterInformerManagerInstance - } else { - supervisorInformerInstanceLock.Lock() - defer supervisorInformerInstanceLock.Unlock() - - informerInstance = supervisorInformerManagerInstance - } + informerInstanceLock.Lock() + defer informerInstanceLock.Unlock() + informerInstance = informerManagerInstance if informerInstance == nil { informerInstance = &InformerManager{ - client: client, - stopCh: signals.SetupSignalHandler().Done(), - informerFactory: informers.NewSharedInformerFactory(client, noResyncPeriodFunc()), + client: client, + stopCh: signals.SetupSignalHandler().Done(), + informerFactory: informers.NewSharedInformerFactory(client, noResyncPeriodFunc()), + snapshotInformerFactory: externalversions.NewSharedInformerFactory(snapshotClient, 0), } - if inClusterClnt { - inClusterInformerManagerInstance = informerInstance - log.Info("Created new informer factory for in-cluster client") - } else { - supervisorInformerManagerInstance = informerInstance - log.Info("Created new informer factory for supervisor client") - } + informerManagerInstance = informerInstance + log.Info("Created new informer factory for supervisor client") } return informerInstance @@ -256,6 +246,30 @@ func (im *InformerManager) AddVolumeAttachmentListener(ctx context.Context, add return nil } +// AddSnapshotListener hooks up add, update, delete callbacks. 
+func (im *InformerManager) AddSnapshotListener(ctx context.Context, + add func(obj any), + update func(oldObj, newObj any), + remove func(obj any)) error { + log := logger.GetLogger(ctx) + if im.snapshotInformer == nil { + im.snapshotInformer = im.snapshotInformerFactory. + Snapshot().V1().VolumeSnapshots().Informer() + } + + _, err := im.snapshotInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: add, + UpdateFunc: update, + DeleteFunc: remove, + }) + if err != nil { + return logger.LogNewErrorf( + log, "failed to add event handler on snapshot listener. Error: %v", err) + } + + return nil +} + // GetPVLister returns PV Lister for the calling informer manager. func (im *InformerManager) GetPVLister() corelisters.PersistentVolumeLister { return im.informerFactory.Core().V1().PersistentVolumes().Lister() @@ -285,6 +299,15 @@ func (im *InformerManager) Listen() (stopCh <-chan struct{}) { } } + + go im.snapshotInformerFactory.Start(im.stopCh) + cacheSync := im.snapshotInformerFactory.WaitForCacheSync(im.stopCh) + for _, isSynced := range cacheSync { + if !isSynced { + return + } + } + return im.stopCh } diff --git a/pkg/kubernetes/informers_test.go b/pkg/kubernetes/informers_test.go new file mode 100644 index 0000000000..0d70dad3ea --- /dev/null +++ b/pkg/kubernetes/informers_test.go @@ -0,0 +1,246 @@ +package kubernetes + +import ( + "context" + "sync" + "testing" + + snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1" + snapshotfake "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned/fake" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testclient "k8s.io/client-go/kubernetes/fake" +) + +func TestNewInformer(t *testing.T) { + t.Run("CreateNewInformerManager", func(tt *testing.T) { + // Setup - Reset global state + informerInstanceLock.Lock() + informerManagerInstance = nil + informerInstanceLock.Unlock() + + ctx := context.Background() + k8sClient := testclient.NewSimpleClientset() + snapshotClient := snapshotfake.NewSimpleClientset() + + // Execute + informerMgr := NewInformer(ctx, k8sClient, snapshotClient) + + // Assert + assert.NotNil(tt, informerMgr) + assert.NotNil(tt, informerMgr.client) + assert.NotNil(tt, informerMgr.informerFactory) + assert.NotNil(tt, informerMgr.snapshotInformerFactory) + assert.NotNil(tt, informerMgr.stopCh) + assert.Equal(tt, k8sClient, informerMgr.client) + }) + + t.Run("ReturnExistingInformerManagerOnSecondCall", func(tt *testing.T) { + // Note: We cannot reset the global state here because signals.SetupSignalHandler() + // can only be called once per process. This test verifies the singleton behavior + // by calling NewInformer twice without resetting. 
+ + ctx := context.Background() + k8sClient1 := testclient.NewSimpleClientset() + snapshotClient1 := snapshotfake.NewSimpleClientset() + + // Execute - First call (may reuse existing instance from previous test) + informerMgr1 := NewInformer(ctx, k8sClient1, snapshotClient1) + + // Execute - Second call with different clients + k8sClient2 := testclient.NewSimpleClientset() + snapshotClient2 := snapshotfake.NewSimpleClientset() + informerMgr2 := NewInformer(ctx, k8sClient2, snapshotClient2) + + // Assert - Should return the same instance (singleton pattern) + assert.Equal(tt, informerMgr1, informerMgr2, "Should return the same singleton instance") + }) + + t.Run("ThreadSafeInitialization", func(tt *testing.T) { + // Note: We cannot reset the global state because signals.SetupSignalHandler() + // can only be called once. This test verifies thread-safety by calling + // NewInformer concurrently and ensuring all calls return the same instance. + + ctx := context.Background() + k8sClient := testclient.NewSimpleClientset() + snapshotClient := snapshotfake.NewSimpleClientset() + + // Execute - Call NewInformer concurrently + const numGoroutines = 10 + results := make([]*InformerManager, numGoroutines) + done := make(chan bool, numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(index int) { + results[index] = NewInformer(ctx, k8sClient, snapshotClient) + done <- true + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < numGoroutines; i++ { + <-done + } + + // Assert - All should return the same instance + firstInstance := results[0] + assert.NotNil(tt, firstInstance) + for i := 1; i < numGoroutines; i++ { + assert.Equal(tt, firstInstance, results[i], + "All concurrent calls should return the same instance") + } + }) + + t.Run("VerifyInformerManagerFields", func(tt *testing.T) { + // This test verifies that the informer manager has all expected fields set + ctx := context.Background() + k8sClient := testclient.NewSimpleClientset() + snapshotClient := snapshotfake.NewSimpleClientset() + + // Execute + informerMgr := NewInformer(ctx, k8sClient, snapshotClient) + + // Assert - Verify all fields are properly initialized + assert.NotNil(tt, informerMgr) + assert.NotNil(tt, informerMgr.client, "client should be set") + assert.NotNil(tt, informerMgr.informerFactory, "informerFactory should be set") + assert.NotNil(tt, informerMgr.snapshotInformerFactory, "snapshotInformerFactory should be set") + assert.NotNil(tt, informerMgr.stopCh, "stopCh should be set") + + // Verify that the informer factories are usable + assert.NotNil(tt, informerMgr.informerFactory.Core(), "Core informer factory should be accessible") + assert.NotNil(tt, informerMgr.snapshotInformerFactory.Snapshot(), + "Snapshot informer factory should be accessible") + }) +} + +func TestInformerManager_AddSnapshotListener(t *testing.T) { + // Helper to create a VolumeSnapshot object + createSnapshot := func(name, namespace, pvcName string) *snapshotv1.VolumeSnapshot { + return &snapshotv1.VolumeSnapshot{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: snapshotv1.VolumeSnapshotSpec{ + Source: snapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: &pvcName, + }, + }, + } + } + + t.Run("AddSnapshotListenerWithNilHandlers", func(tt *testing.T) { + // Setup + ctx := context.Background() + k8sClient := testclient.NewSimpleClientset() + snapshotClient := snapshotfake.NewSimpleClientset() + informerMgr := NewInformer(ctx, k8sClient, snapshotClient) + + // Execute - Pass nil 
handlers (should be allowed) + err := informerMgr.AddSnapshotListener(ctx, nil, nil, nil) + + // Assert + assert.NoError(tt, err) + assert.NotNil(tt, informerMgr.snapshotInformer, "snapshotInformer should be initialized") + }) + + t.Run("HandlersIgnoreInvalidObjectTypes", func(tt *testing.T) { + // Setup + ctx := context.Background() + k8sClient := testclient.NewSimpleClientset() + snapshotClient := snapshotfake.NewSimpleClientset() + informerMgr := NewInformer(ctx, k8sClient, snapshotClient) + + processedCount := 0 + addFunc := func(obj any) { + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if ok && snap != nil { + processedCount++ + } + } + + // Execute + err := informerMgr.AddSnapshotListener(ctx, addFunc, nil, nil) + assert.NoError(tt, err) + + // Simulate events with invalid types + addFunc("not-a-snapshot") + addFunc(123) + addFunc(nil) + + // Assert - Invalid objects should be ignored + assert.Equal(tt, 0, processedCount, "Handler should ignore invalid object types") + + // Now send a valid snapshot + snap := createSnapshot("snap1", "default", "pvc1") + addFunc(snap) + assert.Equal(tt, 1, processedCount, "Handler should process valid snapshot") + }) + + t.Run("EventHandlersTrackSnapshots", func(tt *testing.T) { + // Setup + ctx := context.Background() + k8sClient := testclient.NewSimpleClientset() + snapshotClient := snapshotfake.NewSimpleClientset() + informerMgr := NewInformer(ctx, k8sClient, snapshotClient) + + // Track events with dummy maps + addedSnapshots := make(map[string]*snapshotv1.VolumeSnapshot) + updatedSnapshots := make(map[string]*snapshotv1.VolumeSnapshot) + deletedSnapshots := make(map[string]*snapshotv1.VolumeSnapshot) + var mu sync.Mutex + + addFunc := func(obj any) { + mu.Lock() + defer mu.Unlock() + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if ok && snap != nil { + addedSnapshots[snap.Name] = snap + } + } + + updateFunc := func(oldObj, newObj any) { + mu.Lock() + defer mu.Unlock() + snap, ok := newObj.(*snapshotv1.VolumeSnapshot) + if ok && snap != nil { + updatedSnapshots[snap.Name] = snap + } + } + + deleteFunc := func(obj any) { + mu.Lock() + defer mu.Unlock() + snap, ok := obj.(*snapshotv1.VolumeSnapshot) + if ok && snap != nil { + deletedSnapshots[snap.Name] = snap + } + } + + // Execute + err := informerMgr.AddSnapshotListener(ctx, addFunc, updateFunc, deleteFunc) + + // Assert + assert.NoError(tt, err) + assert.NotNil(tt, informerMgr.snapshotInformer) + + // Verify handlers are properly set up by simulating events + snap1 := createSnapshot("snap1", "default", "pvc1") + addFunc(snap1) + mu.Lock() + assert.Contains(tt, addedSnapshots, "snap1", "Add handler should track snapshot") + mu.Unlock() + + snap2 := createSnapshot("snap2", "default", "pvc2") + updateFunc(nil, snap2) + mu.Lock() + assert.Contains(tt, updatedSnapshots, "snap2", "Update handler should track snapshot") + mu.Unlock() + + deleteFunc(snap1) + mu.Lock() + assert.Contains(tt, deletedSnapshots, "snap1", "Delete handler should track snapshot") + mu.Unlock() + }) +} diff --git a/pkg/kubernetes/types.go b/pkg/kubernetes/types.go index 3022317f70..1e24cd75c3 100644 --- a/pkg/kubernetes/types.go +++ b/pkg/kubernetes/types.go @@ -17,6 +17,7 @@ limitations under the License. 
package kubernetes import ( + "github.com/kubernetes-csi/external-snapshotter/client/v8/informers/externalversions" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -50,8 +51,11 @@ type InformerManager struct { client clientset.Interface // main shared informer factory informerFactory informers.SharedInformerFactory + // snapshot informer factory + snapshotInformerFactory externalversions.SharedInformerFactory + // main signal - stopCh (<-chan struct{}) + stopCh <-chan struct{} // node informer nodeInformer cache.SharedInformer @@ -83,4 +87,7 @@ type InformerManager struct { // volume attachment informer volumeAttachmentInformer cache.SharedInformer + + // snapshot informer + snapshotInformer cache.SharedInformer } diff --git a/pkg/syncer/admissionhandler/cnscsi_admissionhandler_test.go b/pkg/syncer/admissionhandler/cnscsi_admissionhandler_test.go index 7e6b907193..8fe48907bf 100644 --- a/pkg/syncer/admissionhandler/cnscsi_admissionhandler_test.go +++ b/pkg/syncer/admissionhandler/cnscsi_admissionhandler_test.go @@ -47,6 +47,11 @@ type MockCOCommonInterface struct { mock.Mock } +func (m *MockCOCommonInterface) GetSnapshotsForPVC(ctx context.Context, pvcName, namespace string) []string { + //TODO implement me + panic("implement me") +} + func (m *MockCOCommonInterface) GetPVCNamespacedNameByUID(uid string) (apitypes.NamespacedName, bool) { //TODO implement me panic("implement me") diff --git a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go index 4240944767..937f1ab64a 100644 --- a/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go +++ b/pkg/syncer/cnsoperator/controller/cnsregistervolume/cnsregistervolume_controller_test.go @@ -223,6 +223,11 @@ func (m *mockVolumeManager) SyncVolume(ctx context.Context, type mockCOCommon struct{} +func (m *mockCOCommon) GetSnapshotsForPVC(ctx context.Context, pvcName, namespace string) []string { + //TODO implement me + panic("implement me") +} + func (m *mockCOCommon) GetPVCNamespacedNameByUID(uid string) (types.NamespacedName, bool) { //TODO implement me panic("implement me") diff --git a/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util.go b/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util.go index 09180cf4e8..aa7d99190b 100644 --- a/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util.go +++ b/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util.go @@ -21,7 +21,6 @@ import ( "errors" "strings" - snapshotclient "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned" vmoperatortypes "github.com/vmware-tanzu/vm-operator/api/v1alpha2" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -111,7 +110,7 @@ func _getVolumeUsageInfo(ctx context.Context, pvcName string, pvcNamespace strin return &volumeUsageInfo, nil } - volumeUsageInfo.snapshots, volumeUsageInfo.isInUse, err = getSnapshotsForPVC(ctx, pvcName, pvcNamespace, *cfg) + volumeUsageInfo.snapshots, volumeUsageInfo.isInUse, err = getSnapshotsForPVC(ctx, pvcName, pvcNamespace) if err != nil { return nil, err } @@ -222,35 +221,16 @@ func getPodsForPVC(ctx context.Context, pvcName string, pvcNamespace string, } // getSnapshotsForPVC returns a list of snapshots that are created for the specified PVC. 
-func getSnapshotsForPVC(ctx context.Context, pvcName string, pvcNamespace string, - cfg rest.Config) ([]string, bool, error) { +func getSnapshotsForPVC(ctx context.Context, pvcName string, pvcNamespace string) ([]string, bool, error) { log := logger.GetLogger(ctx) - c, err := snapshotclient.NewForConfig(&cfg) - if err != nil { - log.Warnf("Failed to create snapshot client for PVC %q in namespace %q. Error: %q", - pvcName, pvcNamespace, err.Error()) - return nil, false, errors.New("failed to create snapshot client") - } - - // TODO: check if we can use informer cache - list, err := c.SnapshotV1().VolumeSnapshots(pvcNamespace).List(ctx, metav1.ListOptions{}) - if err != nil { - log.Warnf("Failed to list VolumeSnapshots in namespace %q for PVC %q. Error: %q", - pvcNamespace, pvcName, err.Error()) - return nil, false, errors.New("failed to list VolumeSnapshots") - } - - var snapshots []string - for _, snap := range list.Items { - if snap.Spec.Source.PersistentVolumeClaimName == nil || - *snap.Spec.Source.PersistentVolumeClaimName != pvcName { - continue - } - - snapshots = append(snapshots, snap.Name) + if commonco.ContainerOrchestratorUtility == nil { + err := errors.New("ContainerOrchestratorUtility is not initialized") + log.Warn(err) + return nil, false, err } - return snapshots, len(snapshots) > 0, nil + snaps := commonco.ContainerOrchestratorUtility.GetSnapshotsForPVC(ctx, pvcName, pvcNamespace) + return snaps, len(snaps) > 0, nil } // getGuestClustersForPVC returns a list of guest clusters that are using the specified PVC. diff --git a/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util_test.go b/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util_test.go new file mode 100644 index 0000000000..5582c58c75 --- /dev/null +++ b/pkg/syncer/cnsoperator/controller/cnsunregistervolume/util_test.go @@ -0,0 +1,57 @@ +package cnsunregistervolume + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/unittestcommon" + "sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common/commonco" +) + +func TestGetSnapshotsForPVC(t *testing.T) { + containerOrchestratorUtilityOriginal := commonco.ContainerOrchestratorUtility + defer func() { + commonco.ContainerOrchestratorUtility = containerOrchestratorUtilityOriginal + }() + + t.Run("WhenContainerOrchestratorIsNotInitialised", func(tt *testing.T) { + // Setup + commonco.ContainerOrchestratorUtility = nil + expErr := errors.New("ContainerOrchestratorUtility is not initialized") + + // Execute + _, _, err := getSnapshotsForPVC(context.Background(), "", "") + + // Assert + assert.Equal(tt, expErr, err) + }) + + t.Run("WhenPVCHasNoSnapshots", func(tt *testing.T) { + // Setup + commonco.ContainerOrchestratorUtility = &unittestcommon.FakeK8SOrchestrator{} + + // Execute + snaps, inUse, err := getSnapshotsForPVC(context.Background(), "no-snaps", "") + + // Assert + assert.NoError(tt, err) + assert.False(tt, inUse) + assert.Empty(tt, snaps) + }) + + t.Run("WhenPVCHasSnapshots", func(tt *testing.T) { + // Setup + commonco.ContainerOrchestratorUtility = &unittestcommon.FakeK8SOrchestrator{} + expSnaps := []string{"snap1", "snap2", "snap3"} + + // Execute + snaps, inUse, err := getSnapshotsForPVC(context.Background(), "with-snaps", "") + + // Assert + assert.NoError(tt, err) + assert.True(tt, inUse) + assert.Equal(tt, expSnaps, snaps) + }) +} diff --git a/pkg/syncer/metadatasyncer.go b/pkg/syncer/metadatasyncer.go index 21f300db67..ac687b31d6 100644 --- 
a/pkg/syncer/metadatasyncer.go +++ b/pkg/syncer/metadatasyncer.go @@ -248,6 +248,13 @@ func InitMetadataSyncer(ctx context.Context, clusterFlavor cnstypes.CnsClusterFl return err } + // Create the snapshotter client. + snapshotterClient, err := k8s.NewSnapshotterClient(ctx) + if err != nil { + log.Errorf("Creating Snapshotter client failed. Err: %v", err) + return err + } + // Initialize the k8s orchestrator interface. metadataSyncer.coCommonInterface, err = commonco.GetContainerOrchestratorInterface(ctx, common.Kubernetes, clusterFlavor, COInitParams) @@ -633,7 +640,7 @@ func InitMetadataSyncer(ctx context.Context, clusterFlavor cnstypes.CnsClusterFl } // Set up kubernetes resource listeners for metadata syncer. - metadataSyncer.k8sInformerManager = k8s.NewInformer(ctx, k8sClient, true) + metadataSyncer.k8sInformerManager = k8s.NewInformer(ctx, k8sClient, snapshotterClient) err = metadataSyncer.k8sInformerManager.AddPVCListener( ctx, nil, // Add. diff --git a/pkg/syncer/storagepool/service.go b/pkg/syncer/storagepool/service.go index 49ec945acf..bc08c96b0c 100644 --- a/pkg/syncer/storagepool/service.go +++ b/pkg/syncer/storagepool/service.go @@ -120,7 +120,15 @@ func InitStoragePoolService(ctx context.Context, log.Errorf("Creating Kubernetes client failed. Err: %v", err) return } - k8sInformerManager := k8s.NewInformer(ctx, k8sClient, true) + + // Create the snapshotter client. + snapshotterClient, err := k8s.NewSnapshotterClient(ctx) + if err != nil { + log.Errorf("Creating Snapshotter client failed. Err: %v", err) + return + } + + k8sInformerManager := k8s.NewInformer(ctx, k8sClient, snapshotterClient) err = InitNodeAnnotationListener(ctx, k8sInformerManager, scWatchCntlr, spController) if err != nil { log.Errorf("InitNodeAnnotationListener failed. err: %v", err) diff --git a/pkg/syncer/syncer_test.go b/pkg/syncer/syncer_test.go index d052b2d06d..6865cc1986 100644 --- a/pkg/syncer/syncer_test.go +++ b/pkg/syncer/syncer_test.go @@ -26,6 +26,8 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/google/uuid" + "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned" + snapshotfake "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned/fake" cnstypes "github.com/vmware/govmomi/cns/types" "github.com/vmware/govmomi/find" "github.com/vmware/govmomi/vim25/mo" @@ -69,17 +71,18 @@ const ( ) var ( - csiConfig *cnsconfig.Config - ctx context.Context - cnsVCenterConfig *cnsvsphere.VirtualCenterConfig - err error - virtualCenter *cnsvsphere.VirtualCenter - metadataSyncer *metadataSyncInformer - k8sclient clientset.Interface - dc []*cnsvsphere.Datacenter - volumeManager cnsvolumes.Manager - dsList []vimtypes.ManagedObjectReference - cancel context.CancelFunc + csiConfig *cnsconfig.Config + ctx context.Context + cnsVCenterConfig *cnsvsphere.VirtualCenterConfig + err error + virtualCenter *cnsvsphere.VirtualCenter + metadataSyncer *metadataSyncInformer + k8sclient clientset.Interface + snapshotterClient versioned.Interface + dc []*cnsvsphere.Datacenter + volumeManager cnsvolumes.Manager + dsList []vimtypes.ManagedObjectReference + cancel context.CancelFunc ) func TestSyncerWorkflows(t *testing.T) { @@ -169,7 +172,9 @@ func TestSyncerWorkflows(t *testing.T) { // Here we should use a faked client to avoid test inteference with running // metadata syncer pod in real Kubernetes cluster. k8sclient = testclient.NewSimpleClientset() - metadataSyncer.k8sInformerManager = k8s.NewInformer(ctx, k8sclient, true) + // Create a fake snapshot client for testing. 
+ snapshotterClient = snapshotfake.NewSimpleClientset() + metadataSyncer.k8sInformerManager = k8s.NewInformer(ctx, k8sclient, snapshotterClient) metadataSyncer.k8sInformerManager.GetPodLister() metadataSyncer.pvLister = metadataSyncer.k8sInformerManager.GetPVLister() metadataSyncer.pvcLister = metadataSyncer.k8sInformerManager.GetPVCLister()
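
Below the patch, a minimal usage sketch (not part of the diff) of how a caller might wire the snapshot-aware informer manager introduced here. It only uses constructors and signatures visible in the change (`k8s.NewClient`, `k8s.NewSnapshotterClient`, `NewInformer`, `AddSnapshotListener`, `Listen`); the `main` wrapper, the `k8s` import alias for `pkg/kubernetes`, and the panic-on-error handling are illustrative assumptions, not code from the repository.

```go
// Sketch: construct both clients, register VolumeSnapshot callbacks, and
// start the informer factories. Error handling is abbreviated for brevity.
package main

import (
	"context"
	"fmt"

	snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1"

	k8s "sigs.k8s.io/vsphere-csi-driver/v3/pkg/kubernetes"
)

func main() {
	ctx := context.Background()

	// NewInformer now requires both clients; the snapshotter client backs
	// the VolumeSnapshot shared informer factory.
	k8sClient, err := k8s.NewClient(ctx)
	if err != nil {
		panic(err)
	}
	snapshotterClient, err := k8s.NewSnapshotterClient(ctx)
	if err != nil {
		panic(err)
	}

	informerMgr := k8s.NewInformer(ctx, k8sClient, snapshotterClient)

	// Register add/delete callbacks. The update callback is nil because the
	// PVC referenced by a VolumeSnapshot is immutable, so updates do not
	// change the PVC-to-snapshots relationship.
	err = informerMgr.AddSnapshotListener(ctx,
		func(obj any) {
			snap, ok := obj.(*snapshotv1.VolumeSnapshot)
			if ok && snap != nil && snap.Spec.Source.PersistentVolumeClaimName != nil {
				fmt.Printf("snapshot %s/%s added for PVC %s\n",
					snap.Namespace, snap.Name, *snap.Spec.Source.PersistentVolumeClaimName)
			}
		},
		nil,
		func(obj any) {
			if snap, ok := obj.(*snapshotv1.VolumeSnapshot); ok && snap != nil {
				fmt.Printf("snapshot %s/%s deleted\n", snap.Namespace, snap.Name)
			}
		})
	if err != nil {
		panic(err)
	}

	// Listen starts the core and snapshot informer factories and waits for
	// their caches to sync before returning the stop channel.
	stopCh := informerMgr.Listen()
	<-stopCh
}
```

Inside the driver itself, the cached mapping is read through `commonco.ContainerOrchestratorUtility.GetSnapshotsForPVC(ctx, pvcName, namespace)`, which is how the `cnsunregistervolume` change above replaces the previous per-call VolumeSnapshot list.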