Skip to content

Commit fbc9cf0

Browse files
authored
Merge pull request kubernetes#92797 from ahg-g/ahg-prefilter
Return a FitError when PreFilter fails with unschedulable status
2 parents 26da0ea + c98dee4 commit fbc9cf0

File tree

5 files changed

+118
-35
lines changed

5 files changed

+118
-35
lines changed

pkg/scheduler/core/generic_scheduler.go

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -154,13 +154,13 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
154154
}
155155

156156
startPredicateEvalTime := time.Now()
157-
filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
157+
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
158158
if err != nil {
159159
return result, err
160160
}
161161
trace.Step("Computing predicates done")
162162

163-
if len(filteredNodes) == 0 {
163+
if len(feasibleNodes) == 0 {
164164
return result, &FitError{
165165
Pod: pod,
166166
NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
@@ -173,16 +173,16 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
173173

174174
startPriorityEvalTime := time.Now()
175175
// When only one node after predicate, just use it.
176-
if len(filteredNodes) == 1 {
176+
if len(feasibleNodes) == 1 {
177177
metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
178178
return ScheduleResult{
179-
SuggestedHost: filteredNodes[0].Name,
179+
SuggestedHost: feasibleNodes[0].Name,
180180
EvaluatedNodes: 1 + len(filteredNodesStatuses),
181181
FeasibleNodes: 1,
182182
}, nil
183183
}
184184

185-
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, filteredNodes)
185+
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
186186
if err != nil {
187187
return result, err
188188
}
@@ -195,8 +195,8 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
195195

196196
return ScheduleResult{
197197
SuggestedHost: host,
198-
EvaluatedNodes: len(filteredNodes) + len(filteredNodesStatuses),
199-
FeasibleNodes: len(filteredNodes),
198+
EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
199+
FeasibleNodes: len(feasibleNodes),
200200
}, err
201201
}
202202

@@ -256,23 +256,37 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
256256
// Filters the nodes to find the ones that fit the pod based on the framework
257257
// filter plugins and filter extenders.
258258
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
259+
filteredNodesStatuses := make(framework.NodeToStatusMap)
260+
259261
// Run "prefilter" plugins.
260262
s := prof.RunPreFilterPlugins(ctx, state, pod)
261263
if !s.IsSuccess() {
262-
return nil, nil, s.AsError()
264+
if !s.IsUnschedulable() {
265+
return nil, nil, s.AsError()
266+
}
267+
// All nodes will have the same status. Some non trivial refactoring is
268+
// needed to avoid this copy.
269+
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
270+
if err != nil {
271+
return nil, nil, err
272+
}
273+
for _, n := range allNodes {
274+
filteredNodesStatuses[n.Node().Name] = s
275+
}
276+
return nil, filteredNodesStatuses, nil
277+
263278
}
264279

265-
filteredNodesStatuses := make(framework.NodeToStatusMap)
266-
filtered, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
280+
feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
267281
if err != nil {
268282
return nil, nil, err
269283
}
270284

271-
filtered, err = g.findNodesThatPassExtenders(pod, filtered, filteredNodesStatuses)
285+
feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
272286
if err != nil {
273287
return nil, nil, err
274288
}
275-
return filtered, filteredNodesStatuses, nil
289+
return feasibleNodes, filteredNodesStatuses, nil
276290
}
277291

278292
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
@@ -284,22 +298,22 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
284298

285299
numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
286300

287-
// Create filtered list with enough space to avoid growing it
301+
// Create feasible list with enough space to avoid growing it
288302
// and allow assigning.
289-
filtered := make([]*v1.Node, numNodesToFind)
303+
feasibleNodes := make([]*v1.Node, numNodesToFind)
290304

291305
if !prof.HasFilterPlugins() {
292306
length := len(allNodes)
293-
for i := range filtered {
294-
filtered[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
307+
for i := range feasibleNodes {
308+
feasibleNodes[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
295309
}
296-
g.nextStartNodeIndex = (g.nextStartNodeIndex + len(filtered)) % length
297-
return filtered, nil
310+
g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
311+
return feasibleNodes, nil
298312
}
299313

300314
errCh := parallelize.NewErrorChannel()
301315
var statusesLock sync.Mutex
302-
var filteredLen int32
316+
var feasibleNodesLen int32
303317
ctx, cancel := context.WithCancel(ctx)
304318
checkNode := func(i int) {
305319
// We check the nodes starting from where we left off in the previous scheduling cycle,
@@ -311,12 +325,12 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
311325
return
312326
}
313327
if fits {
314-
length := atomic.AddInt32(&filteredLen, 1)
328+
length := atomic.AddInt32(&feasibleNodesLen, 1)
315329
if length > numNodesToFind {
316330
cancel()
317-
atomic.AddInt32(&filteredLen, -1)
331+
atomic.AddInt32(&feasibleNodesLen, -1)
318332
} else {
319-
filtered[length-1] = nodeInfo.Node()
333+
feasibleNodes[length-1] = nodeInfo.Node()
320334
}
321335
} else {
322336
statusesLock.Lock()
@@ -339,26 +353,26 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
339353
// Stops searching for more nodes once the configured number of feasible nodes
340354
// are found.
341355
parallelize.Until(ctx, len(allNodes), checkNode)
342-
processedNodes := int(filteredLen) + len(statuses)
356+
processedNodes := int(feasibleNodesLen) + len(statuses)
343357
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
344358

345-
filtered = filtered[:filteredLen]
359+
feasibleNodes = feasibleNodes[:feasibleNodesLen]
346360
if err := errCh.ReceiveError(); err != nil {
347361
statusCode = framework.Error
348362
return nil, err
349363
}
350-
return filtered, nil
364+
return feasibleNodes, nil
351365
}
352366

353-
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
367+
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
354368
for _, extender := range g.extenders {
355-
if len(filtered) == 0 {
369+
if len(feasibleNodes) == 0 {
356370
break
357371
}
358372
if !extender.IsInterested(pod) {
359373
continue
360374
}
361-
filteredList, failedMap, err := extender.Filter(pod, filtered)
375+
feasibleList, failedMap, err := extender.Filter(pod, feasibleNodes)
362376
if err != nil {
363377
if extender.IsIgnorable() {
364378
klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
@@ -375,9 +389,9 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v
375389
statuses[failedNodeName].AppendReason(failedMsg)
376390
}
377391
}
378-
filtered = filteredList
392+
feasibleNodes = feasibleList
379393
}
380-
return filtered, nil
394+
return feasibleNodes, nil
381395
}
382396

383397
// addNominatedPods adds pods with equal or greater priority which are nominated

pkg/scheduler/core/generic_scheduler_test.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,43 @@ func TestGenericScheduler(t *testing.T) {
677677
expectedHosts: nil,
678678
wErr: nil,
679679
},
680+
{
681+
name: "test prefilter plugin returning Unschedulable status",
682+
registerPlugins: []st.RegisterPluginFunc{
683+
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
684+
st.RegisterPreFilterPlugin(
685+
"FakePreFilter",
686+
st.NewFakePreFilterPlugin(framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status")),
687+
),
688+
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
689+
},
690+
nodes: []string{"1", "2"},
691+
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
692+
expectedHosts: nil,
693+
wErr: &FitError{
694+
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
695+
NumAllNodes: 2,
696+
FilteredNodesStatuses: framework.NodeToStatusMap{
697+
"1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status"),
698+
"2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status"),
699+
},
700+
},
701+
},
702+
{
703+
name: "test prefilter plugin returning error status",
704+
registerPlugins: []st.RegisterPluginFunc{
705+
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
706+
st.RegisterPreFilterPlugin(
707+
"FakePreFilter",
708+
st.NewFakePreFilterPlugin(framework.NewStatus(framework.Error, "injected error status")),
709+
),
710+
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
711+
},
712+
nodes: []string{"1", "2"},
713+
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
714+
expectedHosts: nil,
715+
wErr: fmt.Errorf(`prefilter plugin "FakePreFilter" failed for pod "test-prefilter": injected error status`),
716+
},
680717
}
681718
for _, test := range tests {
682719
t.Run(test.name, func(t *testing.T) {
@@ -717,7 +754,7 @@ func TestGenericScheduler(t *testing.T) {
717754
schedulerapi.DefaultPercentageOfNodesToScore)
718755
result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod)
719756
if !reflect.DeepEqual(err, test.wErr) {
720-
t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
757+
t.Errorf("want: %v, got: %v", test.wErr, err)
721758
}
722759
if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) {
723760
t.Errorf("Expected: %s, got: %s", test.expectedHosts, result.SuggestedHost)

pkg/scheduler/framework/runtime/framework.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,11 +395,9 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
395395
status = f.runPreFilterPlugin(ctx, pl, state, pod)
396396
if !status.IsSuccess() {
397397
if status.IsUnschedulable() {
398-
msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message())
399-
klog.V(4).Infof(msg)
400-
return framework.NewStatus(status.Code(), msg)
398+
return status
401399
}
402-
msg := fmt.Sprintf("error while running %q prefilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
400+
msg := fmt.Sprintf("prefilter plugin %q failed for pod %q: %v", pl.Name(), pod.Name, status.Message())
403401
klog.Error(msg)
404402
return framework.NewStatus(framework.Error, msg)
405403
}

pkg/scheduler/testing/fake_plugins.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,32 @@ func (pl *MatchFilterPlugin) Filter(_ context.Context, _ *framework.CycleState,
123123
func NewMatchFilterPlugin(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
124124
return &MatchFilterPlugin{}, nil
125125
}
126+
127+
// FakePreFilterPlugin is a test filter plugin.
128+
type FakePreFilterPlugin struct {
129+
Status *framework.Status
130+
}
131+
132+
// Name returns name of the plugin.
133+
func (pl *FakePreFilterPlugin) Name() string {
134+
return "FakePreFilter"
135+
}
136+
137+
// PreFilter invoked at the PreFilter extension point.
138+
func (pl *FakePreFilterPlugin) PreFilter(_ context.Context, _ *framework.CycleState, pod *v1.Pod) *framework.Status {
139+
return pl.Status
140+
}
141+
142+
// PreFilterExtensions no extensions implemented by this plugin.
143+
func (pl *FakePreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions {
144+
return nil
145+
}
146+
147+
// NewFakePreFilterPlugin initializes a fakePreFilterPlugin and returns it.
148+
func NewFakePreFilterPlugin(status *framework.Status) frameworkruntime.PluginFactory {
149+
return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
150+
return &FakePreFilterPlugin{
151+
Status: status,
152+
}, nil
153+
}
154+
}

pkg/scheduler/testing/framework_helpers.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ func RegisterQueueSortPlugin(pluginName string, pluginNewFunc runtime.PluginFact
4242
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "QueueSort")
4343
}
4444

45+
// RegisterPreFilterPlugin returns a function to register a PreFilter Plugin to a given registry.
46+
func RegisterPreFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
47+
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "PreFilter")
48+
}
49+
4550
// RegisterFilterPlugin returns a function to register a Filter Plugin to a given registry.
4651
func RegisterFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
4752
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Filter")

0 commit comments

Comments
 (0)