Skip to content

Commit b565b5b

Browse files
committed
Refactor snapshot retrieval to use ContainerOrchestratorUtility
Simplify the getSnapshotsForPVC function by delegating snapshot retrieval to the ContainerOrchestratorUtility interface instead of directly using the snapshot client. This improves testability and maintains consistency with the existing architecture.

Changes:
- Remove direct dependency on external-snapshotter client in util.go
- Update getSnapshotsForPVC to use ContainerOrchestratorUtility.GetSnapshotsForPVC
- Remove rest.Config parameter from getSnapshotsForPVC function signature
- Add comprehensive unit tests for getSnapshotsForPVC covering:
  * Uninitialized ContainerOrchestratorUtility scenario
  * PVC with no snapshots
  * PVC with multiple snapshots
- Implement GetSnapshotsForPVC in FakeK8SOrchestrator for testing
- Add TODO comment for future refactoring of pvcToSnapshotsMap key type

Benefits:
- Improved testability through dependency injection
- Reduced coupling to external snapshot client
- Consistent error handling
- Better alignment with existing orchestrator abstraction pattern
1 parent 205905d commit b565b5b

File tree

11 files changed

+180
-117
lines changed

11 files changed

+180
-117
lines changed

pkg/common/cns-lib/node/nodes.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,23 @@ type Nodes struct {
3737
// Initialize helps initialize node manager and node informer manager.
3838
func (nodes *Nodes) Initialize(ctx context.Context) error {
3939
nodes.cnsNodeManager = GetManager(ctx)
40+
4041
k8sclient, err := k8s.NewClient(ctx)
4142
if err != nil {
4243
log := logger.GetLogger(ctx)
4344
log.Errorf("Creating Kubernetes client failed. Err: %v", err)
4445
return err
4546
}
47+
48+
snapshotterClient, err := k8s.NewSnapshotterClient(ctx)
49+
if err != nil {
50+
log := logger.GetLogger(ctx)
51+
log.Errorf("Creating Snapshotter client failed. Err: %v", err)
52+
return err
53+
}
54+
4655
nodes.cnsNodeManager.SetKubernetesClient(k8sclient)
47-
nodes.informMgr = k8s.NewInformer(ctx, k8sclient, true)
56+
nodes.informMgr = k8s.NewInformer(ctx, k8sclient, snapshotterClient)
4857
err = nodes.informMgr.AddCSINodeListener(ctx, nodes.csiNodeAdd,
4958
nodes.csiNodeUpdate, nodes.csiNodeDelete)
5059
if err != nil {

pkg/common/unittestcommon/utils.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,8 +517,11 @@ func (c *FakeK8SOrchestrator) SetCSINodeTopologyInstances(instances []interface{
517517
}
518518

519519
func (c *FakeK8SOrchestrator) GetSnapshotsForPVC(ctx context.Context, pvcName, namespace string) []string {
520-
//TODO implement me
521-
panic("implement me")
520+
if strings.Contains(pvcName, "no-snap") {
521+
return []string{}
522+
}
523+
524+
return []string{"snap1", "snap2", "snap3"}
522525
}
523526

524527
// configFromVCSim starts a vcsim instance and returns config for use against the

pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go

Lines changed: 21 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -240,56 +240,44 @@ func (m *volumeIDToNameMap) get(volumeID string) (string, bool) {
240240
return volumeName, found
241241
}
242242

243-
type namespacedName struct {
244-
namespace, name string
245-
}
246-
247-
func (n namespacedName) String() string {
248-
return fmt.Sprintf("%s/%s", n.namespace, n.name)
249-
}
250-
251243
// pvcToSnapshotsMap maps a PVC to its snapshots.
252-
// Key is the namespaced name of the PVC and value is a map.
253-
// The key of the inner map is the namespaced name of the snapshot.
244+
// The primary key is the namespaced name of the PVC and value is a map.
245+
// The key of the inner map is the name of the snapshot.
254246
type pvcToSnapshotsMap struct {
255247
*sync.RWMutex
256-
items map[namespacedName]map[namespacedName]struct{}
248+
items map[k8stypes.NamespacedName]map[string]struct{}
257249
}
258250

259251
func (m *pvcToSnapshotsMap) add(pvc, snapshot, namespace string) {
260252
m.Lock()
261253
defer m.Unlock()
262254

263-
pvcKey := namespacedName{
264-
namespace: namespace,
265-
name: pvc,
255+
pvcKey := k8stypes.NamespacedName{
256+
Namespace: namespace,
257+
Name: pvc,
266258
}
267259
if _, ok := m.items[pvcKey]; !ok {
268-
m.items[pvcKey] = make(map[namespacedName]struct{})
269-
}
270-
snapKey := namespacedName{
271-
namespace: namespace,
272-
name: snapshot,
260+
m.items[pvcKey] = make(map[string]struct{})
273261
}
274-
m.items[pvcKey][snapKey] = struct{}{}
262+
m.items[pvcKey][snapshot] = struct{}{}
275263
}
276264

277265
func (m *pvcToSnapshotsMap) get(pvc, namespace string) []string {
278266
m.RLock()
279267
defer m.RUnlock()
280268

281-
pvcKey := namespacedName{
282-
namespace: namespace,
283-
name: pvc,
269+
pvcKey := k8stypes.NamespacedName{
270+
Namespace: namespace,
271+
Name: pvc,
284272
}
285273
snapMap, ok := m.items[pvcKey]
286274
if !ok {
287275
return []string{}
288276
}
289277

290278
snaps := make([]string, 0, len(snapMap))
291-
for k := range snapMap {
292-
snaps = append(snaps, k.name)
279+
for snap := range snapMap {
280+
snaps = append(snaps, snap)
293281
}
294282
return snaps
295283
}
@@ -298,20 +286,16 @@ func (m *pvcToSnapshotsMap) delete(pvc, snapshot, namespace string) {
298286
m.Lock()
299287
defer m.Unlock()
300288

301-
pvcKey := namespacedName{
302-
namespace: namespace,
303-
name: pvc,
289+
pvcKey := k8stypes.NamespacedName{
290+
Namespace: namespace,
291+
Name: pvc,
304292
}
305293
snapMap, ok := m.items[pvcKey]
306294
if !ok {
307295
return
308296
}
309297

310-
snapKey := namespacedName{
311-
namespace: namespace,
312-
name: snapshot,
313-
}
314-
delete(snapMap, snapKey)
298+
delete(snapMap, snapshot)
315299
if len(snapMap) != 0 {
316300
m.items[pvcKey] = snapMap
317301
return
@@ -402,7 +386,7 @@ func Newk8sOrchestrator(ctx context.Context, controllerClusterFlavor cnstypes.Cn
402386
k8sOrchestratorInstance.clusterFlavor = controllerClusterFlavor
403387
k8sOrchestratorInstance.k8sClient = k8sClient
404388
k8sOrchestratorInstance.snapshotterClient = snapshotterClient
405-
k8sOrchestratorInstance.informerManager = k8s.NewInformer(ctx, k8sClient, true)
389+
k8sOrchestratorInstance.informerManager = k8s.NewInformer(ctx, k8sClient, snapshotterClient)
406390
coInstanceErr = initFSS(ctx, k8sClient, controllerClusterFlavor, params)
407391
if coInstanceErr != nil {
408392
log.Errorf("Failed to initialize the orchestrator. Error: %v", coInstanceErr)
@@ -2044,7 +2028,7 @@ func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes
20442028

20452029
k8sOrchestratorInstance.pvcToSnapshotsMap = pvcToSnapshotsMap{
20462030
RWMutex: &sync.RWMutex{},
2047-
items: make(map[namespacedName]map[namespacedName]struct{}),
2031+
items: make(map[k8stypes.NamespacedName]map[string]struct{}),
20482032
}
20492033
snapshotAdded := func(obj any) {
20502034
snap, ok := obj.(*snapshotv1.VolumeSnapshot)
@@ -2085,6 +2069,8 @@ func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes
20852069
func(obj any) {
20862070
snapshotAdded(obj)
20872071
},
2072+
// Since the name of PVC associated with a snapshot is immutable,
2073+
// update events do not have any impact on the state of the cache.
20882074
nil,
20892075
func(obj any) {
20902076
snapshotDeleted(obj)

pkg/internalapis/featurestates/featurestates.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,15 @@ func StartSvFSSReplicationService(ctx context.Context, svFeatureStatConfigMapNam
122122
return err
123123
}
124124

125+
// Create the snapshotter client.
126+
snapshotterClient, err := k8s.NewSnapshotterClient(ctx)
127+
if err != nil {
128+
log.Errorf("Creating Snapshotter client failed. Err: %v", err)
129+
return err
130+
}
131+
125132
// Create k8s Informer and watch on configmaps and namespaces.
126-
informer := k8s.NewInformer(ctx, k8sClient, true)
133+
informer := k8s.NewInformer(ctx, k8sClient, snapshotterClient)
127134
// Configmap informer to watch on SV featurestate config-map.
128135
err = informer.AddConfigMapListener(
129136
ctx,

pkg/kubernetes/informers.go

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ package kubernetes
1818

1919
import (
2020
"context"
21-
"fmt"
2221
"sync"
2322
"time"
2423

24+
snapclientset "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned"
2525
"github.com/kubernetes-csi/external-snapshotter/client/v8/informers/externalversions"
2626
"k8s.io/client-go/informers"
2727
v1 "k8s.io/client-go/informers/core/v1"
@@ -46,8 +46,6 @@ const (
4646
)
4747

4848
var (
49-
inClusterInformerManagerInstance *InformerManager = nil
50-
inClusterInformerInstanceLock = &sync.Mutex{}
5149
supervisorInformerManagerInstance *InformerManager = nil
5250
supervisorInformerInstanceLock = &sync.Mutex{}
5351
)
@@ -56,48 +54,30 @@ func noResyncPeriodFunc() time.Duration {
5654
return 0
5755
}
5856

59-
// NewInformer creates a new K8S client based on a service account.
60-
// NOTE: This function expects caller function to pass appropriate client
57+
// NewInformer creates a new K8S informer manager.
58+
// NOTE: This function expects caller function to pass appropriate clients
6159
// as per config to be created Informer for.
62-
// This function creates shared informer factory against the client provided.
63-
func NewInformer(ctx context.Context, client clientset.Interface, inClusterClnt bool) *InformerManager {
60+
// This function creates shared informer factories against the clients provided.
61+
func NewInformer(ctx context.Context,
62+
client clientset.Interface,
63+
snapshotClient snapclientset.Interface) *InformerManager {
6464
var informerInstance *InformerManager
6565
log := logger.GetLogger(ctx)
6666

67-
if inClusterClnt {
68-
inClusterInformerInstanceLock.Lock()
69-
defer inClusterInformerInstanceLock.Unlock()
70-
71-
informerInstance = inClusterInformerManagerInstance
72-
} else {
73-
supervisorInformerInstanceLock.Lock()
74-
defer supervisorInformerInstanceLock.Unlock()
75-
76-
informerInstance = supervisorInformerManagerInstance
77-
}
78-
79-
// TODO: check if callers can pass this
80-
snapClient, err := NewSnapshotterClient(ctx)
81-
if err != nil {
82-
// TODO: handle error appropriately
83-
log.Fatalf("unable to initialise snapshot client")
84-
}
67+
supervisorInformerInstanceLock.Lock()
68+
defer supervisorInformerInstanceLock.Unlock()
69+
informerInstance = supervisorInformerManagerInstance
8570

8671
if informerInstance == nil {
8772
informerInstance = &InformerManager{
8873
client: client,
8974
stopCh: signals.SetupSignalHandler().Done(),
9075
informerFactory: informers.NewSharedInformerFactory(client, noResyncPeriodFunc()),
91-
snapshotInformerFactory: externalversions.NewSharedInformerFactory(snapClient, 0),
76+
snapshotInformerFactory: externalversions.NewSharedInformerFactory(snapshotClient, 0),
9277
}
9378

94-
if inClusterClnt {
95-
inClusterInformerManagerInstance = informerInstance
96-
log.Info("Created new informer factory for in-cluster client")
97-
} else {
98-
supervisorInformerManagerInstance = informerInstance
99-
log.Info("Created new informer factory for supervisor client")
100-
}
79+
supervisorInformerManagerInstance = informerInstance
80+
log.Info("Created new informer factory for supervisor client")
10181
}
10282

10383
return informerInstance
@@ -322,8 +302,6 @@ func (im *InformerManager) Listen() (stopCh <-chan struct{}) {
322302

323303
go im.snapshotInformerFactory.Start(im.stopCh)
324304
cacheSync := im.snapshotInformerFactory.WaitForCacheSync(im.stopCh)
325-
// TODO: remove
326-
fmt.Print(cacheSync)
327305
for _, isSynced := range cacheSync {
328306
if !isSynced {
329307
return
@@ -332,3 +310,26 @@ func (im *InformerManager) Listen() (stopCh <-chan struct{}) {
332310

333311
return im.stopCh
334312
}
313+
314+
// NewConfigMapListener creates a new configmap listener in the given namespace.
315+
// NOTE: This creates a NewSharedIndexInformer everytime and does not use the informer factory.
316+
// Only use this function when you need a configmap listener in a different namespace than the
317+
// one already present in the informer factory.
318+
func NewConfigMapListener(ctx context.Context, client clientset.Interface, namespace string,
319+
add func(obj interface{}), update func(oldObj, newObj interface{}), remove func(obj interface{})) error {
320+
log := logger.GetLogger(ctx)
321+
configMapInformer := v1.NewFilteredConfigMapInformer(client, namespace, resyncPeriodConfigMapInformer,
322+
cache.Indexers{}, nil)
323+
324+
_, err := configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
325+
AddFunc: add,
326+
UpdateFunc: update,
327+
DeleteFunc: remove,
328+
})
329+
if err != nil {
330+
return logger.LogNewErrorf(log, "failed to add event handler on configmap listener. Error: %v", err)
331+
}
332+
stopCh := make(chan struct{})
333+
go configMapInformer.Run(stopCh)
334+
return nil
335+
}

pkg/kubernetes/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type InformerManager struct {
5151
client clientset.Interface
5252
// main shared informer factory
5353
informerFactory informers.SharedInformerFactory
54-
54+
// snapshot informer factory
5555
snapshotInformerFactory externalversions.SharedInformerFactory
5656

5757
// main signal

pkg/syncer/cnsoperator/controller/cnsunregistervolume/util.go

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"errors"
2222
"strings"
2323

24-
snapshotclient "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned"
2524
vmoperatortypes "github.com/vmware-tanzu/vm-operator/api/v1alpha2"
2625
apierrors "k8s.io/apimachinery/pkg/api/errors"
2726
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -111,7 +110,7 @@ func _getVolumeUsageInfo(ctx context.Context, pvcName string, pvcNamespace strin
111110
return &volumeUsageInfo, nil
112111
}
113112

114-
volumeUsageInfo.snapshots, volumeUsageInfo.isInUse, err = getSnapshotsForPVC(ctx, pvcName, pvcNamespace, *cfg)
113+
volumeUsageInfo.snapshots, volumeUsageInfo.isInUse, err = getSnapshotsForPVC(ctx, pvcName, pvcNamespace)
115114
if err != nil {
116115
return nil, err
117116
}
@@ -222,35 +221,16 @@ func getPodsForPVC(ctx context.Context, pvcName string, pvcNamespace string,
222221
}
223222

224223
// getSnapshotsForPVC returns a list of snapshots that are created for the specified PVC.
225-
func getSnapshotsForPVC(ctx context.Context, pvcName string, pvcNamespace string,
226-
cfg rest.Config) ([]string, bool, error) {
224+
func getSnapshotsForPVC(ctx context.Context, pvcName string, pvcNamespace string) ([]string, bool, error) {
227225
log := logger.GetLogger(ctx)
228-
c, err := snapshotclient.NewForConfig(&cfg)
229-
if err != nil {
230-
log.Warnf("Failed to create snapshot client for PVC %q in namespace %q. Error: %q",
231-
pvcName, pvcNamespace, err.Error())
232-
return nil, false, errors.New("failed to create snapshot client")
233-
}
234-
235-
// TODO: check if we can use informer cache
236-
list, err := c.SnapshotV1().VolumeSnapshots(pvcNamespace).List(ctx, metav1.ListOptions{})
237-
if err != nil {
238-
log.Warnf("Failed to list VolumeSnapshots in namespace %q for PVC %q. Error: %q",
239-
pvcNamespace, pvcName, err.Error())
240-
return nil, false, errors.New("failed to list VolumeSnapshots")
241-
}
242-
243-
var snapshots []string
244-
for _, snap := range list.Items {
245-
if snap.Spec.Source.PersistentVolumeClaimName == nil ||
246-
*snap.Spec.Source.PersistentVolumeClaimName != pvcName {
247-
continue
248-
}
249-
250-
snapshots = append(snapshots, snap.Name)
226+
if commonco.ContainerOrchestratorUtility == nil {
227+
err := errors.New("ContainerOrchestratorUtility is not initialized")
228+
log.Warn(err)
229+
return nil, false, err
251230
}
252231

253-
return snapshots, len(snapshots) > 0, nil
232+
snaps := commonco.ContainerOrchestratorUtility.GetSnapshotsForPVC(ctx, pvcName, pvcNamespace)
233+
return snaps, len(snaps) > 0, nil
254234
}
255235

256236
// getGuestClustersForPVC returns a list of guest clusters that are using the specified PVC.

0 commit comments

Comments (0)