Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/go-version v1.6.0
github.com/kubernetes-csi/csi-proxy/v2 v2.0.0-alpha.1
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.2.0
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0
github.com/onsi/ginkgo/v2 v2.22.0
github.com/onsi/gomega v1.36.0
github.com/pkg/sftp v1.13.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubernetes-csi/csi-proxy/v2 v2.0.0-alpha.1 h1:tVPvlL5N5X598hrO3g9rhyoi6h0LP4RpSJlGHItsbEE=
github.com/kubernetes-csi/csi-proxy/v2 v2.0.0-alpha.1/go.mod h1:pacx+PW7lLlu6kAvpr8Lgq/5fdiAsKxOtXXFHMaLMb8=
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.2.0 h1:Q3jQ1NkFqv5o+F8dMmHd8SfEmlcwNeo1immFApntEwE=
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.2.0/go.mod h1:E3vdYxHj2C2q6qo8/Da4g7P+IcwqRZyy3gJBzYybV9Y=
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0 h1:bMqrb3UHgHbP+PW9VwiejfDJU1R0PpXVZNMdeH8WYKI=
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0/go.mod h1:E3vdYxHj2C2q6qo8/Da4g7P+IcwqRZyy3gJBzYybV9Y=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
Expand Down
11 changes: 10 additions & 1 deletion pkg/common/cns-lib/node/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,23 @@ type Nodes struct {
// Initialize helps initialize node manager and node informer manager.
func (nodes *Nodes) Initialize(ctx context.Context) error {
nodes.cnsNodeManager = GetManager(ctx)

k8sclient, err := k8s.NewClient(ctx)
if err != nil {
log := logger.GetLogger(ctx)
log.Errorf("Creating Kubernetes client failed. Err: %v", err)
return err
}

snapshotterClient, err := k8s.NewSnapshotterClient(ctx)
if err != nil {
log := logger.GetLogger(ctx)
log.Errorf("Creating Snapshotter client failed. Err: %v", err)
return err
}

nodes.cnsNodeManager.SetKubernetesClient(k8sclient)
nodes.informMgr = k8s.NewInformer(ctx, k8sclient, true)
nodes.informMgr = k8s.NewInformer(ctx, k8sclient, snapshotterClient)
err = nodes.informMgr.AddCSINodeListener(ctx, nodes.csiNodeAdd,
nodes.csiNodeUpdate, nodes.csiNodeDelete)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/common/unittestcommon/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,14 @@ func (c *FakeK8SOrchestrator) SetCSINodeTopologyInstances(instances []interface{
c.csiNodeTopologyInstances = instances
}

func (c *FakeK8SOrchestrator) GetSnapshotsForPVC(ctx context.Context, pvcName, namespace string) []string {
if strings.Contains(pvcName, "no-snap") {
return []string{}
}

return []string{"snap1", "snap2", "snap3"}
}

// configFromVCSim starts a vcsim instance and returns config for use against the
// vcsim instance. The vcsim instance is configured with an empty tls.Config.
func configFromVCSim(vcsimParams VcsimParams, isTopologyEnv bool) (*config.Config, func()) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/csi/service/common/commonco/coagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ type COCommonInterface interface {
// GetPVCNamespacedNameByUID returns the PVC's namespaced name (namespace/name) for the given UID.
// If the PVC is not found in the cache, it returns an empty string and false.
GetPVCNamespacedNameByUID(uid string) (k8stypes.NamespacedName, bool)
// GetSnapshotsForPVC returns the names of the snapshots for the given PVC
GetSnapshotsForPVC(ctx context.Context, pvcName, namespace string) []string
}

// GetContainerOrchestratorInterface returns orchestrator object for a given
Expand Down
148 changes: 147 additions & 1 deletion pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync/atomic"
"time"

snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1"
"google.golang.org/grpc/codes"
"k8s.io/client-go/util/retry"

Expand Down Expand Up @@ -239,6 +240,71 @@ func (m *volumeIDToNameMap) get(volumeID string) (string, bool) {
return volumeName, found
}

// pvcToSnapshotsMap maps a PVC to its snapshots.
// The primary key is the namespaced name of the PVC and value is a map.
// The key of the inner map is the name of the snapshot.
type pvcToSnapshotsMap struct {
sync.RWMutex
items map[k8stypes.NamespacedName]map[string]struct{}
}

func (m *pvcToSnapshotsMap) add(pvc, snapshot, namespace string) {
m.Lock()
defer m.Unlock()

pvcKey := k8stypes.NamespacedName{
Namespace: namespace,
Name: pvc,
}
if _, ok := m.items[pvcKey]; !ok {
m.items[pvcKey] = make(map[string]struct{})
}
m.items[pvcKey][snapshot] = struct{}{}
}

func (m *pvcToSnapshotsMap) get(pvc, namespace string) []string {
m.RLock()
defer m.RUnlock()

pvcKey := k8stypes.NamespacedName{
Namespace: namespace,
Name: pvc,
}
snapMap, ok := m.items[pvcKey]
if !ok {
return []string{}
}

snaps := make([]string, 0, len(snapMap))
for snap := range snapMap {
snaps = append(snaps, snap)
}
return snaps
}

func (m *pvcToSnapshotsMap) delete(pvc, snapshot, namespace string) {
m.Lock()
defer m.Unlock()

pvcKey := k8stypes.NamespacedName{
Namespace: namespace,
Name: pvc,
}
snapMap, ok := m.items[pvcKey]
if !ok {
return
}

delete(snapMap, snapshot)
if len(snapMap) != 0 {
m.items[pvcKey] = snapMap
return
}

// delete pvc entry if no snapshots are present
delete(m.items, pvcKey)
}

// K8sOrchestrator defines set of properties specific to K8s.
type K8sOrchestrator struct {
supervisorFSS FSSConfigMapInfo
Expand All @@ -251,6 +317,7 @@ type K8sOrchestrator struct {
nodeIDToNameMap *nodeIDToNameMap
volumeNameToNodesMap *volumeNameToNodesMap // used when ListVolume FSS is enabled
volumeIDToNameMap *volumeIDToNameMap // used when ListVolume FSS is enabled
pvcToSnapshotsMap pvcToSnapshotsMap
k8sClient clientset.Interface
snapshotterClient snapshotterClientSet.Interface
// pvcUIDCache maps PVC UID to its namespaced name (namespace/name).
Expand Down Expand Up @@ -319,7 +386,7 @@ func Newk8sOrchestrator(ctx context.Context, controllerClusterFlavor cnstypes.Cn
k8sOrchestratorInstance.clusterFlavor = controllerClusterFlavor
k8sOrchestratorInstance.k8sClient = k8sClient
k8sOrchestratorInstance.snapshotterClient = snapshotterClient
k8sOrchestratorInstance.informerManager = k8s.NewInformer(ctx, k8sClient, true)
k8sOrchestratorInstance.informerManager = k8s.NewInformer(ctx, k8sClient, snapshotterClient)
coInstanceErr = initFSS(ctx, k8sClient, controllerClusterFlavor, params)
if coInstanceErr != nil {
log.Errorf("Failed to initialize the orchestrator. Error: %v", coInstanceErr)
Expand Down Expand Up @@ -381,6 +448,11 @@ func Newk8sOrchestrator(ctx context.Context, controllerClusterFlavor cnstypes.Cn
}
}

err := initPVCToSnapshotsMap(ctx, controllerClusterFlavor)
if err != nil {
return nil, fmt.Errorf("failed to create PVC to snapshots map. Error: %v", err)
}

k8sOrchestratorInstance.informerManager.Listen()
atomic.StoreUint32(&k8sOrchestratorInstanceInitialized, 1)
log.Info("k8sOrchestratorInstance initialized")
Expand Down Expand Up @@ -1945,6 +2017,75 @@ func nodeRemove(obj interface{}) {
k8sOrchestratorInstance.nodeIDToNameMap.remove(nodeMoID)
}

// handleSnapshotAdded handles the snapshot add event by adding it to the pvcToSnapshotsMap cache.
func handleSnapshotAdded(ctx context.Context, obj any, pvcMap *pvcToSnapshotsMap) {
log := logger.GetLogger(ctx)
snap, ok := obj.(*snapshotv1.VolumeSnapshot)
if !ok || snap == nil {
log.Warnf("unrecognized object %+v", obj)
return
}

if snap.Spec.Source.PersistentVolumeClaimName == nil {
log.Warnf("snapshot is not associated with any PVC. Ignoring it...")
return
}

pvcMap.add(*snap.Spec.Source.PersistentVolumeClaimName, snap.Name, snap.Namespace)
log.With("pvc", *snap.Spec.Source.PersistentVolumeClaimName).With("snapshot", snap.Name).
With("namespace", snap.Namespace).Debug("successfully added the snapshot to the cache")
}

// handleSnapshotDeleted handles the snapshot delete event by removing it from the pvcToSnapshotsMap cache.
func handleSnapshotDeleted(ctx context.Context, obj any, pvcMap *pvcToSnapshotsMap) {
log := logger.GetLogger(ctx)
snap, ok := obj.(*snapshotv1.VolumeSnapshot)
if !ok || snap == nil {
log.Warnf("unrecognized object %+v", obj)
return
}

if snap.Spec.Source.PersistentVolumeClaimName == nil {
log.Warnf("snapshot is not associated with any PVC. Ignoring it...")
return
}

pvcMap.delete(*snap.Spec.Source.PersistentVolumeClaimName, snap.Name, snap.Namespace)
log.With("pvc", *snap.Spec.Source.PersistentVolumeClaimName).With("snapshot", snap.Name).
With("namespace", snap.Namespace).Debug("successfully removed the snapshot from the cache")
}

func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes.CnsClusterFlavor) error {
log := logger.GetLogger(ctx)
// TODO: check if we need to check the FSS as well
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if controllerClusterFlavor != cnstypes.CnsClusterFlavorWorkload {
// PVC to VolumeSnapshot cache is only required for WCP for now.
log.Info("non WCP cluster detected. skipping initialising PVC to Snapshot cache.")
return nil
}

k8sOrchestratorInstance.pvcToSnapshotsMap = pvcToSnapshotsMap{
RWMutex: sync.RWMutex{},
items: make(map[k8stypes.NamespacedName]map[string]struct{}),
}

err := k8sOrchestratorInstance.informerManager.AddSnapshotListener(ctx,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This informer should be conditionally added. In Vanilla cluster, VolumeSnapshot CRD may not be installed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xing-yang great catch! Let me fix that

func(obj any) {
handleSnapshotAdded(ctx, obj, &k8sOrchestratorInstance.pvcToSnapshotsMap)
},
// Since the name of PVC associated with a snapshot is immutable,
// update events do not have any impact on the state of the cache.
nil,
func(obj any) {
handleSnapshotDeleted(ctx, obj, &k8sOrchestratorInstance.pvcToSnapshotsMap)
})
if err != nil {
return logger.LogNewErrorf(log, "failed to listen on volumesnapshots. Error: %v", err)
}

return nil
}

// GetNodeIDtoNameMap returns a map containing the nodeID to node name
func (c *K8sOrchestrator) GetNodeIDtoNameMap(ctx context.Context) map[string]string {
return c.nodeIDToNameMap.items
Expand Down Expand Up @@ -2259,6 +2400,11 @@ func (c *K8sOrchestrator) GetPVCNamespacedNameByUID(uid string) (k8stypes.Namesp
return nsName, true
}

// GetSnapshotsForPVC returns the names of the snapshots for the given PVC
func (c *K8sOrchestrator) GetSnapshotsForPVC(ctx context.Context, pvc, namespace string) []string {
return c.pvcToSnapshotsMap.get(pvc, namespace)
}

// GetPVCDataSource Retrieves the VolumeSnapshot source when a PVC from VolumeSnapshot is being created.
func GetPVCDataSource(ctx context.Context, claim *v1.PersistentVolumeClaim) (*v1.ObjectReference, error) {
var dataSource v1.ObjectReference
Expand Down
Loading