Skip to content

Commit 67cdc26

Browse files
authored
Merge pull request kubernetes#127497 from pohly/dra-scheduler-queueing-hints-fix
DRA scheduler: fix queuing hint support
2 parents 5e65529 + aee77bf commit 67cdc26

File tree

9 files changed

+403
-104
lines changed

9 files changed

+403
-104
lines changed

pkg/scheduler/framework/events.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
v1 "k8s.io/api/core/v1"
2121
"k8s.io/apimachinery/pkg/api/equality"
2222
utilfeature "k8s.io/apiserver/pkg/util/feature"
23+
"k8s.io/dynamic-resource-allocation/resourceclaim"
2324
"k8s.io/kubernetes/pkg/api/v1/resource"
2425
"k8s.io/kubernetes/pkg/features"
2526
)
@@ -65,6 +66,8 @@ var (
6566
PodTolerationChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodTolerations, Label: "PodTolerationChange"}
6667
// PodSchedulingGateEliminatedChange is the event when a pod's scheduling gate is changed.
6768
PodSchedulingGateEliminatedChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodSchedulingGatesEliminated, Label: "PodSchedulingGateChange"}
69+
// PodGeneratedResourceClaimChange is the event when a pod's list of generated ResourceClaims changes.
70+
PodGeneratedResourceClaimChange = ClusterEvent{Resource: Pod, ActionType: UpdatePodGeneratedResourceClaim, Label: "PodGeneratedResourceClaimChange"}
6871
// NodeSpecUnschedulableChange is the event when unschedulable node spec is changed.
6972
NodeSpecUnschedulableChange = ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint, Label: "NodeSpecUnschedulableChange"}
7073
// NodeAllocatableChange is the event when node allocatable is changed.
@@ -152,6 +155,9 @@ func PodSchedulingPropertiesChange(newPod *v1.Pod, oldPod *v1.Pod) (events []Clu
152155
extractPodSchedulingGateEliminatedChange,
153156
extractPodTolerationChange,
154157
}
158+
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
159+
podChangeExtracters = append(podChangeExtracters, extractPodGeneratedResourceClaimChange)
160+
}
155161

156162
for _, fn := range podChangeExtracters {
157163
if event := fn(newPod, oldPod); event != nil {
@@ -222,6 +228,14 @@ func extractPodSchedulingGateEliminatedChange(newPod *v1.Pod, oldPod *v1.Pod) *C
222228
return nil
223229
}
224230

231+
func extractPodGeneratedResourceClaimChange(newPod *v1.Pod, oldPod *v1.Pod) *ClusterEvent {
232+
if !resourceclaim.PodStatusEqual(newPod.Status.ResourceClaimStatuses, oldPod.Status.ResourceClaimStatuses) {
233+
return &PodGeneratedResourceClaimChange
234+
}
235+
236+
return nil
237+
}
238+
225239
// NodeSchedulingPropertiesChange interprets the update of a node and returns corresponding UpdateNodeXYZ event(s).
226240
func NodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) (events []ClusterEvent) {
227241
nodeChangeExtracters := []nodeChangeExtractor{

pkg/scheduler/framework/events_test.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ import (
2525
v1 "k8s.io/api/core/v1"
2626
"k8s.io/apimachinery/pkg/api/resource"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
utilfeature "k8s.io/apiserver/pkg/util/feature"
29+
featuregatetesting "k8s.io/component-base/featuregate/testing"
30+
"k8s.io/kubernetes/pkg/features"
2831
st "k8s.io/kubernetes/pkg/scheduler/testing"
32+
"k8s.io/utils/ptr"
2933
)
3034

3135
func TestNodeAllocatableChange(t *testing.T) {
@@ -335,11 +339,20 @@ func Test_podSchedulingPropertiesChange(t *testing.T) {
335339
},
336340
},
337341
}
342+
claimStatusA := v1.PodResourceClaimStatus{
343+
Name: "my-claim",
344+
ResourceClaimName: ptr.To("claim"),
345+
}
346+
claimStatusB := v1.PodResourceClaimStatus{
347+
Name: "my-claim-2",
348+
ResourceClaimName: ptr.To("claim-2"),
349+
}
338350
tests := []struct {
339-
name string
340-
newPod *v1.Pod
341-
oldPod *v1.Pod
342-
want []ClusterEvent
351+
name string
352+
newPod *v1.Pod
353+
oldPod *v1.Pod
354+
draDisabled bool
355+
want []ClusterEvent
343356
}{
344357
{
345358
name: "only label is updated",
@@ -389,9 +402,35 @@ func Test_podSchedulingPropertiesChange(t *testing.T) {
389402
oldPod: st.MakePod().Toleration("key").Obj(),
390403
want: []ClusterEvent{PodTolerationChange},
391404
},
405+
{
406+
name: "pod claim statuses change, feature disabled",
407+
draDisabled: true,
408+
newPod: st.MakePod().ResourceClaimStatuses(claimStatusA).Obj(),
409+
oldPod: st.MakePod().Obj(),
410+
want: []ClusterEvent{assignedPodOtherUpdate},
411+
},
412+
{
413+
name: "pod claim statuses change, feature enabled",
414+
newPod: st.MakePod().ResourceClaimStatuses(claimStatusA).Obj(),
415+
oldPod: st.MakePod().Obj(),
416+
want: []ClusterEvent{PodGeneratedResourceClaimChange},
417+
},
418+
{
419+
name: "pod claim statuses swapped",
420+
newPod: st.MakePod().ResourceClaimStatuses(claimStatusA, claimStatusB).Obj(),
421+
oldPod: st.MakePod().ResourceClaimStatuses(claimStatusB, claimStatusA).Obj(),
422+
want: []ClusterEvent{PodGeneratedResourceClaimChange},
423+
},
424+
{
425+
name: "pod claim statuses extended",
426+
newPod: st.MakePod().ResourceClaimStatuses(claimStatusA, claimStatusB).Obj(),
427+
oldPod: st.MakePod().ResourceClaimStatuses(claimStatusA).Obj(),
428+
want: []ClusterEvent{PodGeneratedResourceClaimChange},
429+
},
392430
}
393431
for _, tt := range tests {
394432
t.Run(tt.name, func(t *testing.T) {
433+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, !tt.draDisabled)
395434
got := PodSchedulingPropertiesChange(tt.newPod, tt.oldPod)
396435
if diff := cmp.Diff(tt.want, got); diff != "" {
397436
t.Errorf("unexpected event is returned from podSchedulingPropertiesChange (-want, +got):\n%s", diff)

pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,8 @@ func (pl *dynamicResources) EventsToRegister(_ context.Context) ([]framework.Clu
399399
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: nodeActionType}},
400400
// Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable.
401401
{Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange},
402+
// Adding the ResourceClaim name to the pod status makes pods waiting for their ResourceClaim schedulable.
403+
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.UpdatePodGeneratedResourceClaim}, QueueingHintFn: pl.isSchedulableAfterPodChange},
402404
// A pod might be waiting for a class to get created or modified.
403405
{Event: framework.ClusterEvent{Resource: framework.DeviceClass, ActionType: framework.Add | framework.Update}},
404406
// Adding or updating a ResourceSlice might make a pod schedulable because new resources became available.
@@ -450,7 +452,10 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
450452
// This is not an unexpected error: we know that
451453
// foreachPodResourceClaim only returns errors for "not
452454
// schedulable".
453-
logger.V(6).Info("pod is not schedulable after resource claim change", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error())
455+
if loggerV := logger.V(6); loggerV.Enabled() {
456+
owner := metav1.GetControllerOf(modifiedClaim)
457+
loggerV.Info("pod is not schedulable after resource claim change", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "claimOwner", owner, "reason", err.Error())
458+
}
454459
return framework.QueueSkip, nil
455460
}
456461

@@ -474,7 +479,7 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
474479
}
475480

476481
if originalClaim == nil {
477-
logger.V(4).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
482+
logger.V(5).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
478483
return framework.Queue, nil
479484
}
480485

@@ -492,7 +497,34 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
492497
return framework.QueueSkip, nil
493498
}
494499

495-
logger.V(4).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
500+
logger.V(5).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
501+
return framework.Queue, nil
502+
}
503+
504+
// isSchedulableAfterPodChange is invoked for update pod events reported by
505+
// an informer. It checks whether that change adds the ResourceClaim(s) that the
506+
// pod has been waiting for.
507+
func (pl *dynamicResources) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
508+
_, modifiedPod, err := schedutil.As[*v1.Pod](nil, newObj)
509+
if err != nil {
510+
// Shouldn't happen.
511+
return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClaimChange: %w", err)
512+
}
513+
514+
if pod.UID != modifiedPod.UID {
515+
logger.V(7).Info("pod is not schedulable after change in other pod", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
516+
return framework.QueueSkip, nil
517+
}
518+
519+
if err := pl.foreachPodResourceClaim(modifiedPod, nil); err != nil {
520+
// This is not an unexpected error: we know that
521+
// foreachPodResourceClaim only returns errors for "not
522+
// schedulable".
523+
logger.V(6).Info("pod is not schedulable after being updated", "pod", klog.KObj(pod))
524+
return framework.QueueSkip, nil
525+
}
526+
527+
logger.V(5).Info("pod got updated and is schedulable", "pod", klog.KObj(pod))
496528
return framework.Queue, nil
497529
}
498530

@@ -537,7 +569,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger
537569
// Deleted? That can happen because we ourselves delete the PodSchedulingContext while
538570
// working on the pod. This can be ignored.
539571
if oldObj != nil && newObj == nil {
540-
logger.V(4).Info("PodSchedulingContext got deleted")
572+
logger.V(5).Info("PodSchedulingContext got deleted")
541573
return framework.QueueSkip, nil
542574
}
543575

@@ -568,7 +600,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger
568600
// This is not an unexpected error: we know that
569601
// foreachPodResourceClaim only returns errors for "not
570602
// schedulable".
571-
logger.V(4).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error())
603+
logger.V(5).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error())
572604
return framework.QueueSkip, nil
573605
}
574606

@@ -589,7 +621,7 @@ func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger
589621
if oldPodScheduling == nil /* create */ ||
590622
len(oldPodScheduling.Status.ResourceClaims) < len(podScheduling.Status.ResourceClaims) /* new information and not incomplete (checked above) */ {
591623
// This definitely is new information for the scheduler. Try again immediately.
592-
logger.V(4).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod))
624+
logger.V(5).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod))
593625
return framework.Queue, nil
594626
}
595627

0 commit comments

Comments (0)