Skip to content

Commit 9708a1b

Browse files
authored
Merge pull request #75 from wackxu/pvclister
add pvcLister to snapshot controller
2 parents 1e1a432 + 85504e1 commit 9708a1b

File tree

5 files changed

+29
-7
lines changed

5 files changed

+29
-7
lines changed

cmd/csi-snapshotter/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
snapshotscheme "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/scheme"
3838
informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions"
3939
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
40+
coreinformers "k8s.io/client-go/informers"
4041
)
4142

4243
const (
@@ -95,6 +96,7 @@ func main() {
9596
}
9697

9798
factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)
99+
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, *resyncPeriod)
98100

99101
// Create CRD resource
100102
aeclientset, err := apiextensionsclient.NewForConfig(config)
@@ -165,6 +167,7 @@ func main() {
165167
factory.Volumesnapshot().V1alpha1().VolumeSnapshots(),
166168
factory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(),
167169
factory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(),
170+
coreFactory.Core().V1().PersistentVolumeClaims(),
168171
*createSnapshotContentRetryCount,
169172
*createSnapshotContentInterval,
170173
csiConn,
@@ -177,6 +180,7 @@ func main() {
177180
// run...
178181
stopCh := make(chan struct{})
179182
factory.Start(stopCh)
183+
coreFactory.Start(stopCh)
180184
go ctrl.Run(threads, stopCh)
181185

182186
// ...until SIGINT

pkg/controller/framework_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@ import (
4646
"k8s.io/apimachinery/pkg/util/diff"
4747
"k8s.io/apimachinery/pkg/util/wait"
4848
"k8s.io/apimachinery/pkg/watch"
49+
coreinformers "k8s.io/client-go/informers"
4950
"k8s.io/client-go/kubernetes"
5051
kubefake "k8s.io/client-go/kubernetes/fake"
5152
"k8s.io/client-go/kubernetes/scheme"
53+
corelisters "k8s.io/client-go/listers/core/v1"
5254
core "k8s.io/client-go/testing"
5355
"k8s.io/client-go/tools/cache"
5456
"k8s.io/client-go/tools/record"
@@ -720,6 +722,8 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
720722
informerFactory = informers.NewSharedInformerFactory(clientset, NoResyncPeriodFunc())
721723
}
722724

725+
coreFactory := coreinformers.NewSharedInformerFactory(kubeClient, NoResyncPeriodFunc())
726+
723727
// Construct controller
724728
csiConnection := &fakeCSIConnection{
725729
t: t,
@@ -735,6 +739,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
735739
informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshots(),
736740
informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(),
737741
informerFactory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(),
742+
coreFactory.Core().V1().PersistentVolumeClaims(),
738743
3,
739744
5*time.Millisecond,
740745
csiConnection,
@@ -1052,9 +1057,14 @@ func runSyncTests(t *testing.T, tests []controllerTest, snapshotClasses []*crdv1
10521057
reactor.contents[content.Name] = content
10531058
}
10541059
}
1060+
1061+
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
10551062
for _, claim := range test.initialClaims {
10561063
reactor.claims[claim.Name] = claim
1064+
pvcIndexer.Add(claim)
10571065
}
1066+
ctrl.pvcLister = corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
1067+
10581068
for _, volume := range test.initialVolumes {
10591069
reactor.volumes[volume.Name] = volume
10601070
}

pkg/controller/snapshot_controller.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -470,12 +470,12 @@ func (ctrl *csiSnapshotController) isSnapshotContentBeingUsed(content *crdv1.Vol
470470

471471
// isVolumeBeingCreatedFromSnapshot checks if an volume is being created from the snapshot.
472472
func (ctrl *csiSnapshotController) isVolumeBeingCreatedFromSnapshot(snapshot *crdv1.VolumeSnapshot) bool {
473-
pvcList, err := ctrl.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).List(metav1.ListOptions{})
473+
pvcList, err := ctrl.pvcLister.PersistentVolumeClaims(snapshot.Namespace).List(labels.Everything())
474474
if err != nil {
475-
glog.Errorf("Failed to retrieve PVCs from the API server to check if volume snapshot %s is being used by a volume: %q", snapshotKey(snapshot), err)
475+
glog.Errorf("Failed to retrieve PVCs from the lister to check if volume snapshot %s is being used by a volume: %q", snapshotKey(snapshot), err)
476476
return false
477477
}
478-
for _, pvc := range pvcList.Items {
478+
for _, pvc := range pvcList {
479479
if pvc.Spec.DataSource != nil && len(pvc.Spec.DataSource.Name) > 0 && pvc.Spec.DataSource.Name == snapshot.Name {
480480
if pvc.Spec.DataSource.Kind == snapshotKind && *(pvc.Spec.DataSource.APIGroup) == snapshotAPIGroup {
481481
if pvc.Status.Phase == v1.ClaimPending {
@@ -947,9 +947,9 @@ func (ctrl *csiSnapshotController) getClaimFromVolumeSnapshot(snapshot *crdv1.Vo
947947
return nil, fmt.Errorf("the snapshot source does not have the right APIGroup. Expected empty string, Got %s", *(snapshot.Spec.Source.APIGroup))
948948
}
949949

950-
pvc, err := ctrl.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).Get(pvcName, metav1.GetOptions{})
950+
pvc, err := ctrl.pvcLister.PersistentVolumeClaims(snapshot.Namespace).Get(pvcName)
951951
if err != nil {
952-
return nil, fmt.Errorf("failed to retrieve PVC %s from the API server: %q", pvcName, err)
952+
return nil, fmt.Errorf("failed to retrieve PVC %s from the lister: %q", pvcName, err)
953953
}
954954

955955
return pvc, nil

pkg/controller/snapshot_controller_base.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ import (
3030
"k8s.io/apimachinery/pkg/api/errors"
3131
"k8s.io/apimachinery/pkg/labels"
3232
"k8s.io/apimachinery/pkg/util/wait"
33+
coreinformers "k8s.io/client-go/informers/core/v1"
3334
"k8s.io/client-go/kubernetes"
3435
"k8s.io/client-go/kubernetes/scheme"
3536
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
37+
corelisters "k8s.io/client-go/listers/core/v1"
3638
"k8s.io/client-go/tools/cache"
3739
"k8s.io/client-go/tools/record"
3840
"k8s.io/client-go/util/workqueue"
@@ -53,6 +55,8 @@ type csiSnapshotController struct {
5355
contentListerSynced cache.InformerSynced
5456
classLister storagelisters.VolumeSnapshotClassLister
5557
classListerSynced cache.InformerSynced
58+
pvcLister corelisters.PersistentVolumeClaimLister
59+
pvcListerSynced cache.InformerSynced
5660

5761
snapshotStore cache.Store
5862
contentStore cache.Store
@@ -74,6 +78,7 @@ func NewCSISnapshotController(
7478
volumeSnapshotInformer storageinformers.VolumeSnapshotInformer,
7579
volumeSnapshotContentInformer storageinformers.VolumeSnapshotContentInformer,
7680
volumeSnapshotClassInformer storageinformers.VolumeSnapshotClassInformer,
81+
pvcInformer coreinformers.PersistentVolumeClaimInformer,
7782
createSnapshotContentRetryCount int,
7883
createSnapshotContentInterval time.Duration,
7984
conn connection.CSIConnection,
@@ -104,6 +109,9 @@ func NewCSISnapshotController(
104109
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
105110
}
106111

112+
ctrl.pvcLister = pvcInformer.Lister()
113+
ctrl.pvcListerSynced = pvcInformer.Informer().HasSynced
114+
107115
volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
108116
cache.ResourceEventHandlerFuncs{
109117
AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) },
@@ -139,7 +147,7 @@ func (ctrl *csiSnapshotController) Run(workers int, stopCh <-chan struct{}) {
139147
glog.Infof("Starting CSI snapshotter")
140148
defer glog.Infof("Shutting CSI snapshotter")
141149

142-
if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced) {
150+
if !cache.WaitForCacheSync(stopCh, ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced) {
143151
glog.Errorf("Cannot sync caches")
144152
return
145153
}

pkg/controller/snapshot_create_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ func TestCreateSnapshotSync(t *testing.T) {
255255
initialContents: nocontents,
256256
expectedContents: nocontents,
257257
initialSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, nil, nil, nil),
258-
expectedSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, newVolumeError("Failed to create snapshot: failed to get input parameters to create snapshot snap7-4: \"failed to retrieve PVC claim7-4 from the API server: \\\"cannot find claim claim7-4\\\"\""), nil, nil),
258+
expectedSnapshots: newSnapshotArray("snap7-4", classGold, "", "snapuid7-4", "claim7-4", false, newVolumeError("Failed to create snapshot: failed to get input parameters to create snapshot snap7-4: \"failed to retrieve PVC claim7-4 from the lister: \\\"persistentvolumeclaim \\\\\\\"claim7-4\\\\\\\" not found\\\"\""), nil, nil),
259259
initialVolumes: newVolumeArray("volume7-4", "pv-uid7-4", "pv-handle7-4", "1Gi", "pvc-uid7-4", "claim7-4", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classEmpty),
260260
expectedEvents: []string{"Warning SnapshotCreationFailed"},
261261
errors: noerrors,

0 commit comments

Comments
 (0)