
Commit f6ddee9

bells17 authored and cupnes committed
Add storage capacity scoring to VolumeBinding plugin
1 parent 8b08487 commit f6ddee9

File tree

10 files changed: +512, -104 lines


pkg/features/kube_features.go

Lines changed: 7 additions & 0 deletions
```diff
@@ -677,6 +677,13 @@ const (
 	// Enables trafficDistribution field on Services.
 	ServiceTrafficDistribution featuregate.Feature = "ServiceTrafficDistribution"
 
+	// owner: @cupnes
+	// kep: https://kep.k8s.io/4049
+	//
+	// Enables scoring nodes by available storage capacity with
+	// StorageCapacityScoring feature gate (dynamic provisioning only).
+	StorageCapacityScoring featuregate.Feature = "StorageCapacityScoring"
+
 	// owner: @gjkim42 @SergeyKanzhelev @matthyx @tzneal
 	// kep: http://kep.k8s.io/753
 	//
```
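This hunk only registers the gate; consumers ask the process-wide DefaultFeatureGate at runtime, and operators opt in with --feature-gates=StorageCapacityScoring=true on the kube-scheduler. A minimal sketch of that check, mirroring the pattern this commit uses in registry.go and defaults.go below:

```go
package main

import (
	"fmt"

	utilfeature "k8s.io/apiserver/pkg/util/feature"

	"k8s.io/kubernetes/pkg/features"
)

func main() {
	// Alpha and off by default in 1.33: this is true only when the
	// scheduler runs with --feature-gates=StorageCapacityScoring=true.
	if utilfeature.DefaultFeatureGate.Enabled(features.StorageCapacityScoring) {
		fmt.Println("scoring nodes by available storage capacity")
	}
}
```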

pkg/features/versioned_kube_features.go

Lines changed: 4 additions & 0 deletions
```diff
@@ -753,6 +753,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
 		{Version: version.MustParse("1.31"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.31, remove in 1.33
 	},
 
+	StorageCapacityScoring: {
+		{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha},
+	},
+
 	StorageVersionMigrator: {
 		{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
 	},
```
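The versioned spec marks the gate Alpha and off by default from 1.33, so tests flip it per test case. A sketch of the usual pattern with the component-base testing helper (the commit's own test files are among the ten changed files but are not shown in this excerpt):

```go
package volumebinding_test

import (
	"testing"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"

	"k8s.io/kubernetes/pkg/features"
)

func TestScoringWithGateEnabled(t *testing.T) {
	// Enable the alpha gate for this test only; the helper restores the
	// previous value during test cleanup.
	featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageCapacityScoring, true)

	// ... exercise VolumeBinding behavior with the gate on ...
}
```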

pkg/scheduler/apis/config/v1/defaults.go

Lines changed: 22 additions & 9 deletions
```diff
@@ -193,15 +193,28 @@ func SetDefaults_VolumeBindingArgs(obj *configv1.VolumeBindingArgs) {
 		obj.BindTimeoutSeconds = ptr.To[int64](600)
 	}
 	if len(obj.Shape) == 0 && feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority) {
-		obj.Shape = []configv1.UtilizationShapePoint{
-			{
-				Utilization: 0,
-				Score: 0,
-			},
-			{
-				Utilization: 100,
-				Score: int32(config.MaxCustomPriorityScore),
-			},
+		if feature.DefaultFeatureGate.Enabled(features.StorageCapacityScoring) {
+			obj.Shape = []configv1.UtilizationShapePoint{
+				{
+					Utilization: 0,
+					Score: int32(config.MaxCustomPriorityScore),
+				},
+				{
+					Utilization: 100,
+					Score: 0,
+				},
+			}
+		} else {
+			obj.Shape = []configv1.UtilizationShapePoint{
+				{
+					Utilization: 0,
+					Score: 0,
+				},
+				{
+					Utilization: 100,
+					Score: int32(config.MaxCustomPriorityScore),
+				},
+			}
 		}
 	}
 }
```
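The substantive change is the inversion of the default shape. With StorageCapacityScoring enabled, utilization 0 maps to the maximum score and utilization 100 to zero, so nodes with the most free storage score highest; the legacy VolumeCapacityPriority default is the reverse, favoring fuller nodes (bin packing). The scheduler evaluates a Shape as a piecewise-linear function between its points. A self-contained illustration of that mapping, assuming MaxCustomPriorityScore is 10 as in the scheduler config package (the helper below is illustrative, not the scheduler's own code):

```go
package main

import "fmt"

// point is one Shape entry: a utilization percentage and the score it maps to.
type point struct{ utilization, score int64 }

// interpolate evaluates the piecewise-linear function defined by points,
// which must be sorted by utilization, at a utilization in [0, 100].
func interpolate(points []point, utilization int64) int64 {
	if utilization <= points[0].utilization {
		return points[0].score
	}
	for i := 1; i < len(points); i++ {
		p0, p1 := points[i-1], points[i]
		if utilization <= p1.utilization {
			// Linear interpolation between the two surrounding points.
			return p0.score + (p1.score-p0.score)*(utilization-p0.utilization)/(p1.utilization-p0.utilization)
		}
	}
	return points[len(points)-1].score
}

func main() {
	// Default shape with StorageCapacityScoring enabled: emptier is better.
	scoring := []point{{0, 10}, {100, 0}}
	// Legacy VolumeCapacityPriority default: fuller is better (bin packing).
	legacy := []point{{0, 0}, {100, 10}}

	fmt.Println(interpolate(scoring, 30)) // 7: a 30%-utilized node scores high
	fmt.Println(interpolate(legacy, 30))  // 3: the same node scores low
}
```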

pkg/scheduler/framework/plugins/feature/feature.go

Lines changed: 1 addition & 0 deletions
```diff
@@ -33,4 +33,5 @@ type Features struct {
 	EnableSchedulingQueueHint bool
 	EnableAsyncPreemption bool
 	EnablePodLevelResources bool
+	EnableStorageCapacityScoring bool
 }
```

pkg/scheduler/framework/plugins/registry.go

Lines changed: 1 addition & 0 deletions
```diff
@@ -59,6 +59,7 @@ func NewInTreeRegistry() runtime.Registry {
 		EnableSchedulingQueueHint: feature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints),
 		EnableAsyncPreemption: feature.DefaultFeatureGate.Enabled(features.SchedulerAsyncPreemption),
 		EnablePodLevelResources: feature.DefaultFeatureGate.Enabled(features.PodLevelResources),
+		EnableStorageCapacityScoring: feature.DefaultFeatureGate.Enabled(features.StorageCapacityScoring),
 	}
 
 	registry := runtime.Registry{
```
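Together with the new Features field above, this snapshots the gate once when the in-tree registry is constructed, so the plugin reads a plain boolean instead of querying the global gate on every scheduling cycle. An illustrative sketch of that plumbing pattern (the types below are stand-ins; the real wiring into the VolumeBinding plugin constructor is in files not shown in this excerpt):

```go
package main

import "fmt"

// Features mirrors the plugin-facing struct extended in feature.go.
type Features struct {
	EnableStorageCapacityScoring bool
}

// volumeBindingPlugin stands in for the real plugin type; it keeps the
// snapshotted boolean rather than consulting the global gate per call.
type volumeBindingPlugin struct {
	enableStorageCapacityScoring bool
}

func newVolumeBindingPlugin(fts Features) *volumeBindingPlugin {
	return &volumeBindingPlugin{enableStorageCapacityScoring: fts.EnableStorageCapacityScoring}
}

func main() {
	p := newVolumeBindingPlugin(Features{EnableStorageCapacityScoring: true})
	fmt.Println(p.enableStorageCapacityScoring) // true
}
```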

pkg/scheduler/framework/plugins/volumebinding/binder.go

Lines changed: 43 additions & 23 deletions
```diff
@@ -102,13 +102,19 @@ func (b *BindingInfo) StorageResource() *StorageResource {
 	}
 }
 
+// DynamicProvision represents a dynamically provisioned volume.
+type DynamicProvision struct {
+	PVC *v1.PersistentVolumeClaim
+	NodeCapacity *storagev1.CSIStorageCapacity
+}
+
 // PodVolumes holds pod's volumes information used in volume scheduling.
 type PodVolumes struct {
 	// StaticBindings are binding decisions for PVCs which can be bound to
 	// pre-provisioned static PVs.
 	StaticBindings []*BindingInfo
 	// DynamicProvisions are PVCs that require dynamic provisioning
-	DynamicProvisions []*v1.PersistentVolumeClaim
+	DynamicProvisions []*DynamicProvision
 }
 
 // InTreeToCSITranslator contains methods required to check migratable status
@@ -310,7 +316,7 @@ func (b *volumeBinder) FindPodVolumes(logger klog.Logger, pod *v1.Pod, podVolume
 
 	var (
 		staticBindings []*BindingInfo
-		dynamicProvisions []*v1.PersistentVolumeClaim
+		dynamicProvisions []*DynamicProvision
 	)
 	defer func() {
 		// Although we do not distinguish nil from empty in this function, for
@@ -377,6 +383,16 @@ func (b *volumeBinder) FindPodVolumes(logger klog.Logger, pod *v1.Pod, podVolume
 	return
 }
 
+// ConvertDynamicProvisionsToPVCs converts a slice of *DynamicProvision to a
+// slice of PersistentVolumeClaim
+func convertDynamicProvisionsToPVCs(dynamicProvisions []*DynamicProvision) []*v1.PersistentVolumeClaim {
+	pvcs := make([]*v1.PersistentVolumeClaim, 0, len(dynamicProvisions))
+	for _, dynamicProvision := range dynamicProvisions {
+		pvcs = append(pvcs, dynamicProvision.PVC)
+	}
+	return pvcs
+}
+
 // AssumePodVolumes will take the matching PVs and PVCs to provision in pod's
 // volume information for the chosen node, and:
 // 1. Update the pvCache with the new prebound PV.
@@ -423,20 +439,21 @@ func (b *volumeBinder) AssumePodVolumes(logger klog.Logger, assumedPod *v1.Pod,
 	}
 
 	// Assume PVCs
-	newProvisionedPVCs := []*v1.PersistentVolumeClaim{}
-	for _, claim := range podVolumes.DynamicProvisions {
+	newProvisionedPVCs := []*DynamicProvision{}
+	for _, dynamicProvision := range podVolumes.DynamicProvisions {
 		// The claims from method args can be pointing to watcher cache. We must not
 		// modify these, therefore create a copy.
-		claimClone := claim.DeepCopy()
+		claimClone := dynamicProvision.PVC.DeepCopy()
 		metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, volume.AnnSelectedNode, nodeName)
 		err = b.pvcCache.Assume(claimClone)
 		if err != nil {
+			pvcs := convertDynamicProvisionsToPVCs(newProvisionedPVCs)
 			b.revertAssumedPVs(newBindings)
-			b.revertAssumedPVCs(newProvisionedPVCs)
+			b.revertAssumedPVCs(pvcs)
 			return
 		}
 
-		newProvisionedPVCs = append(newProvisionedPVCs, claimClone)
+		newProvisionedPVCs = append(newProvisionedPVCs, &DynamicProvision{PVC: claimClone})
 	}
 
 	podVolumes.StaticBindings = newBindings
@@ -446,8 +463,9 @@ func (b *volumeBinder) AssumePodVolumes(logger klog.Logger, assumedPod *v1.Pod,
 
 // RevertAssumedPodVolumes will revert assumed PV and PVC cache.
 func (b *volumeBinder) RevertAssumedPodVolumes(podVolumes *PodVolumes) {
+	pvcs := convertDynamicProvisionsToPVCs(podVolumes.DynamicProvisions)
 	b.revertAssumedPVs(podVolumes.StaticBindings)
-	b.revertAssumedPVCs(podVolumes.DynamicProvisions)
+	b.revertAssumedPVCs(pvcs)
 }
 
 // BindPodVolumes gets the cached bindings and PVCs to provision in pod's volumes information,
@@ -464,7 +482,7 @@ func (b *volumeBinder) BindPodVolumes(ctx context.Context, assumedPod *v1.Pod, p
 	}()
 
 	bindings := podVolumes.StaticBindings
-	claimsToProvision := podVolumes.DynamicProvisions
+	claimsToProvision := convertDynamicProvisionsToPVCs(podVolumes.DynamicProvisions)
 
 	// Start API operations
 	err = b.bindAPIUpdate(ctx, assumedPod, bindings, claimsToProvision)
@@ -886,8 +904,8 @@ func (b *volumeBinder) findMatchingVolumes(logger klog.Logger, pod *v1.Pod, clai
 // checkVolumeProvisions checks given unbound claims (the claims have gone through func
 // findMatchingVolumes, and do not have matching volumes for binding), and return true
 // if all of the claims are eligible for dynamic provision.
-func (b *volumeBinder) checkVolumeProvisions(logger klog.Logger, pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*v1.PersistentVolumeClaim, err error) {
-	dynamicProvisions = []*v1.PersistentVolumeClaim{}
+func (b *volumeBinder) checkVolumeProvisions(logger klog.Logger, pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied, sufficientStorage bool, dynamicProvisions []*DynamicProvision, err error) {
+	dynamicProvisions = []*DynamicProvision{}
 
 	// We return early with provisionedClaims == nil if a check
 	// fails or we encounter an error.
@@ -915,7 +933,7 @@ func (b *volumeBinder) checkVolumeProvisions(logger klog.Logger, pod *v1.Pod, cl
 		}
 
 		// Check storage capacity.
-		sufficient, err := b.hasEnoughCapacity(logger, provisioner, claim, class, node)
+		sufficient, capacity, err := b.hasEnoughCapacity(logger, provisioner, claim, class, node)
 		if err != nil {
 			return false, false, nil, err
 		}
@@ -924,8 +942,10 @@ func (b *volumeBinder) checkVolumeProvisions(logger klog.Logger, pod *v1.Pod, cl
 			return true, false, nil, nil
 		}
 
-		dynamicProvisions = append(dynamicProvisions, claim)
-
+		dynamicProvisions = append(dynamicProvisions, &DynamicProvision{
+			PVC: claim,
+			NodeCapacity: capacity,
+		})
 	}
 	logger.V(4).Info("Provisioning for claims of pod that has no matching volumes...", "claimCount", len(claimsToProvision), "pod", klog.KObj(pod), "node", klog.KObj(node))
 
@@ -945,12 +965,12 @@ func (b *volumeBinder) revertAssumedPVCs(claims []*v1.PersistentVolumeClaim) {
 }
 
 // hasEnoughCapacity checks whether the provisioner has enough capacity left for a new volume of the given size
-// that is available from the node.
-func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string, claim *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass, node *v1.Node) (bool, error) {
+// that is available from the node. This function returns the node capacity based on the PVC's storage class.
+func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string, claim *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass, node *v1.Node) (bool, *storagev1.CSIStorageCapacity, error) {
 	quantity, ok := claim.Spec.Resources.Requests[v1.ResourceStorage]
 	if !ok {
 		// No capacity to check for.
-		return true, nil
+		return true, nil, nil
 	}
 
 	// Only enabled for CSI drivers which opt into it.
@@ -960,19 +980,19 @@ func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string,
 			// Either the provisioner is not a CSI driver or the driver does not
 			// opt into storage capacity scheduling. Either way, skip
 			// capacity checking.
-			return true, nil
+			return true, nil, nil
 		}
-		return false, err
+		return false, nil, err
 	}
 	if driver.Spec.StorageCapacity == nil || !*driver.Spec.StorageCapacity {
-		return true, nil
+		return true, nil, nil
 	}
 
 	// Look for a matching CSIStorageCapacity object(s).
 	// TODO (for beta): benchmark this and potentially introduce some kind of lookup structure (https://github.com/kubernetes/enhancements/issues/1698#issuecomment-654356718).
 	capacities, err := b.csiStorageCapacityLister.List(labels.Everything())
 	if err != nil {
-		return false, err
+		return false, nil, err
 	}
 
 	sizeInBytes := quantity.Value()
@@ -981,15 +1001,15 @@ func (b *volumeBinder) hasEnoughCapacity(logger klog.Logger, provisioner string,
 			capacitySufficient(capacity, sizeInBytes) &&
 			b.nodeHasAccess(logger, node, capacity) {
 			// Enough capacity found.
-			return true, nil
+			return true, capacity, nil
 		}
 	}
 
 	// TODO (?): this doesn't give any information about which pools where considered and why
 	// they had to be rejected. Log that above? But that might be a lot of log output...
 	logger.V(4).Info("Node has no accessible CSIStorageCapacity with enough capacity for PVC",
 		"node", klog.KObj(node), "PVC", klog.KObj(claim), "size", sizeInBytes, "storageClass", klog.KObj(storageClass))
-	return false, nil
+	return false, nil, nil
}
 
 func capacitySufficient(capacity *storagev1.CSIStorageCapacity, sizeInBytes int64) bool {
```
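The thread running through these hunks: each claim that passes the capacity check now carries the CSIStorageCapacity object that satisfied it (DynamicProvision.NodeCapacity), giving the plugin's scoring step the raw material to rank nodes by how much free capacity remains rather than just pass/fail filtering. A hedged sketch of how a utilization figure could be derived from that data (hypothetical helper; the actual scoring logic lives in the plugin's scorer, among the changed files not shown here):

```go
package main

import "fmt"

// dynamicProvision mirrors the two DynamicProvision fields used here.
type dynamicProvision struct {
	requestedBytes int64 // from the PVC's storage request
	capacityBytes  int64 // from NodeCapacity.Capacity, when reported
}

// storageUtilization returns the percentage of the node's reported
// capacity that the pending claims would consume.
func storageUtilization(provisions []dynamicProvision) int64 {
	var requested, capacity int64
	for _, p := range provisions {
		requested += p.requestedBytes
		capacity += p.capacityBytes
	}
	if capacity == 0 {
		return 0 // no capacity information; nothing to score
	}
	return requested * 100 / capacity
}

func main() {
	// One 10Gi claim against a node reporting 100Gi free: 10% utilized,
	// which the StorageCapacityScoring default shape maps to a high score.
	fmt.Println(storageUtilization([]dynamicProvision{
		{requestedBytes: 10 << 30, capacityBytes: 100 << 30},
	}))
}
```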
