Skip to content

Commit 961ec42

Browse files
committed
Initial implementation of using informer caches for snapshots
1 parent 950117d commit 961ec42

File tree

5 files changed

+121
-29
lines changed

5 files changed

+121
-29
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ require (
1717
github.com/google/uuid v1.6.0
1818
github.com/hashicorp/go-version v1.6.0
1919
github.com/kubernetes-csi/csi-proxy/v2 v2.0.0-alpha.1
20-
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.2.0
20+
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0
2121
github.com/onsi/ginkgo/v2 v2.22.0
2222
github.com/onsi/gomega v1.36.0
2323
github.com/pkg/sftp v1.13.6

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
178178
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
179179
github.com/kubernetes-csi/csi-proxy/v2 v2.0.0-alpha.1 h1:tVPvlL5N5X598hrO3g9rhyoi6h0LP4RpSJlGHItsbEE=
180180
github.com/kubernetes-csi/csi-proxy/v2 v2.0.0-alpha.1/go.mod h1:pacx+PW7lLlu6kAvpr8Lgq/5fdiAsKxOtXXFHMaLMb8=
181-
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.2.0 h1:Q3jQ1NkFqv5o+F8dMmHd8SfEmlcwNeo1immFApntEwE=
182-
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.2.0/go.mod h1:E3vdYxHj2C2q6qo8/Da4g7P+IcwqRZyy3gJBzYybV9Y=
181+
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0 h1:bMqrb3UHgHbP+PW9VwiejfDJU1R0PpXVZNMdeH8WYKI=
182+
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0/go.mod h1:E3vdYxHj2C2q6qo8/Da4g7P+IcwqRZyy3gJBzYybV9Y=
183183
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
184184
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
185185
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=

pkg/csi/service/common/commonco/k8sorchestrator/k8sorchestrator.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"sync/atomic"
3030
"time"
3131

32+
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1"
3233
"google.golang.org/grpc/codes"
3334
"k8s.io/client-go/util/retry"
3435

@@ -239,6 +240,31 @@ func (m *volumeIDToNameMap) get(volumeID string) (string, bool) {
239240
return volumeName, found
240241
}
241242

243+
type namespacedName struct {
244+
namespace, name string
245+
}
246+
247+
func (n namespacedName) String() string {
248+
return fmt.Sprintf("%s/%s", n.namespace, n.name)
249+
}
250+
251+
type snapshotInfo struct {
252+
snapshotName string
253+
snapshotContentName string
254+
}
255+
256+
func (s snapshotInfo) String() string {
257+
return fmt.Sprintf("name: %s, content name: %s", s.snapshotName, s.snapshotContentName)
258+
}
259+
260+
// pvcToSnapshotsMap maps a PVC to its snapshots.
261+
// Key is the namespaced name of the PVC and value is a map.
262+
// The key of the inner map is the namespaced name of the snapshot.
263+
type pvcToSnapshotsMap struct {
264+
*sync.RWMutex
265+
items map[namespacedName]map[namespacedName]snapshotInfo
266+
}
267+
242268
// K8sOrchestrator defines set of properties specific to K8s.
243269
type K8sOrchestrator struct {
244270
supervisorFSS FSSConfigMapInfo
@@ -251,6 +277,7 @@ type K8sOrchestrator struct {
251277
nodeIDToNameMap *nodeIDToNameMap
252278
volumeNameToNodesMap *volumeNameToNodesMap // used when ListVolume FSS is enabled
253279
volumeIDToNameMap *volumeIDToNameMap // used when ListVolume FSS is enabled
280+
pvcToSnapshotsMap *pvcToSnapshotsMap
254281
k8sClient clientset.Interface
255282
snapshotterClient snapshotterClientSet.Interface
256283
// pvcUIDCache maps PVC UID to its namespaced name (namespace/name).
@@ -381,6 +408,12 @@ func Newk8sOrchestrator(ctx context.Context, controllerClusterFlavor cnstypes.Cn
381408
}
382409
}
383410

411+
// Initialize the map for pvc to snapshots
412+
err := initPVCToSnapshotsMap(ctx, controllerClusterFlavor)
413+
if err != nil {
414+
return nil, fmt.Errorf("failed to create PVC to snapshots map. Error: %v", err)
415+
}
416+
384417
k8sOrchestratorInstance.informerManager.Listen()
385418
atomic.StoreUint32(&k8sOrchestratorInstanceInitialized, 1)
386419
log.Info("k8sOrchestratorInstance initialized")
@@ -2286,3 +2319,46 @@ func GetPVCDataSource(ctx context.Context, claim *v1.PersistentVolumeClaim) (*v1
22862319
}
22872320
return &dataSource, nil
22882321
}
2322+
2323+
func initPVCToSnapshotsMap(ctx context.Context, controllerClusterFlavor cnstypes.CnsClusterFlavor) error {
2324+
log := logger.GetLogger(ctx)
2325+
if controllerClusterFlavor != cnstypes.CnsClusterFlavorWorkload {
2326+
// PVC to VolumeSnapshot mapping is only required for WCP.
2327+
return nil
2328+
}
2329+
2330+
log.Debugf("Initializing pvc namespaced name to volumesnapshot names map")
2331+
k8sOrchestratorInstance.pvcToSnapshotsMap = &pvcToSnapshotsMap{
2332+
RWMutex: &sync.RWMutex{},
2333+
items: make(map[namespacedName]map[namespacedName]snapshotInfo),
2334+
}
2335+
2336+
snapshotAdded := func(obj interface{}) {
2337+
snap, ok := obj.(*snapshotv1.VolumeSnapshot)
2338+
if !ok || snap == nil {
2339+
log.Warnf("snapshotAdded: unrecognized object %+v", obj)
2340+
return
2341+
}
2342+
2343+
// TODO: implement
2344+
log.Infof("snapshotAdded: snapshot=%v", snap)
2345+
}
2346+
2347+
err := k8sOrchestratorInstance.informerManager.AddSnapshotListener(ctx,
2348+
func(obj interface{}) {
2349+
snapshotAdded(obj)
2350+
},
2351+
func(oldObj, newObj interface{}) {
2352+
// TODO: implement
2353+
log.Info("snapshotUpdated")
2354+
},
2355+
func(obj interface{}) {
2356+
// TODO: implement
2357+
log.Info("snapshotDeleted")
2358+
})
2359+
if err != nil {
2360+
return logger.LogNewErrorf(log, "failed to listen on volumesnapshots. Error: %v", err)
2361+
}
2362+
2363+
return nil
2364+
}

pkg/kubernetes/informers.go

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"sync"
2222
"time"
2323

24+
"github.com/kubernetes-csi/external-snapshotter/client/v8/informers/externalversions"
2425
"k8s.io/client-go/informers"
2526
v1 "k8s.io/client-go/informers/core/v1"
2627
clientset "k8s.io/client-go/kubernetes"
@@ -74,11 +75,19 @@ func NewInformer(ctx context.Context, client clientset.Interface, inClusterClnt
7475
informerInstance = supervisorInformerManagerInstance
7576
}
7677

78+
// TODO: check if callers can pass this
79+
snapClient, err := NewSnapshotterClient(ctx)
80+
if err != nil {
81+
// TODO: handle error appropriately
82+
log.Fatalf("unable to initialise snapshot client")
83+
}
84+
7785
if informerInstance == nil {
7886
informerInstance = &InformerManager{
79-
client: client,
80-
stopCh: signals.SetupSignalHandler().Done(),
81-
informerFactory: informers.NewSharedInformerFactory(client, noResyncPeriodFunc()),
87+
client: client,
88+
stopCh: signals.SetupSignalHandler().Done(),
89+
informerFactory: informers.NewSharedInformerFactory(client, noResyncPeriodFunc()),
90+
snapshotInformerFactory: externalversions.NewSharedInformerFactory(snapClient, 0),
8291
}
8392

8493
if inClusterClnt {
@@ -256,6 +265,28 @@ func (im *InformerManager) AddVolumeAttachmentListener(ctx context.Context, add
256265
return nil
257266
}
258267

268+
// AddSnapshotListener hooks up add, update, delete callbacks.
269+
func (im *InformerManager) AddSnapshotListener(ctx context.Context, add func(obj interface{}),
270+
update func(oldObj, newObj interface{}), remove func(obj interface{})) error {
271+
log := logger.GetLogger(ctx)
272+
if im.snapshotInformer == nil {
273+
im.snapshotInformer = im.snapshotInformerFactory.
274+
Snapshot().V1().VolumeSnapshots().Informer()
275+
}
276+
277+
_, err := im.snapshotInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
278+
AddFunc: add,
279+
UpdateFunc: update,
280+
DeleteFunc: remove,
281+
})
282+
if err != nil {
283+
return logger.LogNewErrorf(
284+
log, "failed to add event handler on snapshot listener. Error: %v", err)
285+
}
286+
287+
return nil
288+
}
289+
259290
// GetPVLister returns PV Lister for the calling informer manager.
260291
func (im *InformerManager) GetPVLister() corelisters.PersistentVolumeLister {
261292
return im.informerFactory.Core().V1().PersistentVolumes().Lister()
@@ -287,26 +318,3 @@ func (im *InformerManager) Listen() (stopCh <-chan struct{}) {
287318
}
288319
return im.stopCh
289320
}
290-
291-
// NewConfigMapListener creates a new configmap listener in the given namespace.
292-
// NOTE: This creates a NewSharedIndexInformer everytime and does not use the informer factory.
293-
// Only use this function when you need a configmap listener in a different namespace than the
294-
// one already present in the informer factory.
295-
func NewConfigMapListener(ctx context.Context, client clientset.Interface, namespace string,
296-
add func(obj interface{}), update func(oldObj, newObj interface{}), remove func(obj interface{})) error {
297-
log := logger.GetLogger(ctx)
298-
configMapInformer := v1.NewFilteredConfigMapInformer(client, namespace, resyncPeriodConfigMapInformer,
299-
cache.Indexers{}, nil)
300-
301-
_, err := configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
302-
AddFunc: add,
303-
UpdateFunc: update,
304-
DeleteFunc: remove,
305-
})
306-
if err != nil {
307-
return logger.LogNewErrorf(log, "failed to add event handler on configmap listener. Error: %v", err)
308-
}
309-
stopCh := make(chan struct{})
310-
go configMapInformer.Run(stopCh)
311-
return nil
312-
}

pkg/kubernetes/types.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package kubernetes
1818

1919
import (
20+
"github.com/kubernetes-csi/external-snapshotter/client/v8/informers/externalversions"
2021
"k8s.io/client-go/informers"
2122
clientset "k8s.io/client-go/kubernetes"
2223
"k8s.io/client-go/tools/cache"
@@ -50,6 +51,10 @@ type InformerManager struct {
5051
client clientset.Interface
5152
// main shared informer factory
5253
informerFactory informers.SharedInformerFactory
54+
55+
// snapshotInformerFactory - TODO: update this
56+
snapshotInformerFactory externalversions.SharedInformerFactory
57+
5358
// main signal
5459
stopCh (<-chan struct{})
5560

@@ -83,4 +88,7 @@ type InformerManager struct {
8388

8489
// volume attachment informer
8590
volumeAttachmentInformer cache.SharedInformer
91+
92+
// snapshot informer
93+
snapshotInformer cache.SharedInformer
8694
}

0 commit comments

Comments
 (0)