Skip to content

Commit 2965ff7

Browse files
authored
Merge pull request #1182 from leonardoce/informer-lister
Add PV informer and indexer
2 parents ab40e9d + 65bbcad commit 2965ff7

File tree

6 files changed

+123
-87
lines changed

6 files changed

+123
-87
lines changed

cmd/snapshot-controller/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ func main() {
218218
factory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(),
219219
factory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(),
220220
coreFactory.Core().V1().PersistentVolumeClaims(),
221+
coreFactory.Core().V1().PersistentVolumes(),
221222
nodeInformer,
222223
metricsManager,
223224
*resyncPeriod,

pkg/common-controller/framework_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,6 +1200,7 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
12001200
informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotContents(),
12011201
informerFactory.Groupsnapshot().V1alpha1().VolumeGroupSnapshotClasses(),
12021202
coreFactory.Core().V1().PersistentVolumeClaims(),
1203+
coreFactory.Core().V1().PersistentVolumes(),
12031204
nil,
12041205
metricsManager,
12051206
60*time.Second,

pkg/common-controller/groupsnapshot_controller_helper.go

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -529,14 +529,6 @@ func (ctrl *csiSnapshotCommonController) createSnapshotsForGroupSnapshotContent(
529529
groupSnapshotContent.Name, err)
530530
}
531531

532-
// TODO(leonardoce): this API server call is very expensive. We need to introduce a
533-
// PV lister on the controller and an indexer on spec.csi.driver + "^" + spec.csi.volumeHandle
534-
// to be used for fast lookups
535-
pvs, err := ctrl.client.CoreV1().PersistentVolumes().List(context.TODO(), metav1.ListOptions{})
536-
if err != nil {
537-
return groupSnapshotContent, fmt.Errorf("createSnapshotsForGroupSnapshotContent: error get PersistentVolumes list from API server: %v", err)
538-
}
539-
540532
// Phase 1: create the VolumeSnapshotContent and VolumeSnapshot objects
541533
klog.V(4).Infof(
542534
"createSnapshotsForGroupSnapshotContent[%s]: creating volumesnapshots and volumesnapshotcontent for group snapshot content",
@@ -550,12 +542,13 @@ func (ctrl *csiSnapshotCommonController) createSnapshotsForGroupSnapshotContent(
550542
snapshotHandle := snapshot.SnapshotHandle
551543
volumeHandle := snapshot.VolumeHandle
552544

553-
pv := utils.GetPersistentVolumeFromHandle(pvs, groupSnapshotContent.Spec.Driver, volumeHandle)
554-
if pv == nil {
545+
pv, err := ctrl.findPersistentVolumeByCSIDriverHandle(groupSnapshotContent.Spec.Driver, volumeHandle)
546+
if err != nil {
555547
klog.Errorf(
556-
"updateGroupSnapshotContentStatus: unable to find PV for volumeHandle:[%s] and CSI driver:[%s]",
548+
"updateGroupSnapshotContentStatus: error while finding PV for volumeHandle:[%s] and CSI driver:[%s]: %s",
557549
volumeHandle,
558-
groupSnapshotContent.Spec.Driver)
550+
groupSnapshotContent.Spec.Driver,
551+
err)
559552
}
560553

561554
volumeSnapshotContentName := getSnapshotContentNameForVolumeGroupSnapshotContent(
@@ -615,7 +608,7 @@ func (ctrl *csiSnapshotCommonController) createSnapshotsForGroupSnapshotContent(
615608
// The status will be set by VolumeSnapshot reconciler
616609
}
617610

618-
_, err := ctrl.clientset.SnapshotV1().VolumeSnapshotContents().Create(ctx, volumeSnapshotContent, metav1.CreateOptions{})
611+
_, err = ctrl.clientset.SnapshotV1().VolumeSnapshotContents().Create(ctx, volumeSnapshotContent, metav1.CreateOptions{})
619612
if err != nil && !apierrs.IsAlreadyExists(err) {
620613
return groupSnapshotContent, fmt.Errorf(
621614
"createSnapshotsForGroupSnapshotContent: creating volumesnapshotcontent %w", err)
@@ -693,6 +686,39 @@ func (ctrl *csiSnapshotCommonController) createSnapshotsForGroupSnapshotContent(
693686
return newGroupSnapshotObj, nil
694687
}
695688

689+
// findPersistentVolumeByCSIDriverHandle looks at an existing PersistentVolume
690+
// by CSI driver name and volume handle.
691+
func (ctrl *csiSnapshotCommonController) findPersistentVolumeByCSIDriverHandle(driverName, volumeHandle string) (*v1.PersistentVolume, error) {
692+
pvList, err := ctrl.pvIndexer.ByIndex(
693+
utils.CSIDriverHandleIndexName,
694+
utils.PersistentVolumeKeyFuncByCSIDriverHandle(driverName, volumeHandle),
695+
)
696+
switch {
697+
case err != nil:
698+
return nil, err
699+
700+
case len(pvList) == 0:
701+
return nil, nil
702+
703+
case len(pvList) > 1:
704+
klog.Errorf(
705+
"findPersistentVolumeByCSIDriverHandle: multiple PVs found for for volumeHandle:[%s] and CSI driver:[%s]",
706+
volumeHandle,
707+
driverName)
708+
return nil, fmt.Errorf("multiple PVs found")
709+
710+
default:
711+
if pvObject, ok := pvList[0].(*v1.PersistentVolume); ok {
712+
return pvObject, nil
713+
}
714+
715+
klog.Errorf(
716+
"findPersistentVolumeByCSIDriverHandle: found erroneous content in the index")
717+
klog.V(5).Info("findPersistentVolumeByCSIDriverHandle: erroneous content", pvList[0])
718+
return nil, fmt.Errorf("found erroneous indexed content")
719+
}
720+
}
721+
696722
// getSnapshotNameForVolumeGroupSnapshotContent returns a unique snapshot name for a VolumeGroupSnapshotContent.
697723
func getSnapshotNameForVolumeGroupSnapshotContent(groupSnapshotContentUUID, volumeHandle string) string {
698724
return fmt.Sprintf("snapshot-%x", sha256.Sum256([]byte(groupSnapshotContentUUID+volumeHandle)))

pkg/common-controller/snapshot_controller_base.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ type csiSnapshotCommonController struct {
6262
classListerSynced cache.InformerSynced
6363
pvcLister corelisters.PersistentVolumeClaimLister
6464
pvcListerSynced cache.InformerSynced
65+
pvLister corelisters.PersistentVolumeLister
66+
pvListerSynced cache.InformerSynced
6567
nodeLister corelisters.NodeLister
6668
nodeListerSynced cache.InformerSynced
6769
groupSnapshotLister groupsnapshotlisters.VolumeGroupSnapshotLister
@@ -83,6 +85,8 @@ type csiSnapshotCommonController struct {
8385
enableDistributedSnapshotting bool
8486
preventVolumeModeConversion bool
8587
enableVolumeGroupSnapshots bool
88+
89+
pvIndexer cache.Indexer
8690
}
8791

8892
// NewCSISnapshotController returns a new *csiSnapshotCommonController
@@ -96,6 +100,7 @@ func NewCSISnapshotCommonController(
96100
volumeGroupSnapshotContentInformer groupsnapshotinformers.VolumeGroupSnapshotContentInformer,
97101
volumeGroupSnapshotClassInformer groupsnapshotinformers.VolumeGroupSnapshotClassInformer,
98102
pvcInformer coreinformers.PersistentVolumeClaimInformer,
103+
pvInformer coreinformers.PersistentVolumeInformer,
99104
nodeInformer coreinformers.NodeInformer,
100105
metricsManager metrics.MetricsManager,
101106
resyncPeriod time.Duration,
@@ -128,6 +133,22 @@ func NewCSISnapshotCommonController(
128133
ctrl.pvcLister = pvcInformer.Lister()
129134
ctrl.pvcListerSynced = pvcInformer.Informer().HasSynced
130135

136+
ctrl.pvLister = pvInformer.Lister()
137+
ctrl.pvListerSynced = pvInformer.Informer().HasSynced
138+
139+
pvInformer.Informer().AddIndexers(map[string]cache.IndexFunc{
140+
utils.CSIDriverHandleIndexName: func(obj interface{}) ([]string, error) {
141+
if pv, ok := obj.(*v1.PersistentVolume); ok {
142+
if key := utils.PersistentVolumeKeyFunc(pv); key != "" {
143+
return []string{key}, nil
144+
}
145+
}
146+
147+
return nil, nil
148+
},
149+
})
150+
ctrl.pvIndexer = pvInformer.Informer().GetIndexer()
151+
131152
volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod(
132153
cache.ResourceEventHandlerFuncs{
133154
AddFunc: func(obj interface{}) { ctrl.enqueueSnapshotWork(obj) },
@@ -212,7 +233,13 @@ func (ctrl *csiSnapshotCommonController) Run(workers int, stopCh <-chan struct{}
212233
klog.Infof("Starting snapshot controller")
213234
defer klog.Infof("Shutting snapshot controller")
214235

215-
informersSynced := []cache.InformerSynced{ctrl.snapshotListerSynced, ctrl.contentListerSynced, ctrl.classListerSynced, ctrl.pvcListerSynced}
236+
informersSynced := []cache.InformerSynced{
237+
ctrl.snapshotListerSynced,
238+
ctrl.contentListerSynced,
239+
ctrl.classListerSynced,
240+
ctrl.pvcListerSynced,
241+
ctrl.pvListerSynced,
242+
}
216243
if ctrl.enableDistributedSnapshotting {
217244
informersSynced = append(informersSynced, ctrl.nodeListerSynced)
218245
}

pkg/utils/pvs.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,26 @@ limitations under the License.
1616

1717
package utils
1818

19-
import v1 "k8s.io/api/core/v1"
20-
21-
// GetPersistentVolumeFromHandle looks for the PV having a certain CSI driver name
22-
// and corresponding to a volume with a given handle, in a PV List.
23-
// If the PV is not found, returns nil
24-
func GetPersistentVolumeFromHandle(pvList *v1.PersistentVolumeList, driverName, volumeHandle string) *v1.PersistentVolume {
25-
for i := range pvList.Items {
26-
if pvList.Items[i].Spec.CSI == nil {
27-
continue
28-
}
29-
30-
if pvList.Items[i].Spec.CSI.Driver == driverName && pvList.Items[i].Spec.CSI.VolumeHandle == volumeHandle {
31-
return &pvList.Items[i]
32-
}
19+
import (
20+
"fmt"
21+
22+
v1 "k8s.io/api/core/v1"
23+
)
24+
25+
const CSIDriverHandleIndexName = "ByVolumeHandle"
26+
27+
// PersistentVolumeKeyFunc maps a persistent volume to a string usable
28+
// as KeyFunc to recover it from the CSI driver name and the volume handle.
29+
// If the passed PV is not CSI-based, it will return the empty string
30+
func PersistentVolumeKeyFunc(pv *v1.PersistentVolume) string {
31+
if pv != nil && pv.Spec.CSI != nil {
32+
return fmt.Sprintf("%s^%s", pv.Spec.CSI.Driver, pv.Spec.CSI.VolumeHandle)
3333
}
34+
return ""
35+
}
3436

35-
return nil
37+
// PersistentVolumeKeyFuncByCSIDriverHandle returns the key to be used form
38+
// the individual data components
39+
func PersistentVolumeKeyFuncByCSIDriverHandle(driverName, volumeHandle string) string {
40+
return fmt.Sprintf("%s^%s", driverName, volumeHandle)
3641
}

pkg/utils/pvs_test.go

Lines changed: 34 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -23,84 +23,60 @@ import (
2323
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2424
)
2525

26-
func TestGetPersistentVolumeFromHandle(t *testing.T) {
26+
func TestPersistentVolumeKeyFunc(t *testing.T) {
2727
testDriverName := "hostpath.csi.k8s.io"
2828
testVolumeHandle := "df39ea9e-1296-11ef-adde-baf37ed30dae"
2929
testPvName := "pv-name"
3030

31-
pvListTest := v1.PersistentVolumeList{
32-
Items: []v1.PersistentVolume{
33-
{
34-
ObjectMeta: metav1.ObjectMeta{
35-
Name: testPvName,
36-
},
37-
Spec: v1.PersistentVolumeSpec{
38-
PersistentVolumeSource: v1.PersistentVolumeSource{
39-
CSI: &v1.CSIPersistentVolumeSource{
40-
Driver: testDriverName,
41-
VolumeHandle: testVolumeHandle,
42-
},
43-
},
31+
csiPV := v1.PersistentVolume{
32+
ObjectMeta: metav1.ObjectMeta{
33+
Name: testPvName,
34+
},
35+
Spec: v1.PersistentVolumeSpec{
36+
PersistentVolumeSource: v1.PersistentVolumeSource{
37+
CSI: &v1.CSIPersistentVolumeSource{
38+
Driver: testDriverName,
39+
VolumeHandle: testVolumeHandle,
4440
},
4541
},
46-
{
47-
ObjectMeta: metav1.ObjectMeta{
48-
Name: "pv-no-csi",
49-
},
50-
Spec: v1.PersistentVolumeSpec{
51-
PersistentVolumeSource: v1.PersistentVolumeSource{
52-
HostPath: &v1.HostPathVolumeSource{},
53-
},
54-
},
42+
},
43+
}
44+
hostPathPV := v1.PersistentVolume{
45+
ObjectMeta: metav1.ObjectMeta{
46+
Name: "pv-no-csi",
47+
},
48+
Spec: v1.PersistentVolumeSpec{
49+
PersistentVolumeSource: v1.PersistentVolumeSource{
50+
HostPath: &v1.HostPathVolumeSource{},
5551
},
5652
},
5753
}
5854

5955
tests := []struct {
60-
testName string
61-
driverName string
62-
volumeHandle string
63-
pvList v1.PersistentVolumeList
64-
pvName string
56+
testName string
57+
pv *v1.PersistentVolume
58+
expectedKey string
6559
}{
6660
{
67-
testName: "empty-pv-list",
68-
driverName: testDriverName,
69-
volumeHandle: testVolumeHandle,
70-
pvName: "",
71-
},
72-
{
73-
testName: "pv-in-list",
74-
driverName: testDriverName,
75-
volumeHandle: testVolumeHandle,
76-
pvList: pvListTest,
77-
pvName: testPvName,
61+
testName: "nil-pv",
62+
pv: nil,
63+
expectedKey: "",
7864
},
7965
{
80-
testName: "not-existing-volume-handle",
81-
driverName: testDriverName,
82-
volumeHandle: "not-existing-volume-handle",
83-
pvList: pvListTest,
84-
pvName: "",
66+
testName: "csi-pv",
67+
pv: &csiPV,
68+
expectedKey: "hostpath.csi.k8s.io^df39ea9e-1296-11ef-adde-baf37ed30dae",
8569
},
8670
{
87-
testName: "invalid-driver-name",
88-
driverName: "invalid-driver-name",
89-
volumeHandle: testVolumeHandle,
90-
pvList: pvListTest,
91-
pvName: "",
71+
testName: "hostpath-pv",
72+
pv: &hostPathPV,
73+
expectedKey: "",
9274
},
9375
}
9476
for _, tt := range tests {
95-
got := GetPersistentVolumeFromHandle(&tt.pvList, tt.driverName, tt.volumeHandle)
96-
if got == nil {
97-
if len(tt.pvName) != 0 {
98-
t.Errorf("%v: GetPersistentVolumeFromHandle = %v WANT %v", tt.testName, got, tt.pvName)
99-
}
100-
} else {
101-
if tt.pvName != got.Name {
102-
t.Errorf("%v: GetPersistentVolumeFromHandle = %v WANT %v", tt.testName, got.Name, tt.pvName)
103-
}
77+
got := PersistentVolumeKeyFunc(tt.pv)
78+
if got != tt.expectedKey {
79+
t.Errorf("%v: PersistentVolumeKeyFunc = %#v WANT %#v", tt.testName, got, tt.expectedKey)
10480
}
10581
}
10682
}

0 commit comments

Comments
 (0)