
Commit 94fe1b3

Merge pull request kubernetes#121867 from lianghao208/preeption
feat: Support score extension function in preemption.
2 parents 56d7898 + 34e620d commit 94fe1b3

4 files changed, +211 -57 lines
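
For context, the OrderedScoreFuncs extension point introduced by this commit lets a preemption (PostFilter) plugin supply its own ordered tie-breaking score functions for candidate nodes; returning nil keeps the scheduler's default score functions. The snippet below is a minimal sketch of a hypothetical out-of-tree plugin — the MyPreemptionPlugin type and its fewest-containers heuristic are assumptions, not part of this commit — modeled on the fake plugin used in the tests further down:

package myplugin

import (
    "context"

    extenderv1 "k8s.io/kube-scheduler/extender/v1"
)

// MyPreemptionPlugin is a hypothetical out-of-tree preemption plugin (illustrative only).
type MyPreemptionPlugin struct{}

// OrderedScoreFuncs prefers the candidate node whose victims run the fewest containers.
// Returning nil instead would fall back to the scheduler's default tie-breaking behavior.
func (pl *MyPreemptionPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
    return []func(node string) int64{
        func(node string) int64 {
            var containers int64
            for _, victim := range nodesToVictims[node].Pods {
                containers += int64(len(victim.Spec.Containers))
            }
            // The fewer containers among the victims, the higher the score.
            return -containers
        },
    }
}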

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go

Lines changed: 5 additions & 0 deletions
@@ -264,6 +264,11 @@ func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNo
     return true, ""
 }
 
+// OrderedScoreFuncs returns a list of ordered score functions to select preferable node where victims will be preempted.
+func (pl *DefaultPreemption) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
+    return nil
+}
+
 // podTerminatingByPreemption returns the pod's terminating state if feature PodDisruptionConditions is not enabled.
 // Otherwise, it additionally checks if the termination state is caused by scheduler preemption.
 func podTerminatingByPreemption(p *v1.Pod, enablePodDisruptionConditions bool) bool {

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go

Lines changed: 1 addition & 1 deletion
@@ -1389,7 +1389,7 @@ func TestSelectBestCandidate(t *testing.T) {
             }
             offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
             candidates, _, _ := pe.DryRunPreemption(ctx, tt.pod, nodeInfos, nil, offset, numCandidates)
-            s := pe.SelectCandidate(logger, candidates)
+            s := pe.SelectCandidate(ctx, candidates)
             if s == nil || len(s.Name()) == 0 {
                 return
             }

pkg/scheduler/framework/preemption/preemption.go

Lines changed: 67 additions & 56 deletions
@@ -116,6 +116,10 @@ type Interface interface {
     // Note that both `state` and `nodeInfo` are deep copied.
     SelectVictimsOnNode(ctx context.Context, state *framework.CycleState,
         pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status)
+    // OrderedScoreFuncs returns a list of ordered score functions to select preferable node where victims will be preempted.
+    // The ordered score functions will be processed one by one iff we find more than one node with the highest score.
+    // Default score functions will be processed if nil returned here for backwards-compatibility.
+    OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64
 }
 
 type Evaluator struct {
@@ -190,7 +194,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeT
     }
 
     // 4) Find the best candidate.
-    bestCandidate := ev.SelectCandidate(logger, candidates)
+    bestCandidate := ev.SelectCandidate(ctx, candidates)
     if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
         return nil, framework.NewStatus(framework.Unschedulable, "no candidate node for preemption")
     }
@@ -309,7 +313,9 @@ func (ev *Evaluator) callExtenders(logger klog.Logger, pod *v1.Pod, candidates [
 
 // SelectCandidate chooses the best-fit candidate from given <candidates> and return it.
 // NOTE: This method is exported for easier testing in default preemption.
-func (ev *Evaluator) SelectCandidate(logger klog.Logger, candidates []Candidate) Candidate {
+func (ev *Evaluator) SelectCandidate(ctx context.Context, candidates []Candidate) Candidate {
+    logger := klog.FromContext(ctx)
+
     if len(candidates) == 0 {
         return nil
     }
@@ -318,7 +324,8 @@ func (ev *Evaluator) SelectCandidate(logger klog.Logger, candidates []Candidate)
     }
 
     victimsMap := ev.CandidatesToVictimsMap(candidates)
-    candidateNode := pickOneNodeForPreemption(logger, victimsMap)
+    scoreFuncs := ev.OrderedScoreFuncs(ctx, victimsMap)
+    candidateNode := pickOneNodeForPreemption(logger, victimsMap, scoreFuncs)
 
     // Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
     // preemption plugins that exercise different candidates on the same nominated node.
@@ -428,8 +435,10 @@ func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister)
     return nil, nil
 }
 
-// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
-// pods in each map entry are ordered by decreasing priority.
+// pickOneNodeForPreemption chooses one node among the given nodes.
+// It assumes pods in each map entry are ordered by decreasing priority.
+// If scoreFuncs is not empty, it picks a node based on the scores scoreFuncs returns.
+// If scoreFuncs is empty,
 // It picks a node based on the following criteria:
 // 1. A node with minimum number of PDB violations.
 // 2. A node with minimum highest priority victim is picked.
@@ -439,7 +448,7 @@ func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister)
 // 6. If there are still ties, the first such node is picked (sort of randomly).
 // The 'minNodes1' and 'minNodes2' are being reused here to save the memory
 // allocation and garbage collection time.
-func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*extenderv1.Victims) string {
+func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*extenderv1.Victims, scoreFuncs []func(node string) int64) string {
     if len(nodesToVictims) == 0 {
         return ""
     }
@@ -449,58 +458,60 @@ func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*ext
         allCandidates = append(allCandidates, node)
     }
 
-    minNumPDBViolatingScoreFunc := func(node string) int64 {
-        // The smaller the NumPDBViolations, the higher the score.
-        return -nodesToVictims[node].NumPDBViolations
-    }
-    minHighestPriorityScoreFunc := func(node string) int64 {
-        // highestPodPriority is the highest priority among the victims on this node.
-        highestPodPriority := corev1helpers.PodPriority(nodesToVictims[node].Pods[0])
-        // The smaller the highestPodPriority, the higher the score.
-        return -int64(highestPodPriority)
-    }
-    minSumPrioritiesScoreFunc := func(node string) int64 {
-        var sumPriorities int64
-        for _, pod := range nodesToVictims[node].Pods {
-            // We add MaxInt32+1 to all priorities to make all of them >= 0. This is
-            // needed so that a node with a few pods with negative priority is not
-            // picked over a node with a smaller number of pods with the same negative
-            // priority (and similar scenarios).
-            sumPriorities += int64(corev1helpers.PodPriority(pod)) + int64(math.MaxInt32+1)
+    if len(scoreFuncs) == 0 {
+        minNumPDBViolatingScoreFunc := func(node string) int64 {
+            // The smaller the NumPDBViolations, the higher the score.
+            return -nodesToVictims[node].NumPDBViolations
+        }
+        minHighestPriorityScoreFunc := func(node string) int64 {
+            // highestPodPriority is the highest priority among the victims on this node.
+            highestPodPriority := corev1helpers.PodPriority(nodesToVictims[node].Pods[0])
+            // The smaller the highestPodPriority, the higher the score.
+            return -int64(highestPodPriority)
+        }
+        minSumPrioritiesScoreFunc := func(node string) int64 {
+            var sumPriorities int64
+            for _, pod := range nodesToVictims[node].Pods {
+                // We add MaxInt32+1 to all priorities to make all of them >= 0. This is
+                // needed so that a node with a few pods with negative priority is not
+                // picked over a node with a smaller number of pods with the same negative
+                // priority (and similar scenarios).
+                sumPriorities += int64(corev1helpers.PodPriority(pod)) + int64(math.MaxInt32+1)
+            }
+            // The smaller the sumPriorities, the higher the score.
+            return -sumPriorities
         }
-        // The smaller the sumPriorities, the higher the score.
-        return -sumPriorities
-    }
-    minNumPodsScoreFunc := func(node string) int64 {
-        // The smaller the length of pods, the higher the score.
-        return -int64(len(nodesToVictims[node].Pods))
-    }
-    latestStartTimeScoreFunc := func(node string) int64 {
-        // Get earliest start time of all pods on the current node.
-        earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
-        if earliestStartTimeOnNode == nil {
-            logger.Error(errors.New("earliestStartTime is nil for node"), "Should not reach here", "node", node)
-            return int64(math.MinInt64)
+        minNumPodsScoreFunc := func(node string) int64 {
+            // The smaller the length of pods, the higher the score.
+            return -int64(len(nodesToVictims[node].Pods))
+        }
+        latestStartTimeScoreFunc := func(node string) int64 {
+            // Get the earliest start time of all pods on the current node.
+            earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
+            if earliestStartTimeOnNode == nil {
+                logger.Error(errors.New("earliestStartTime is nil for node"), "Should not reach here", "node", node)
+                return int64(math.MinInt64)
+            }
+            // The bigger the earliestStartTimeOnNode, the higher the score.
+            return earliestStartTimeOnNode.UnixNano()
+        }
+
+        // Each scoreFunc scores the nodes according to specific rules and keeps the name of the node
+        // with the highest score. If and only if the scoreFunc has more than one node with the highest
+        // score, we will execute the other scoreFunc in order of precedence.
+        scoreFuncs = []func(string) int64{
+            // A node with a minimum number of PDB is preferable.
+            minNumPDBViolatingScoreFunc,
+            // A node with a minimum highest priority victim is preferable.
+            minHighestPriorityScoreFunc,
+            // A node with the smallest sum of priorities is preferable.
+            minSumPrioritiesScoreFunc,
+            // A node with the minimum number of pods is preferable.
+            minNumPodsScoreFunc,
+            // A node with the latest start time of all highest priority victims is preferable.
+            latestStartTimeScoreFunc,
+            // If there are still ties, then the first Node in the list is selected.
         }
-        // The bigger the earliestStartTimeOnNode, the higher the score.
-        return earliestStartTimeOnNode.UnixNano()
-    }
-
-    // Each scoreFunc scores the nodes according to specific rules and keeps the name of the node
-    // with the highest score. If and only if the scoreFunc has more than one node with the highest
-    // score, we will execute the other scoreFunc in order of precedence.
-    scoreFuncs := []func(string) int64{
-        // A node with a minimum number of PDB is preferable.
-        minNumPDBViolatingScoreFunc,
-        // A node with a minimum highest priority victim is preferable.
-        minHighestPriorityScoreFunc,
-        // A node with the smallest sum of priorities is preferable.
-        minSumPrioritiesScoreFunc,
-        // A node with the minimum number of pods is preferable.
-        minNumPodsScoreFunc,
-        // A node with the latest start time of all highest priority victims is preferable.
-        latestStartTimeScoreFunc,
-        // If there are still ties, then the first Node in the list is selected.
     }
 
     for _, f := range scoreFuncs {
pkg/scheduler/framework/preemption/preemption_test.go

Lines changed: 138 additions & 0 deletions
@@ -82,6 +82,47 @@ func (pl *FakePostFilterPlugin) PodEligibleToPreemptOthers(pod *v1.Pod, nominate
     return true, ""
 }
 
+func (pl *FakePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
+    return nil
+}
+
+type FakePreemptionScorePostFilterPlugin struct{}
+
+func (pl *FakePreemptionScorePostFilterPlugin) SelectVictimsOnNode(
+    ctx context.Context, state *framework.CycleState, pod *v1.Pod,
+    nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) (victims []*v1.Pod, numViolatingVictim int, status *framework.Status) {
+    return append(victims, nodeInfo.Pods[0].Pod), 1, nil
+}
+
+func (pl *FakePreemptionScorePostFilterPlugin) GetOffsetAndNumCandidates(nodes int32) (int32, int32) {
+    return 0, nodes
+}
+
+func (pl *FakePreemptionScorePostFilterPlugin) CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims {
+    m := make(map[string]*extenderv1.Victims, len(candidates))
+    for _, c := range candidates {
+        m[c.Name()] = c.Victims()
+    }
+    return m
+}
+
+func (pl *FakePreemptionScorePostFilterPlugin) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
+    return true, ""
+}
+
+func (pl *FakePreemptionScorePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
+    return []func(string) int64{
+        func(node string) int64 {
+            var sumContainers int64
+            for _, pod := range nodesToVictims[node].Pods {
+                sumContainers += int64(len(pod.Spec.Containers) + len(pod.Spec.InitContainers))
+            }
+            // The smaller the sumContainers, the higher the score.
+            return -sumContainers
+        },
+    }
+}
+
 func TestNodesWherePreemptionMightHelp(t *testing.T) {
     // Prepare 4 nodes names.
     nodeNames := []string{"node1", "node2", "node3", "node4"}
@@ -337,3 +378,100 @@ func TestDryRunPreemption(t *testing.T) {
         })
     }
 }
+
+func TestSelectCandidate(t *testing.T) {
+    tests := []struct {
+        name      string
+        nodeNames []string
+        pod       *v1.Pod
+        testPods  []*v1.Pod
+        expected  string
+    }{
+        {
+            name:      "pod has different number of containers on each node",
+            nodeNames: []string{"node1", "node2", "node3"},
+            pod:       st.MakePod().Name("p").UID("p").Priority(highPriority).Req(veryLargeRes).Obj(),
+            testPods: []*v1.Pod{
+                st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Priority(midPriority).Containers([]v1.Container{
+                    st.MakeContainer().Name("container1").Obj(),
+                    st.MakeContainer().Name("container2").Obj(),
+                }).Obj(),
+                st.MakePod().Name("p2.1").UID("p2.1").Node("node2").Priority(midPriority).Containers([]v1.Container{
+                    st.MakeContainer().Name("container1").Obj(),
+                }).Obj(),
+                st.MakePod().Name("p3.1").UID("p3.1").Node("node3").Priority(midPriority).Containers([]v1.Container{
+                    st.MakeContainer().Name("container1").Obj(),
+                    st.MakeContainer().Name("container2").Obj(),
+                    st.MakeContainer().Name("container3").Obj(),
+                }).Obj(),
+            },
+            expected: "node2",
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            logger, _ := ktesting.NewTestContext(t)
+            nodes := make([]*v1.Node, len(tt.nodeNames))
+            for i, nodeName := range tt.nodeNames {
+                nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
+            }
+            registeredPlugins := append([]tf.RegisterPluginFunc{
+                tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
+                tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
+            )
+            var objs []runtime.Object
+            objs = append(objs, tt.pod)
+            for _, pod := range tt.testPods {
+                objs = append(objs, pod)
+            }
+            informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
+            snapshot := internalcache.NewSnapshot(tt.testPods, nodes)
+            _, ctx := ktesting.NewTestContext(t)
+            ctx, cancel := context.WithCancel(ctx)
+            defer cancel()
+            fwk, err := tf.NewFramework(
+                ctx,
+                registeredPlugins,
+                "",
+                frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
+                frameworkruntime.WithSnapshotSharedLister(snapshot),
+                frameworkruntime.WithLogger(logger),
+            )
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            state := framework.NewCycleState()
+            // Some tests rely on PreFilter plugin to compute its CycleState.
+            if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
+                t.Errorf("Unexpected PreFilter Status: %v", status)
+            }
+            nodeInfos, err := snapshot.NodeInfos().List()
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}
+
+            for _, pod := range tt.testPods {
+                state := framework.NewCycleState()
+                pe := Evaluator{
+                    PluginName: "FakePreemptionScorePostFilter",
+                    Handler:    fwk,
+                    Interface:  fakePreemptionScorePostFilterPlugin,
+                    State:      state,
+                }
+                candidates, _, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
+                s := pe.SelectCandidate(ctx, candidates)
+                if s == nil || len(s.Name()) == 0 {
+                    t.Errorf("expect any node in %v, but no candidate selected", tt.expected)
+                    return
+                }
+                if diff := cmp.Diff(tt.expected, s.Name()); diff != "" {
+                    t.Errorf("expect any node in %v, but got %v", tt.expected, s.Name())
+                }
+            }
+        })
+    }
+}
