Skip to content

Commit b6899c5

Browse files
authored
Merge pull request kubernetes#122251 from olderTaoist/unschedulable-plugin
register unschedulable plugin for those plugins that PreFilter's PreFilterResult filter out some nodes
2 parents 58c4400 + b478621 commit b6899c5

File tree

9 files changed

+80
-38
lines changed

9 files changed

+80
-38
lines changed

pkg/scheduler/framework/autoscaler_contract/framework_contract_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ import (
2626

2727
"github.com/stretchr/testify/assert"
2828
v1 "k8s.io/api/core/v1"
29+
"k8s.io/apimachinery/pkg/util/sets"
2930
"k8s.io/klog/v2/ktesting"
3031
"k8s.io/kubernetes/pkg/scheduler/framework"
3132
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
3233
)
3334

3435
type frameworkContract interface {
35-
RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status)
36+
RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status, sets.Set[string])
3637
RunFilterPlugins(context.Context, *framework.CycleState, *v1.Pod, *framework.NodeInfo) *framework.Status
3738
}
3839

pkg/scheduler/framework/interface.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,10 @@ type Framework interface {
589589
// cycle is aborted.
590590
// It also returns a PreFilterResult, which may influence what or how many nodes to
591591
// evaluate downstream.
592-
RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
592+
// The third returns value contains PreFilter plugin that rejected some or all Nodes with PreFilterResult.
593+
// But, note that it doesn't contain any plugin when a plugin rejects this Pod with non-success status,
594+
// not with PreFilterResult.
595+
RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status, sets.Set[string])
593596

594597
// RunPostFilterPlugins runs the set of configured PostFilter plugins.
595598
// PostFilter plugins can either be informational, in which case should be configured

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ func TestPostFilter(t *testing.T) {
383383

384384
state := framework.NewCycleState()
385385
// Ensure <state> is populated.
386-
if _, status := f.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
386+
if _, status, _ := f.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
387387
t.Errorf("Unexpected PreFilter Status: %v", status)
388388
}
389389

@@ -1141,7 +1141,7 @@ func TestDryRunPreemption(t *testing.T) {
11411141
for cycle, pod := range tt.testPods {
11421142
state := framework.NewCycleState()
11431143
// Some tests rely on PreFilter plugin to compute its CycleState.
1144-
if _, status := fwk.RunPreFilterPlugins(ctx, state, pod); !status.IsSuccess() {
1144+
if _, status, _ := fwk.RunPreFilterPlugins(ctx, state, pod); !status.IsSuccess() {
11451145
t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status)
11461146
}
11471147
pe := preemption.Evaluator{
@@ -1371,7 +1371,7 @@ func TestSelectBestCandidate(t *testing.T) {
13711371

13721372
state := framework.NewCycleState()
13731373
// Some tests rely on PreFilter plugin to compute its CycleState.
1374-
if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
1374+
if _, status, _ := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
13751375
t.Errorf("Unexpected PreFilter Status: %v", status)
13761376
}
13771377
nodeInfos, err := snapshot.NodeInfos().List()
@@ -1784,7 +1784,7 @@ func TestPreempt(t *testing.T) {
17841784

17851785
state := framework.NewCycleState()
17861786
// Some tests rely on PreFilter plugin to compute its CycleState.
1787-
if _, s := fwk.RunPreFilterPlugins(ctx, state, test.pod); !s.IsSuccess() {
1787+
if _, s, _ := fwk.RunPreFilterPlugins(ctx, state, test.pod); !s.IsSuccess() {
17881788
t.Errorf("Unexpected preFilterStatus: %v", s)
17891789
}
17901790
// Call preempt and check the expected results.

pkg/scheduler/framework/preemption/preemption_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ func TestSelectCandidate(t *testing.T) {
456456

457457
state := framework.NewCycleState()
458458
// Some tests rely on PreFilter plugin to compute its CycleState.
459-
if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
459+
if _, status, _ := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
460460
t.Errorf("Unexpected PreFilter Status: %v", status)
461461
}
462462
nodeInfos, err := snapshot.NodeInfos().List()

pkg/scheduler/framework/runtime/framework.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -695,15 +695,15 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
695695
// When it returns Skip status, returned PreFilterResult and other fields in status are just ignored,
696696
// and coupled Filter plugin/PreFilterExtensions() will be skipped in this scheduling cycle.
697697
// If a non-success status is returned, then the scheduling cycle is aborted.
698-
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
698+
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status, _ sets.Set[string]) {
699699
startTime := time.Now()
700700
skipPlugins := sets.New[string]()
701701
defer func() {
702702
state.SkipFilterPlugins = skipPlugins
703703
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
704704
}()
705705
var result *framework.PreFilterResult
706-
var pluginsWithNodes []string
706+
pluginsWithNodes := sets.New[string]()
707707
logger := klog.FromContext(ctx)
708708
verboseLogs := logger.V(4).Enabled()
709709
if verboseLogs {
@@ -726,7 +726,7 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
726726
if s.Code() == framework.UnschedulableAndUnresolvable {
727727
// In this case, the preemption shouldn't happen in this scheduling cycle.
728728
// So, no need to execute all PreFilter.
729-
return nil, s
729+
return nil, s, nil
730730
}
731731
if s.Code() == framework.Unschedulable {
732732
// In this case, the preemption should happen later in this scheduling cycle.
@@ -735,23 +735,23 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
735735
returnStatus = s
736736
continue
737737
}
738-
return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithPlugin(pl.Name())
738+
return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithPlugin(pl.Name()), nil
739739
}
740740
if !r.AllNodes() {
741-
pluginsWithNodes = append(pluginsWithNodes, pl.Name())
741+
pluginsWithNodes.Insert(pl.Name())
742742
}
743743
result = result.Merge(r)
744744
if !result.AllNodes() && len(result.NodeNames) == 0 {
745-
msg := fmt.Sprintf("node(s) didn't satisfy plugin(s) %v simultaneously", pluginsWithNodes)
745+
msg := fmt.Sprintf("node(s) didn't satisfy plugin(s) %v simultaneously", sets.List(pluginsWithNodes))
746746
if len(pluginsWithNodes) == 1 {
747-
msg = fmt.Sprintf("node(s) didn't satisfy plugin %v", pluginsWithNodes[0])
747+
msg = fmt.Sprintf("node(s) didn't satisfy plugin %v", sets.List(pluginsWithNodes)[0])
748748
}
749749

750750
// When PreFilterResult filters out Nodes, the framework considers Nodes that are filtered out as getting "UnschedulableAndUnresolvable".
751-
return result, framework.NewStatus(framework.UnschedulableAndUnresolvable, msg)
751+
return result, framework.NewStatus(framework.UnschedulableAndUnresolvable, msg), pluginsWithNodes
752752
}
753753
}
754-
return result, returnStatus
754+
return result, returnStatus, pluginsWithNodes
755755
}
756756

757757
func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {

pkg/scheduler/framework/runtime/framework_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1531,7 +1531,6 @@ func TestPreFilterPlugins(t *testing.T) {
15311531
t.Fatalf("Failed to create framework for testing: %v", err)
15321532
}
15331533
state := framework.NewCycleState()
1534-
15351534
f.RunPreFilterPlugins(ctx, state, nil)
15361535
f.RunPreFilterExtensionAddPod(ctx, state, nil, nil, nil)
15371536
f.RunPreFilterExtensionRemovePod(ctx, state, nil, nil, nil)
@@ -1721,7 +1720,7 @@ func TestRunPreFilterPlugins(t *testing.T) {
17211720
}
17221721

17231722
state := framework.NewCycleState()
1724-
result, status := f.RunPreFilterPlugins(ctx, state, nil)
1723+
result, status, _ := f.RunPreFilterPlugins(ctx, state, nil)
17251724
if d := cmp.Diff(result, tt.wantPreFilterResult); d != "" {
17261725
t.Errorf("wrong status. got: %v, want: %v, diff: %s", result, tt.wantPreFilterResult, d)
17271726
}

pkg/scheduler/schedule_one.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,8 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
461461
return nil, diagnosis, err
462462
}
463463
// Run "prefilter" plugins.
464-
preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
464+
preRes, s, unscheduledPlugins := fwk.RunPreFilterPlugins(ctx, state, pod)
465+
diagnosis.UnschedulablePlugins = unscheduledPlugins
465466
if !s.IsSuccess() {
466467
if !s.IsRejected() {
467468
return nil, diagnosis, s.AsError()

pkg/scheduler/schedule_one_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2283,7 +2283,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
22832283
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
22842284
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
22852285
},
2286-
UnschedulablePlugins: sets.Set[string]{},
2286+
UnschedulablePlugins: sets.New("FakePreFilter2", "FakePreFilter3"),
22872287
PreFilterMsg: "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously",
22882288
},
22892289
},
@@ -2311,7 +2311,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
23112311
NodeToStatusMap: framework.NodeToStatusMap{
23122312
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin FakePreFilter2"),
23132313
},
2314-
UnschedulablePlugins: sets.Set[string]{},
2314+
UnschedulablePlugins: sets.New("FakePreFilter2"),
23152315
PreFilterMsg: "node(s) didn't satisfy plugin FakePreFilter2",
23162316
},
23172317
},
@@ -2339,7 +2339,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
23392339
NodeToStatusMap: framework.NodeToStatusMap{
23402340
"node2": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-prefilter").WithPlugin("FakeFilter"),
23412341
},
2342-
UnschedulablePlugins: sets.New("FakeFilter"),
2342+
UnschedulablePlugins: sets.New("FakePreFilter", "FakeFilter"),
23432343
PreFilterMsg: "",
23442344
},
23452345
},
@@ -2444,7 +2444,8 @@ func TestSchedulerSchedulePod(t *testing.T) {
24442444
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
24452445
NumAllNodes: 2,
24462446
Diagnosis: framework.Diagnosis{
2447-
NodeToStatusMap: framework.NodeToStatusMap{},
2447+
NodeToStatusMap: framework.NodeToStatusMap{},
2448+
UnschedulablePlugins: sets.New("FakePreFilter"),
24482449
},
24492450
},
24502451
},

test/integration/scheduler/queue_test.go

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
188188
tests := []struct {
189189
name string
190190
// initialNode is the Node to be created at first.
191-
initialNode *v1.Node
191+
initialNodes []*v1.Node
192192
// initialPod is the Pod to be created at first if it's not empty.
193193
initialPod *v1.Pod
194194
// pods are the list of Pods to be created.
@@ -202,8 +202,8 @@ func TestCoreResourceEnqueue(t *testing.T) {
202202
enableSchedulingQueueHint []bool
203203
}{
204204
{
205-
name: "Pod without a required toleration to a node isn't requeued to activeQ",
206-
initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(),
205+
name: "Pod without a required toleration to a node isn't requeued to activeQ",
206+
initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj()},
207207
pods: []*v1.Pod{
208208
// - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin.
209209
// (TaintToleration plugin is evaluated before NodeResourcesFit plugin.)
@@ -224,9 +224,9 @@ func TestCoreResourceEnqueue(t *testing.T) {
224224
enableSchedulingQueueHint: []bool{false, true},
225225
},
226226
{
227-
name: "Pod rejected by the PodAffinity plugin is requeued when a new Node is created and turned to ready",
228-
initialNode: st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
229-
initialPod: st.MakePod().Label("anti", "anti").Name("pod1").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Node("fake-node").Obj(),
227+
name: "Pod rejected by the PodAffinity plugin is requeued when a new Node is created and turned to ready",
228+
initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()},
229+
initialPod: st.MakePod().Label("anti", "anti").Name("pod1").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Node("fake-node").Obj(),
230230
pods: []*v1.Pod{
231231
// - Pod2 will be rejected by the PodAffinity plugin.
232232
st.MakePod().Label("anti", "anti").Name("pod2").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Obj(),
@@ -254,8 +254,8 @@ func TestCoreResourceEnqueue(t *testing.T) {
254254
enableSchedulingQueueHint: []bool{false, true},
255255
},
256256
{
257-
name: "Pod updated with toleration requeued to activeQ",
258-
initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: "taint-key", Effect: v1.TaintEffectNoSchedule}}).Obj(),
257+
name: "Pod updated with toleration requeued to activeQ",
258+
initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: "taint-key", Effect: v1.TaintEffectNoSchedule}}).Obj()},
259259
pods: []*v1.Pod{
260260
// - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin.
261261
st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").Obj(),
@@ -272,8 +272,8 @@ func TestCoreResourceEnqueue(t *testing.T) {
272272
enableSchedulingQueueHint: []bool{false, true},
273273
},
274274
{
275-
name: "Pod got resource scaled down requeued to activeQ",
276-
initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
275+
name: "Pod got resource scaled down requeued to activeQ",
276+
initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()},
277277
pods: []*v1.Pod{
278278
// - Pod1 requests a large amount of CPU and will be rejected by the NodeResourcesFit plugin.
279279
st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(),
@@ -290,8 +290,8 @@ func TestCoreResourceEnqueue(t *testing.T) {
290290
enableSchedulingQueueHint: []bool{false, true},
291291
},
292292
{
293-
name: "Updating pod condition doesn't retry scheduling if the Pod was rejected by TaintToleration",
294-
initialNode: st.MakeNode().Name("fake-node").Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(),
293+
name: "Updating pod condition doesn't retry scheduling if the Pod was rejected by TaintToleration",
294+
initialNodes: []*v1.Node{st.MakeNode().Name("fake-node").Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj()},
295295
pods: []*v1.Pod{
296296
// - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin.
297297
st.MakePod().Name("pod1").Container("image").Obj(),
@@ -327,6 +327,41 @@ func TestCoreResourceEnqueue(t *testing.T) {
327327
// because QHint of TaintToleration would decide to ignore a Pod update.
328328
enableSchedulingQueueHint: []bool{true},
329329
},
330+
{
331+
// The test case makes sure that PreFilter plugins returning PreFilterResult are also inserted into pInfo.UnschedulablePlugins
332+
// meaning, they're taken into consideration during requeuing flow of the queue.
333+
// https://github.com/kubernetes/kubernetes/issues/122018
334+
name: "Pod rejected by the PreFilter of NodeAffinity plugin and Filter of NodeResourcesFit is requeued based on both plugins",
335+
initialNodes: []*v1.Node{
336+
st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
337+
st.MakeNode().Name("fake-node2").Label("node", "fake-node2").Label("zone", "zone1").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
338+
},
339+
pods: []*v1.Pod{
340+
// - Pod1 will be rejected by the NodeAffinity plugin and NodeResourcesFit plugin.
341+
st.MakePod().Label("unscheduled", "plugins").Name("pod1").NodeAffinityIn("metadata.name", []string{"fake-node"}).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
342+
},
343+
triggerFn: func(testCtx *testutils.TestContext) error {
344+
// Trigger a NodeDeleted event.
345+
// It will not requeue pod1 to activeQ,
346+
// because both NodeAffinity and NodeResourceFit don't register Node/delete event.
347+
err := testCtx.ClientSet.CoreV1().Nodes().Delete(testCtx.Ctx, "fake-node", metav1.DeleteOptions{})
348+
if err != nil {
349+
return fmt.Errorf("failed to delete fake-node node")
350+
}
351+
352+
// Trigger a NodeCreated event.
353+
// It will requeue pod1 to activeQ, because QHint of NodeAffinity return Queue.
354+
// It makes sure PreFilter plugins returned PreFilterResult takes an effect for sure,
355+
// because NodeResourceFit QHint returns QueueSkip for this event actually.
356+
node := st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj()
357+
if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{}); err != nil {
358+
return fmt.Errorf("failed to create a new node: %w", err)
359+
}
360+
361+
return nil
362+
},
363+
wantRequeuedPods: sets.New("pod1"),
364+
},
330365
}
331366

332367
for _, tt := range tests {
@@ -350,9 +385,11 @@ func TestCoreResourceEnqueue(t *testing.T) {
350385
defer testCtx.Scheduler.SchedulingQueue.Close()
351386

352387
cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
353-
// Create initialNode.
354-
if _, err := cs.CoreV1().Nodes().Create(ctx, tt.initialNode, metav1.CreateOptions{}); err != nil {
355-
t.Fatalf("Failed to create an initial Node %q: %v", tt.initialNode.Name, err)
388+
// Create one Node with a taint.
389+
for _, node := range tt.initialNodes {
390+
if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
391+
t.Fatalf("Failed to create an initial Node %q: %v", node.Name, err)
392+
}
356393
}
357394

358395
if tt.initialPod != nil {

0 commit comments

Comments
 (0)