Skip to content

Commit 352056f

Browse files
authored
Merge pull request kubernetes#127757 from torredil/scheduler-bugfix-5123
scheduler: Improve CSILimits plugin accuracy by using VolumeAttachments
2 parents 66e3401 + 56f2b19 commit 352056f

File tree

5 files changed

+136
-2
lines changed

5 files changed

+136
-2
lines changed

pkg/scheduler/framework/plugins/nodevolumelimits/csi.go

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
v1 "k8s.io/api/core/v1"
2424
storagev1 "k8s.io/api/storage/v1"
2525
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
"k8s.io/apimachinery/pkg/labels"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/apimachinery/pkg/util/rand"
2829
corelisters "k8s.io/client-go/listers/core/v1"
@@ -60,6 +61,7 @@ type CSILimits struct {
6061
pvLister corelisters.PersistentVolumeLister
6162
pvcLister corelisters.PersistentVolumeClaimLister
6263
scLister storagelisters.StorageClassLister
64+
vaLister storagelisters.VolumeAttachmentLister
6365

6466
randomVolumeIDPrefix string
6567

@@ -183,6 +185,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
183185
logger.V(5).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
184186
}
185187

188+
// Count CSI volumes from the new pod
186189
newVolumes := make(map[string]string)
187190
if err := pl.filterAttachableVolumes(logger, pod, csiNode, true /* new pod */, newVolumes); err != nil {
188191
if apierrors.IsNotFound(err) {
@@ -203,6 +206,7 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
203206
return nil
204207
}
205208

209+
// Count CSI volumes from existing pods
206210
attachedVolumes := make(map[string]string)
207211
for _, existingPod := range nodeInfo.Pods {
208212
if err := pl.filterAttachableVolumes(logger, existingPod.Pod, csiNode, false /* existing pod */, attachedVolumes); err != nil {
@@ -217,6 +221,19 @@ func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v
217221
attachedVolumeCount[driverName]++
218222
}
219223

224+
// Count CSI volumes from VolumeAttachments
225+
volumeAttachments, err := pl.getNodeVolumeAttachmentInfo(logger, node.Name)
226+
if err != nil {
227+
return framework.AsStatus(err)
228+
}
229+
230+
for volumeUniqueName, driverName := range volumeAttachments {
231+
// Avoid double-counting volumes already used by existing pods
232+
if _, exists := attachedVolumes[volumeUniqueName]; !exists {
233+
attachedVolumeCount[driverName]++
234+
}
235+
}
236+
220237
// Count the new volumes count per driver
221238
newVolumeCount := map[string]int{}
222239
for _, driverName := range newVolumes {
@@ -303,7 +320,7 @@ func (pl *CSILimits) filterAttachableVolumes(
303320
continue
304321
}
305322

306-
volumeUniqueName := fmt.Sprintf("%s/%s", driverName, volumeHandle)
323+
volumeUniqueName := getVolumeUniqueName(driverName, volumeHandle)
307324
result[volumeUniqueName] = driverName
308325
}
309326
return nil
@@ -344,7 +361,7 @@ func (pl *CSILimits) checkAttachableInlineVolume(logger klog.Logger, vol *v1.Vol
344361
if translatedPV.Spec.PersistentVolumeSource.CSI == nil {
345362
return nil
346363
}
347-
volumeUniqueName := fmt.Sprintf("%s/%s", driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
364+
volumeUniqueName := getVolumeUniqueName(driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
348365
result[volumeUniqueName] = driverName
349366
return nil
350367
}
@@ -453,13 +470,15 @@ func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts fe
453470
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
454471
csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister()
455472
scLister := informerFactory.Storage().V1().StorageClasses().Lister()
473+
vaLister := informerFactory.Storage().V1().VolumeAttachments().Lister()
456474
csiTranslator := csitrans.New()
457475

458476
return &CSILimits{
459477
csiNodeLister: csiNodesLister,
460478
pvLister: pvLister,
461479
pvcLister: pvcLister,
462480
scLister: scLister,
481+
vaLister: vaLister,
463482
randomVolumeIDPrefix: rand.String(32),
464483
translator: csiTranslator,
465484
}, nil
@@ -480,3 +499,40 @@ func getVolumeLimits(csiNode *storagev1.CSINode) map[string]int64 {
480499
}
481500
return nodeVolumeLimits
482501
}
502+
503+
// getNodeVolumeAttachmentInfo returns a map of volumeID to driver name for the given node.
504+
func (pl *CSILimits) getNodeVolumeAttachmentInfo(logger klog.Logger, nodeName string) (map[string]string, error) {
505+
volumeAttachments := make(map[string]string)
506+
vas, err := pl.vaLister.List(labels.Everything())
507+
if err != nil {
508+
return nil, err
509+
}
510+
for _, va := range vas {
511+
if va.Spec.NodeName == nodeName {
512+
if va.Spec.Attacher == "" {
513+
logger.V(5).Info("VolumeAttachment has no attacher", "VolumeAttachment", klog.KObj(va))
514+
continue
515+
}
516+
if va.Spec.Source.PersistentVolumeName == nil {
517+
logger.V(5).Info("VolumeAttachment has no PV name", "VolumeAttachment", klog.KObj(va))
518+
continue
519+
}
520+
pv, err := pl.pvLister.Get(*va.Spec.Source.PersistentVolumeName)
521+
if err != nil {
522+
logger.V(5).Info("Unable to get PV for VolumeAttachment", "VolumeAttachment", klog.KObj(va), "err", err)
523+
continue
524+
}
525+
if pv.Spec.CSI == nil {
526+
logger.V(5).Info("PV is not a CSI volume", "PV", klog.KObj(pv))
527+
continue
528+
}
529+
volumeID := getVolumeUniqueName(va.Spec.Attacher, pv.Spec.CSI.VolumeHandle)
530+
volumeAttachments[volumeID] = va.Spec.Attacher
531+
}
532+
}
533+
return volumeAttachments, nil
534+
}
535+
536+
func getVolumeUniqueName(driverName, volumeHandle string) string {
537+
return fmt.Sprintf("%s/%s", driverName, volumeHandle)
538+
}

pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ func TestCSILimits(t *testing.T) {
265265
extraClaims []v1.PersistentVolumeClaim
266266
filterName string
267267
maxVols int32
268+
vaCount int
268269
driverNames []string
269270
test string
270271
migrationEnabled bool
@@ -273,6 +274,27 @@ func TestCSILimits(t *testing.T) {
273274
wantStatus *framework.Status
274275
wantPreFilterStatus *framework.Status
275276
}{
277+
{
278+
newPod: csiEBSOneVolPod,
279+
existingPods: []*v1.Pod{},
280+
filterName: "csi",
281+
maxVols: 2,
282+
driverNames: []string{ebsCSIDriverName},
283+
vaCount: 2,
284+
test: "should count VolumeAttachments towards volume limit when no pods exist",
285+
limitSource: "csinode",
286+
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded),
287+
},
288+
{
289+
newPod: csiEBSOneVolPod,
290+
existingPods: []*v1.Pod{},
291+
filterName: "csi",
292+
maxVols: 2,
293+
driverNames: []string{ebsCSIDriverName},
294+
vaCount: 1,
295+
test: "should schedule pod when VolumeAttachments count does not exceed limit",
296+
limitSource: "csinode",
297+
},
276298
{
277299
newPod: csiEBSOneVolPod,
278300
existingPods: []*v1.Pod{runningPod, csiEBSTwoVolPod},
@@ -609,6 +631,7 @@ func TestCSILimits(t *testing.T) {
609631
pvLister: getFakeCSIPVLister(test.filterName, test.driverNames...),
610632
pvcLister: append(getFakeCSIPVCLister(test.filterName, scName, test.driverNames...), test.extraClaims...),
611633
scLister: getFakeCSIStorageClassLister(scName, test.driverNames[0]),
634+
vaLister: getFakeVolumeAttachmentLister(test.vaCount, test.driverNames...),
612635
randomVolumeIDPrefix: rand.String(32),
613636
translator: csiTranslator,
614637
}
@@ -769,6 +792,28 @@ func TestCSILimitsAddedPVCQHint(t *testing.T) {
769792
}
770793
}
771794

795+
func getFakeVolumeAttachmentLister(count int, driverNames ...string) tf.VolumeAttachmentLister {
796+
vaLister := tf.VolumeAttachmentLister{}
797+
for _, driver := range driverNames {
798+
for j := 0; j < count; j++ {
799+
pvName := fmt.Sprintf("csi-%s-%d", driver, j)
800+
va := storagev1.VolumeAttachment{
801+
ObjectMeta: metav1.ObjectMeta{
802+
Name: fmt.Sprintf("va-%s-%d", driver, j),
803+
},
804+
Spec: storagev1.VolumeAttachmentSpec{
805+
NodeName: "node-for-max-pd-test-1",
806+
Attacher: driver,
807+
Source: storagev1.VolumeAttachmentSource{
808+
PersistentVolumeName: &pvName,
809+
},
810+
},
811+
}
812+
vaLister = append(vaLister, va)
813+
}
814+
}
815+
return vaLister
816+
}
772817
func getFakeCSIPVLister(volumeName string, driverNames ...string) tf.PersistentVolumeLister {
773818
pvLister := tf.PersistentVolumeLister{}
774819
for _, driver := range driverNames {

pkg/scheduler/testing/framework/fake_listers.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,3 +313,27 @@ func (classes StorageClassLister) Get(name string) (*storagev1.StorageClass, err
313313
func (classes StorageClassLister) List(selector labels.Selector) ([]*storagev1.StorageClass, error) {
314314
return nil, fmt.Errorf("not implemented")
315315
}
316+
317+
// VolumeAttachmentLister declares a []storagev1.VolumeAttachment type for testing.
318+
type VolumeAttachmentLister []storagev1.VolumeAttachment
319+
320+
var _ storagelisters.VolumeAttachmentLister = VolumeAttachmentLister{}
321+
322+
// List lists all VolumeAttachments in the indexer.
323+
func (val VolumeAttachmentLister) List(selector labels.Selector) (ret []*storagev1.VolumeAttachment, err error) {
324+
var list []*storagev1.VolumeAttachment
325+
for i := range val {
326+
list = append(list, &val[i])
327+
}
328+
return list, nil
329+
}
330+
331+
// Get returns a fake VolumeAttachment object from the fake VolumeAttachments by name.
332+
func (val VolumeAttachmentLister) Get(name string) (*storagev1.VolumeAttachment, error) {
333+
for _, va := range val {
334+
if va.Name == name {
335+
return &va, nil
336+
}
337+
}
338+
return nil, errors.NewNotFound(storagev1.Resource("volumeattachments"), name)
339+
}

plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,7 @@ func ClusterRoles() []rbacv1.ClusterRole {
580580
rbacv1helpers.NewRule("create").Groups(authorizationGroup).Resources("subjectaccessreviews").RuleOrDie(),
581581
// Needed for volume limits
582582
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csinodes").RuleOrDie(),
583+
rbacv1helpers.NewRule("get", "list", "watch").Groups(storageGroup).Resources("volumeattachments").RuleOrDie(),
583584
// Needed for namespaceSelector feature in pod affinity
584585
rbacv1helpers.NewRule(Read...).Groups(legacyGroup).Resources("namespaces").RuleOrDie(),
585586
rbacv1helpers.NewRule(Read...).Groups(storageGroup).Resources("csidrivers").RuleOrDie(),

plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -851,6 +851,14 @@ items:
851851
- get
852852
- list
853853
- watch
854+
- apiGroups:
855+
- storage.k8s.io
856+
resources:
857+
- volumeattachments
858+
verbs:
859+
- get
860+
- list
861+
- watch
854862
- apiGroups:
855863
- ""
856864
resources:

0 commit comments

Comments
 (0)