Skip to content

Commit ecba6cd

Browse files
mortentcici37
authored andcommitted
Allocator updates
1 parent ece1d76 commit ecba6cd

File tree

4 files changed

+816
-131
lines changed

4 files changed

+816
-131
lines changed

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,12 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
454454
if err != nil {
455455
return nil, statusError(logger, err)
456456
}
457-
allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, pl.enablePrioritizedList, pl.enableDeviceTaints, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
457+
features := structured.Features{
458+
AdminAccess: pl.enableAdminAccess,
459+
PrioritizedList: pl.enablePrioritizedList,
460+
DeviceTaints: pl.enableDeviceTaints,
461+
}
462+
allocator, err := structured.NewAllocator(ctx, features, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
458463
if err != nil {
459464
return nil, statusError(logger, err)
460465
}

staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go

Lines changed: 142 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
v1 "k8s.io/api/core/v1"
2828
resourceapi "k8s.io/api/resource/v1beta1"
2929
"k8s.io/apimachinery/pkg/util/sets"
30+
"k8s.io/dynamic-resource-allocation/api"
3031
draapi "k8s.io/dynamic-resource-allocation/api"
3132
"k8s.io/dynamic-resource-allocation/cel"
3233
"k8s.io/dynamic-resource-allocation/resourceclaim"
@@ -48,39 +49,39 @@ type deviceClassLister interface {
4849
// available and the current state of the cluster (claims, classes, resource
4950
// slices).
5051
type Allocator struct {
51-
adminAccessEnabled bool
52-
prioritizedListEnabled bool
53-
deviceTaintsEnabled bool
54-
claimsToAllocate []*resourceapi.ResourceClaim
55-
allocatedDevices sets.Set[DeviceID]
56-
classLister deviceClassLister
57-
slices []*resourceapi.ResourceSlice
58-
celCache *cel.Cache
52+
features Features
53+
claimsToAllocate []*resourceapi.ResourceClaim
54+
allocatedDevices sets.Set[DeviceID]
55+
classLister deviceClassLister
56+
slices []*resourceapi.ResourceSlice
57+
celCache *cel.Cache
58+
}
59+
60+
type Features struct {
61+
AdminAccess bool
62+
PrioritizedList bool
63+
PartitionableDevices bool
5964
}
6065

6166
// NewAllocator returns an allocator for a certain set of claims or an error if
6267
// some problem was detected which makes it impossible to allocate claims.
6368
//
6469
// The returned Allocator can be used multiple times and is thread-safe.
6570
func NewAllocator(ctx context.Context,
66-
adminAccessEnabled bool,
67-
prioritizedListEnabled bool,
68-
deviceTaintsEnabled bool,
71+
features Features,
6972
claimsToAllocate []*resourceapi.ResourceClaim,
7073
allocatedDevices sets.Set[DeviceID],
7174
classLister deviceClassLister,
7275
slices []*resourceapi.ResourceSlice,
7376
celCache *cel.Cache,
7477
) (*Allocator, error) {
7578
return &Allocator{
76-
adminAccessEnabled: adminAccessEnabled,
77-
prioritizedListEnabled: prioritizedListEnabled,
78-
deviceTaintsEnabled: deviceTaintsEnabled,
79-
claimsToAllocate: claimsToAllocate,
80-
allocatedDevices: allocatedDevices,
81-
classLister: classLister,
82-
slices: slices,
83-
celCache: celCache,
79+
features: features,
80+
claimsToAllocate: claimsToAllocate,
81+
allocatedDevices: allocatedDevices,
82+
classLister: classLister,
83+
slices: slices,
84+
celCache: celCache,
8485
}, nil
8586
}
8687

@@ -126,7 +127,7 @@ func (a *Allocator) Allocate(ctx context.Context, node *v1.Node) (finalResult []
126127
defer alloc.logger.V(5).Info("Done with allocation", "success", len(finalResult) == len(alloc.claimsToAllocate), "err", finalErr)
127128

128129
// First determine all eligible pools.
129-
pools, err := GatherPools(ctx, alloc.slices, node)
130+
pools, err := GatherPools(ctx, alloc.slices, node, a.features.PartitionableDevices)
130131
if err != nil {
131132
return nil, fmt.Errorf("gather pool information: %w", err)
132133
}
@@ -513,9 +514,9 @@ type requestData struct {
513514
}
514515

515516
type deviceWithID struct {
516-
id DeviceID
517-
basic *draapi.BasicDevice
518-
slice *draapi.ResourceSlice
517+
id DeviceID
518+
device *draapi.Device
519+
slice *draapi.ResourceSlice
519520
}
520521

521522
type internalAllocationResult struct {
@@ -526,6 +527,7 @@ type internalDeviceResult struct {
526527
request string // name of the request (if no subrequests) or the subrequest
527528
parentRequest string // name of the request which contains the subrequest, empty otherwise
528529
id DeviceID
530+
device *draapi.Device
529531
slice *draapi.ResourceSlice
530532
adminAccess *bool
531533
}
@@ -621,7 +623,7 @@ func (m *matchAttributeConstraint) add(requestName, subRequestName string, devic
621623
return true
622624
}
623625

624-
func (m *matchAttributeConstraint) remove(requestName, subRequestName string, device *draapi.BasicDevice, deviceID DeviceID) {
626+
func (m *matchAttributeConstraint) remove(requestName, subRequestName string, device *draapi.Device, deviceID DeviceID) {
625627
if m.requestNames.Len() > 0 && !m.matches(requestName, subRequestName) {
626628
// Device not affected by constraint.
627629
return
@@ -640,7 +642,7 @@ func (m *matchAttributeConstraint) matches(requestName, subRequestName string) b
640642
}
641643
}
642644

643-
func lookupAttribute(device *draapi.BasicDevice, deviceID DeviceID, attributeName draapi.FullyQualifiedName) *draapi.DeviceAttribute {
645+
func lookupAttribute(device *draapi.Device, deviceID DeviceID, attributeName draapi.FullyQualifiedName) *draapi.DeviceAttribute {
644646
// Fully-qualified match?
645647
if attr, ok := device.Attributes[draapi.QualifiedName(attributeName)]; ok {
646648
return &attr
@@ -807,9 +809,9 @@ func (alloc *allocator) allocateOne(r deviceIndices, allocateSubRequest bool) (b
807809

808810
// Finally treat as allocated and move on to the next device.
809811
device := deviceWithID{
810-
id: deviceID,
811-
basic: slice.Spec.Devices[deviceIndex].Basic,
812-
slice: slice,
812+
id: deviceID,
813+
device: &slice.Spec.Devices[deviceIndex],
814+
slice: slice,
813815
}
814816
allocated, deallocate, err := alloc.allocateDevice(r, device, false)
815817
if err != nil {
@@ -888,7 +890,7 @@ func (alloc *allocator) isSelectable(r requestIndices, requestData requestData,
888890

889891
}
890892

891-
func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDevice, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) {
893+
func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.Device, deviceID DeviceID, class *resourceapi.DeviceClass, selectors []resourceapi.DeviceSelector) (bool, error) {
892894
for i, selector := range selectors {
893895
expr := alloc.celCache.GetOrCompile(selector.CEL.Expression)
894896
if expr.Error != nil {
@@ -903,13 +905,15 @@ func (alloc *allocator) selectorsMatch(r requestIndices, device *draapi.BasicDev
903905
return false, fmt.Errorf("claim %s: selector #%d: CEL compile error: %w", klog.KObj(alloc.claimsToAllocate[r.claimIndex]), i, expr.Error)
904906
}
905907

906-
// If this conversion turns out to be expensive, the CEL package could be converted
907-
// to use unique strings.
908-
var d resourceapi.BasicDevice
909-
if err := draapi.Convert_api_BasicDevice_To_v1beta1_BasicDevice(device, &d, nil); err != nil {
910-
return false, fmt.Errorf("convert BasicDevice: %w", err)
908+
attributes := make(map[resourceapi.QualifiedName]resourceapi.DeviceAttribute)
909+
if err := draapi.Convert_api_Attributes_To_v1beta1_Attributes(device.Attributes, attributes); err != nil {
910+
return false, fmt.Errorf("convert attributes: %w", err)
911+
}
912+
capacity := make(map[resourceapi.QualifiedName]resourceapi.DeviceCapacity)
913+
if err := draapi.Convert_api_Capacity_To_v1beta1_Capacity(device.Capacity, capacity); err != nil {
914+
return false, fmt.Errorf("convert capacity: %w", err)
911915
}
912-
matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver.String(), Attributes: d.Attributes, Capacity: d.Capacity})
916+
matches, details, err := expr.DeviceMatches(alloc.ctx, cel.Device{Driver: deviceID.Driver.String(), Attributes: attributes, Capacity: capacity})
913917
if class != nil {
914918
alloc.logger.V(7).Info("CEL result", "device", deviceID, "class", klog.KObj(class), "selector", i, "expression", selector.CEL.Expression, "matches", matches, "actualCost", ptr.Deref(details.ActualCost(), 0), "err", err)
915919
} else {
@@ -949,6 +953,17 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
949953
return false, nil, nil
950954
}
951955

956+
// If a device consumes capacity from a capacity pool, verify that
957+
// there is sufficient capacity available.
958+
ok, err := alloc.checkAvailableCapacity(device)
959+
if err != nil {
960+
return false, nil, err
961+
}
962+
if !ok {
963+
alloc.logger.V(7).Info("Insufficient capacity", "device", device.id)
964+
return false, nil, nil
965+
}
966+
952967
var parentRequestName string
953968
var baseRequestName string
954969
var subRequestName string
@@ -968,7 +983,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
968983

969984
// It's available. Now check constraints.
970985
for i, constraint := range alloc.constraints[r.claimIndex] {
971-
added := constraint.add(baseRequestName, subRequestName, device.basic, device.id)
986+
added := constraint.add(baseRequestName, subRequestName, device.device, device.id)
972987
if !added {
973988
if must {
974989
// It does not make sense to declare a claim where a constraint prevents getting
@@ -978,7 +993,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
978993

979994
// Roll back for all previous constraints before we return.
980995
for e := 0; e < i; e++ {
981-
alloc.constraints[r.claimIndex][e].remove(baseRequestName, subRequestName, device.basic, device.id)
996+
alloc.constraints[r.claimIndex][e].remove(baseRequestName, subRequestName, device.device, device.id)
982997
}
983998
return false, nil, nil
984999
}
@@ -994,6 +1009,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
9941009
request: request.name(),
9951010
parentRequest: parentRequestName,
9961011
id: device.id,
1012+
device: device.device,
9971013
slice: device.slice,
9981014
}
9991015
if request.adminAccess() {
@@ -1004,7 +1020,7 @@ func (alloc *allocator) allocateDevice(r deviceIndices, device deviceWithID, mus
10041020

10051021
return true, func() {
10061022
for _, constraint := range alloc.constraints[r.claimIndex] {
1007-
constraint.remove(baseRequestName, subRequestName, device.basic, device.id)
1023+
constraint.remove(baseRequestName, subRequestName, device.device, device.id)
10081024
}
10091025
if !request.adminAccess() {
10101026
alloc.allocatingDevices[device.id] = false
@@ -1033,48 +1049,126 @@ func taintTolerated(taint resourceapi.DeviceTaint, request requestAccessor) bool
10331049
return false
10341050
}
10351051

1052+
func (alloc *allocator) checkAvailableCapacity(device deviceWithID) (bool, error) {
1053+
slice := device.slice
1054+
1055+
referencedCapacityPools := sets.New[api.UniqueString]()
1056+
for _, consumedCapacity := range device.device.ConsumesCapacity {
1057+
referencedCapacityPools.Insert(consumedCapacity.CapacityPool)
1058+
}
1059+
1060+
// Create a structure that captures the initial capacity for all pools
1061+
// referenced by the device.
1062+
availableCapacities := make(map[api.UniqueString]map[api.QualifiedName]api.DeviceCapacity)
1063+
for _, capacityPool := range slice.Spec.CapacityPools {
1064+
if !referencedCapacityPools.Has(capacityPool.Name) {
1065+
continue
1066+
}
1067+
poolCapacity := make(map[api.QualifiedName]api.DeviceCapacity)
1068+
for name, cap := range capacityPool.Capacity {
1069+
poolCapacity[name] = cap
1070+
}
1071+
availableCapacities[capacityPool.Name] = poolCapacity
1072+
}
1073+
1074+
// Update the data structure to reflect capacity already in use.
1075+
for _, device := range slice.Spec.Devices {
1076+
deviceID := DeviceID{
1077+
Driver: slice.Spec.Driver,
1078+
Pool: slice.Spec.Pool.Name,
1079+
Device: device.Name,
1080+
}
1081+
if !(alloc.allocatedDevices.Has(deviceID) || alloc.allocatingDevices[deviceID]) {
1082+
continue
1083+
}
1084+
for _, consumedCapacity := range device.ConsumesCapacity {
1085+
poolCapacity := availableCapacities[consumedCapacity.CapacityPool]
1086+
for name, cap := range consumedCapacity.Capacity {
1087+
existingCap, ok := poolCapacity[name]
1088+
if !ok {
1089+
// Just continue for now, but this probably should be an error.
1090+
continue
1091+
}
1092+
// This can potentially result in negative available capacity. That is fine,
1093+
// we just treat it as no capacity available.
1094+
existingCap.Value.Sub(cap.Value)
1095+
poolCapacity[name] = existingCap
1096+
}
1097+
}
1098+
}
1099+
1100+
// Check if all consumed capacities for the device can be satisfied.
1101+
for _, deviceConsumedCapacity := range device.device.ConsumesCapacity {
1102+
poolCapacity := availableCapacities[deviceConsumedCapacity.CapacityPool]
1103+
for name, cap := range deviceConsumedCapacity.Capacity {
1104+
availableCap, found := poolCapacity[name]
1105+
// If the device requests a capacity that doesn't exist in
1106+
// the pool, it can not be allocated.
1107+
if !found {
1108+
return false, nil
1109+
}
1110+
// If the device requests more capacity than is available, it
1111+
// can not be allocated.
1112+
if availableCap.Value.Cmp(cap.Value) < 0 {
1113+
return false, nil
1114+
}
1115+
}
1116+
}
1117+
1118+
return true, nil
1119+
}
1120+
10361121
// createNodeSelector constructs a node selector for the allocation, if needed,
10371122
// otherwise it returns nil.
10381123
func (alloc *allocator) createNodeSelector(result []internalDeviceResult) (*v1.NodeSelector, error) {
10391124
// Selector with one term. That term gets extended with additional
10401125
// requirements from the different devices.
1041-
nodeSelector := &v1.NodeSelector{
1126+
ns := &v1.NodeSelector{
10421127
NodeSelectorTerms: []v1.NodeSelectorTerm{{}},
10431128
}
10441129

10451130
for i := range result {
10461131
slice := result[i].slice
1047-
if slice.Spec.NodeName != draapi.NullUniqueString {
1132+
var nodeName draapi.UniqueString
1133+
var nodeSelector *v1.NodeSelector
1134+
if slice.Spec.PerDeviceNodeSelection {
1135+
nodeName = result[i].device.NodeName
1136+
nodeSelector = result[i].device.NodeSelector
1137+
} else {
1138+
nodeName = slice.Spec.NodeName
1139+
nodeSelector = slice.Spec.NodeSelector
1140+
}
1141+
if nodeName != draapi.NullUniqueString {
10481142
// At least one device is local to one node. This
10491143
// restricts the allocation to that node.
10501144
return &v1.NodeSelector{
10511145
NodeSelectorTerms: []v1.NodeSelectorTerm{{
10521146
MatchFields: []v1.NodeSelectorRequirement{{
10531147
Key: "metadata.name",
10541148
Operator: v1.NodeSelectorOpIn,
1055-
Values: []string{slice.Spec.NodeName.String()},
1149+
Values: []string{nodeName.String()},
10561150
}},
10571151
}},
10581152
}, nil
10591153
}
1060-
if slice.Spec.NodeSelector != nil {
1061-
switch len(slice.Spec.NodeSelector.NodeSelectorTerms) {
1154+
if nodeSelector != nil {
1155+
switch len(nodeSelector.NodeSelectorTerms) {
10621156
case 0:
10631157
// Nothing?
10641158
case 1:
10651159
// Add all terms if they are not present already.
1066-
addNewNodeSelectorRequirements(slice.Spec.NodeSelector.NodeSelectorTerms[0].MatchFields, &nodeSelector.NodeSelectorTerms[0].MatchFields)
1067-
addNewNodeSelectorRequirements(slice.Spec.NodeSelector.NodeSelectorTerms[0].MatchExpressions, &nodeSelector.NodeSelectorTerms[0].MatchExpressions)
1160+
addNewNodeSelectorRequirements(nodeSelector.NodeSelectorTerms[0].MatchFields, &ns.NodeSelectorTerms[0].MatchFields)
1161+
addNewNodeSelectorRequirements(nodeSelector.NodeSelectorTerms[0].MatchExpressions, &ns.NodeSelectorTerms[0].MatchExpressions)
10681162
default:
10691163
// This shouldn't occur, validation must prevent creation of such slices.
1070-
return nil, fmt.Errorf("unsupported ResourceSlice.NodeSelector with %d terms", len(slice.Spec.NodeSelector.NodeSelectorTerms))
1164+
return nil, fmt.Errorf("unsupported ResourceSlice.NodeSelector with %d terms", len(nodeSelector.NodeSelectorTerms))
10711165
}
10721166
}
10731167
}
10741168

1075-
if len(nodeSelector.NodeSelectorTerms[0].MatchFields) > 0 || len(nodeSelector.NodeSelectorTerms[0].MatchExpressions) > 0 {
1169+
if len(ns.NodeSelectorTerms[0].MatchFields) > 0 || len(ns.NodeSelectorTerms[0].MatchExpressions) > 0 {
10761170
// We have a valid node selector.
1077-
return nodeSelector, nil
1171+
return ns, nil
10781172
}
10791173

10801174
// Available everywhere.

0 commit comments

Comments
 (0)