Skip to content

Commit 2229a78

Browse files
committed
DRA: Update allocator for Prioritized Alternatives in Device Requests
1 parent cc35f9b commit 2229a78

File tree

8 files changed

+1232
-184
lines changed

8 files changed

+1232
-184
lines changed

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"slices"
24+
"strings"
2425
"sync"
2526

2627
"github.com/google/go-cmp/cmp" //nolint:depguard
@@ -103,6 +104,7 @@ type informationForClaim struct {
103104
type DynamicResources struct {
104105
enabled bool
105106
enableAdminAccess bool
107+
enablePrioritizedList bool
106108
enableSchedulingQueueHint bool
107109

108110
fh framework.Handle
@@ -121,6 +123,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
121123
pl := &DynamicResources{
122124
enabled: true,
123125
enableAdminAccess: fts.EnableDRAAdminAccess,
126+
enablePrioritizedList: fts.EnableDRAPrioritizedList,
124127
enableSchedulingQueueHint: fts.EnableSchedulingQueueHint,
125128

126129
fh: fh,
@@ -405,20 +408,19 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
405408
// initial set of potential nodes before we ask the
406409
// driver(s) for information about the specific pod.
407410
for _, request := range claim.Spec.Devices.Requests {
408-
if request.DeviceClassName == "" {
409-
return nil, statusError(logger, fmt.Errorf("request %s: unsupported request type", request.Name))
410-
}
411-
412-
_, err := pl.draManager.DeviceClasses().Get(request.DeviceClassName)
413-
if err != nil {
414-
// If the class cannot be retrieved, allocation cannot proceed.
415-
if apierrors.IsNotFound(err) {
416-
// Here we mark the pod as "unschedulable", so it'll sleep in
417-
// the unscheduleable queue until a DeviceClass event occurs.
418-
return nil, statusUnschedulable(logger, fmt.Sprintf("request %s: device class %s does not exist", request.Name, request.DeviceClassName))
411+
// The requirements differ depending on whether the request has a list of
412+
// alternative subrequests defined in the firstAvailable field.
413+
if len(request.FirstAvailable) == 0 {
414+
if status := pl.validateDeviceClass(logger, request.DeviceClassName, request.Name); status != nil {
415+
return nil, status
416+
}
417+
} else {
418+
for _, subRequest := range request.FirstAvailable {
419+
qualRequestName := strings.Join([]string{request.Name, subRequest.Name}, "/")
420+
if status := pl.validateDeviceClass(logger, subRequest.DeviceClassName, qualRequestName); status != nil {
421+
return nil, status
422+
}
419423
}
420-
// Other error, retry with backoff.
421-
return nil, statusError(logger, fmt.Errorf("request %s: look up device class: %w", request.Name, err))
422424
}
423425
}
424426
}
@@ -447,7 +449,7 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
447449
if err != nil {
448450
return nil, statusError(logger, err)
449451
}
450-
allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
452+
allocator, err := structured.NewAllocator(ctx, pl.enableAdminAccess, pl.enablePrioritizedList, allocateClaims, allAllocatedDevices, pl.draManager.DeviceClasses(), slices, pl.celCache)
451453
if err != nil {
452454
return nil, statusError(logger, err)
453455
}
@@ -459,6 +461,23 @@ func (pl *DynamicResources) PreFilter(ctx context.Context, state *framework.Cycl
459461
return nil, nil
460462
}
461463

464+
func (pl *DynamicResources) validateDeviceClass(logger klog.Logger, deviceClassName, requestName string) *framework.Status {
465+
if deviceClassName == "" {
466+
return statusError(logger, fmt.Errorf("request %s: unsupported request type", requestName))
467+
}
468+
469+
_, err := pl.draManager.DeviceClasses().Get(deviceClassName)
470+
if err != nil {
471+
// If the class cannot be retrieved, allocation cannot proceed.
472+
if apierrors.IsNotFound(err) {
473+
// Here we mark the pod as "unschedulable", so it'll sleep in
474+
// the unscheduleable queue until a DeviceClass event occurs.
475+
return statusUnschedulable(logger, fmt.Sprintf("request %s: device class %s does not exist", requestName, deviceClassName))
476+
}
477+
}
478+
return nil
479+
}
480+
462481
// PreFilterExtensions returns prefilter extensions, pod add and remove.
463482
func (pl *DynamicResources) PreFilterExtensions() framework.PreFilterExtensions {
464483
return nil

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,17 @@ var (
117117
Namespace(namespace).
118118
Request(className).
119119
Obj()
120+
claimWithPrioritzedList = st.MakeResourceClaim().
121+
Name(claimName).
122+
Namespace(namespace).
123+
RequestWithPrioritizedList(className).
124+
Obj()
120125
pendingClaim = st.FromResourceClaim(claim).
121126
OwnerReference(podName, podUID, podKind).
122127
Obj()
128+
pendingClaimWithPrioritizedList = st.FromResourceClaim(claimWithPrioritzedList).
129+
OwnerReference(podName, podUID, podKind).
130+
Obj()
123131
allocationResult = &resourceapi.AllocationResult{
124132
Devices: resourceapi.DeviceAllocationResult{
125133
Results: []resourceapi.DeviceRequestAllocationResult{{
@@ -133,13 +141,33 @@ var (
133141
return st.MakeNodeSelector().In("metadata.name", []string{nodeName}, st.NodeSelectorTypeMatchFields).Obj()
134142
}(),
135143
}
144+
allocationResultWithPrioritizedList = &resourceapi.AllocationResult{
145+
Devices: resourceapi.DeviceAllocationResult{
146+
Results: []resourceapi.DeviceRequestAllocationResult{{
147+
Driver: driver,
148+
Pool: nodeName,
149+
Device: "instance-1",
150+
Request: "req-1/subreq-1",
151+
}},
152+
},
153+
NodeSelector: func() *v1.NodeSelector {
154+
return st.MakeNodeSelector().In("metadata.name", []string{nodeName}, st.NodeSelectorTypeMatchFields).Obj()
155+
}(),
156+
}
136157
inUseClaim = st.FromResourceClaim(pendingClaim).
137158
Allocation(allocationResult).
138159
ReservedForPod(podName, types.UID(podUID)).
139160
Obj()
161+
inUseClaimWithPrioritizedList = st.FromResourceClaim(pendingClaimWithPrioritizedList).
162+
Allocation(allocationResultWithPrioritizedList).
163+
ReservedForPod(podName, types.UID(podUID)).
164+
Obj()
140165
allocatedClaim = st.FromResourceClaim(pendingClaim).
141166
Allocation(allocationResult).
142167
Obj()
168+
allocatedClaimWithPrioritizedList = st.FromResourceClaim(pendingClaimWithPrioritizedList).
169+
Allocation(allocationResultWithPrioritizedList).
170+
Obj()
143171

144172
allocatedClaimWithWrongTopology = st.FromResourceClaim(allocatedClaim).
145173
Allocation(&resourceapi.AllocationResult{NodeSelector: st.MakeNodeSelector().In("no-such-label", []string{"no-such-value"}, st.NodeSelectorTypeMatchExpressions).Obj()}).
@@ -201,6 +229,24 @@ func breakCELInClass(class *resourceapi.DeviceClass) *resourceapi.DeviceClass {
201229
return class
202230
}
203231

232+
func updateDeviceClassName(claim *resourceapi.ResourceClaim, deviceClassName string) *resourceapi.ResourceClaim {
233+
claim = claim.DeepCopy()
234+
for i := range claim.Spec.Devices.Requests {
235+
// If the firstAvailable list is empty we update the device class name
236+
// on the base request.
237+
if len(claim.Spec.Devices.Requests[i].FirstAvailable) == 0 {
238+
claim.Spec.Devices.Requests[i].DeviceClassName = deviceClassName
239+
} else {
240+
// If subrequests are specified, update the device class name on
241+
// all of them.
242+
for j := range claim.Spec.Devices.Requests[i].FirstAvailable {
243+
claim.Spec.Devices.Requests[i].FirstAvailable[j].DeviceClassName = deviceClassName
244+
}
245+
}
246+
}
247+
return claim
248+
}
249+
204250
// result defines the expected outcome of some operation. It covers
205251
// operation's status and the state of the world (= objects).
206252
type result struct {
@@ -295,6 +341,8 @@ func TestPlugin(t *testing.T) {
295341
// Feature gates. False is chosen so that the uncommon case
296342
// doesn't need to be set.
297343
disableDRA bool
344+
345+
enableDRAPrioritizedList bool
298346
}{
299347
"empty": {
300348
pod: st.MakePod().Name("foo").Namespace("default").Obj(),
@@ -795,6 +843,69 @@ func TestPlugin(t *testing.T) {
795843
},
796844
disableDRA: true,
797845
},
846+
"claim-with-request-with-unknown-device-class": {
847+
pod: podWithClaimName,
848+
claims: []*resourceapi.ResourceClaim{updateDeviceClassName(claim, "does-not-exist")},
849+
want: want{
850+
prefilter: result{
851+
status: framework.NewStatus(framework.Unschedulable, `request req-1: device class does-not-exist does not exist`),
852+
},
853+
postfilter: result{
854+
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
855+
},
856+
},
857+
},
858+
"claim-with-prioritized-list-feature-disabled": {
859+
enableDRAPrioritizedList: false,
860+
pod: podWithClaimName,
861+
claims: []*resourceapi.ResourceClaim{claimWithPrioritzedList},
862+
classes: []*resourceapi.DeviceClass{deviceClass},
863+
want: want{
864+
filter: perNodeResult{
865+
workerNode.Name: {
866+
status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `claim default/my-pod-my-resource, request req-1: has subrequests, but the feature is disabled`),
867+
},
868+
},
869+
},
870+
},
871+
"claim-with-prioritized-list-unknown-device-class": {
872+
enableDRAPrioritizedList: true,
873+
pod: podWithClaimName,
874+
claims: []*resourceapi.ResourceClaim{updateDeviceClassName(claimWithPrioritzedList, "does-not-exist")},
875+
want: want{
876+
prefilter: result{
877+
status: framework.NewStatus(framework.Unschedulable, `request req-1/subreq-1: device class does-not-exist does not exist`),
878+
},
879+
postfilter: result{
880+
status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
881+
},
882+
},
883+
},
884+
"claim-with-prioritized-list": {
885+
enableDRAPrioritizedList: true,
886+
pod: podWithClaimName,
887+
claims: []*resourceapi.ResourceClaim{pendingClaimWithPrioritizedList},
888+
classes: []*resourceapi.DeviceClass{deviceClass},
889+
objs: []apiruntime.Object{workerNodeSlice},
890+
want: want{
891+
reserve: result{
892+
inFlightClaim: allocatedClaimWithPrioritizedList,
893+
},
894+
prebind: result{
895+
assumedClaim: reserve(allocatedClaimWithPrioritizedList, podWithClaimName),
896+
changes: change{
897+
claim: func(claim *resourceapi.ResourceClaim) *resourceapi.ResourceClaim {
898+
if claim.Name == claimName {
899+
claim = claim.DeepCopy()
900+
claim.Finalizers = allocatedClaimWithPrioritizedList.Finalizers
901+
claim.Status = inUseClaimWithPrioritizedList.Status
902+
}
903+
return claim
904+
},
905+
},
906+
},
907+
},
908+
},
798909
}
799910

800911
for name, tc := range testcases {
@@ -809,6 +920,7 @@ func TestPlugin(t *testing.T) {
809920
features := feature.Features{
810921
EnableDRAAdminAccess: tc.enableDRAAdminAccess,
811922
EnableDynamicResourceAllocation: !tc.disableDRA,
923+
EnableDRAPrioritizedList: tc.enableDRAPrioritizedList,
812924
}
813925
testCtx := setup(t, nodes, tc.claims, tc.classes, tc.objs, features)
814926
initialObjects := testCtx.listAll(t)

pkg/scheduler/framework/plugins/feature/feature.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package feature
2020
// This struct allows us to break the dependency of the plugins on
2121
// the internal k8s features pkg.
2222
type Features struct {
23+
EnableDRAPrioritizedList bool
2324
EnableDRAAdminAccess bool
2425
EnableDynamicResourceAllocation bool
2526
EnableVolumeCapacityPriority bool

pkg/scheduler/framework/plugins/registry.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
// through the WithFrameworkOutOfTreeRegistry option.
4747
func NewInTreeRegistry() runtime.Registry {
4848
fts := plfeature.Features{
49+
EnableDRAPrioritizedList: feature.DefaultFeatureGate.Enabled(features.DRAPrioritizedList),
4950
EnableDRAAdminAccess: feature.DefaultFeatureGate.Enabled(features.DRAAdminAccess),
5051
EnableDynamicResourceAllocation: feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation),
5152
EnableVolumeCapacityPriority: feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),

pkg/scheduler/testing/wrappers.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,28 @@ func (wrapper *ResourceClaimWrapper) Request(deviceClassName string) *ResourceCl
11041104
return wrapper
11051105
}
11061106

1107+
// RequestWithPrioritizedList adds one device request with one subrequest
1108+
// per provided deviceClassName.
1109+
func (wrapper *ResourceClaimWrapper) RequestWithPrioritizedList(deviceClassNames ...string) *ResourceClaimWrapper {
1110+
var prioritizedList []resourceapi.DeviceSubRequest
1111+
for i, deviceClassName := range deviceClassNames {
1112+
prioritizedList = append(prioritizedList, resourceapi.DeviceSubRequest{
1113+
Name: fmt.Sprintf("subreq-%d", i+1),
1114+
AllocationMode: resourceapi.DeviceAllocationModeExactCount,
1115+
Count: 1,
1116+
DeviceClassName: deviceClassName,
1117+
})
1118+
}
1119+
1120+
wrapper.Spec.Devices.Requests = append(wrapper.Spec.Devices.Requests,
1121+
resourceapi.DeviceRequest{
1122+
Name: fmt.Sprintf("req-%d", len(wrapper.Spec.Devices.Requests)+1),
1123+
FirstAvailable: prioritizedList,
1124+
},
1125+
)
1126+
return wrapper
1127+
}
1128+
11071129
// Allocation sets the allocation of the inner object.
11081130
func (wrapper *ResourceClaimWrapper) Allocation(allocation *resourceapi.AllocationResult) *ResourceClaimWrapper {
11091131
if !slices.Contains(wrapper.ResourceClaim.Finalizers, resourceapi.Finalizer) {

0 commit comments

Comments
 (0)