Skip to content

Commit c98dee4

Browse files
committed
Return a FitError when PreFilter fails with unschedulable status
1 parent a472138 commit c98dee4

File tree

5 files changed

+118
-35
lines changed

5 files changed

+118
-35
lines changed

pkg/scheduler/core/generic_scheduler.go

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,13 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
151151
}
152152

153153
startPredicateEvalTime := time.Now()
154-
filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
154+
feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
155155
if err != nil {
156156
return result, err
157157
}
158158
trace.Step("Computing predicates done")
159159

160-
if len(filteredNodes) == 0 {
160+
if len(feasibleNodes) == 0 {
161161
return result, &FitError{
162162
Pod: pod,
163163
NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
@@ -170,16 +170,16 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
170170

171171
startPriorityEvalTime := time.Now()
172172
// When only one node after predicate, just use it.
173-
if len(filteredNodes) == 1 {
173+
if len(feasibleNodes) == 1 {
174174
metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
175175
return ScheduleResult{
176-
SuggestedHost: filteredNodes[0].Name,
176+
SuggestedHost: feasibleNodes[0].Name,
177177
EvaluatedNodes: 1 + len(filteredNodesStatuses),
178178
FeasibleNodes: 1,
179179
}, nil
180180
}
181181

182-
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, filteredNodes)
182+
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
183183
if err != nil {
184184
return result, err
185185
}
@@ -192,8 +192,8 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
192192

193193
return ScheduleResult{
194194
SuggestedHost: host,
195-
EvaluatedNodes: len(filteredNodes) + len(filteredNodesStatuses),
196-
FeasibleNodes: len(filteredNodes),
195+
EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
196+
FeasibleNodes: len(feasibleNodes),
197197
}, err
198198
}
199199

@@ -253,23 +253,37 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
253253
// Filters the nodes to find the ones that fit the pod based on the framework
254254
// filter plugins and filter extenders.
255255
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
256+
filteredNodesStatuses := make(framework.NodeToStatusMap)
257+
256258
// Run "prefilter" plugins.
257259
s := prof.RunPreFilterPlugins(ctx, state, pod)
258260
if !s.IsSuccess() {
259-
return nil, nil, s.AsError()
261+
if !s.IsUnschedulable() {
262+
return nil, nil, s.AsError()
263+
}
264+
// All nodes will have the same status. Some non trivial refactoring is
265+
// needed to avoid this copy.
266+
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
267+
if err != nil {
268+
return nil, nil, err
269+
}
270+
for _, n := range allNodes {
271+
filteredNodesStatuses[n.Node().Name] = s
272+
}
273+
return nil, filteredNodesStatuses, nil
274+
260275
}
261276

262-
filteredNodesStatuses := make(framework.NodeToStatusMap)
263-
filtered, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
277+
feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
264278
if err != nil {
265279
return nil, nil, err
266280
}
267281

268-
filtered, err = g.findNodesThatPassExtenders(pod, filtered, filteredNodesStatuses)
282+
feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
269283
if err != nil {
270284
return nil, nil, err
271285
}
272-
return filtered, filteredNodesStatuses, nil
286+
return feasibleNodes, filteredNodesStatuses, nil
273287
}
274288

275289
// findNodesThatPassFilters finds the nodes that fit the filter plugins.
@@ -281,22 +295,22 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
281295

282296
numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))
283297

284-
// Create filtered list with enough space to avoid growing it
298+
// Create feasible list with enough space to avoid growing it
285299
// and allow assigning.
286-
filtered := make([]*v1.Node, numNodesToFind)
300+
feasibleNodes := make([]*v1.Node, numNodesToFind)
287301

288302
if !prof.HasFilterPlugins() {
289303
length := len(allNodes)
290-
for i := range filtered {
291-
filtered[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
304+
for i := range feasibleNodes {
305+
feasibleNodes[i] = allNodes[(g.nextStartNodeIndex+i)%length].Node()
292306
}
293-
g.nextStartNodeIndex = (g.nextStartNodeIndex + len(filtered)) % length
294-
return filtered, nil
307+
g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
308+
return feasibleNodes, nil
295309
}
296310

297311
errCh := parallelize.NewErrorChannel()
298312
var statusesLock sync.Mutex
299-
var filteredLen int32
313+
var feasibleNodesLen int32
300314
ctx, cancel := context.WithCancel(ctx)
301315
checkNode := func(i int) {
302316
// We check the nodes starting from where we left off in the previous scheduling cycle,
@@ -308,12 +322,12 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
308322
return
309323
}
310324
if fits {
311-
length := atomic.AddInt32(&filteredLen, 1)
325+
length := atomic.AddInt32(&feasibleNodesLen, 1)
312326
if length > numNodesToFind {
313327
cancel()
314-
atomic.AddInt32(&filteredLen, -1)
328+
atomic.AddInt32(&feasibleNodesLen, -1)
315329
} else {
316-
filtered[length-1] = nodeInfo.Node()
330+
feasibleNodes[length-1] = nodeInfo.Node()
317331
}
318332
} else {
319333
statusesLock.Lock()
@@ -336,26 +350,26 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
336350
// Stops searching for more nodes once the configured number of feasible nodes
337351
// are found.
338352
parallelize.Until(ctx, len(allNodes), checkNode)
339-
processedNodes := int(filteredLen) + len(statuses)
353+
processedNodes := int(feasibleNodesLen) + len(statuses)
340354
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)
341355

342-
filtered = filtered[:filteredLen]
356+
feasibleNodes = feasibleNodes[:feasibleNodesLen]
343357
if err := errCh.ReceiveError(); err != nil {
344358
statusCode = framework.Error
345359
return nil, err
346360
}
347-
return filtered, nil
361+
return feasibleNodes, nil
348362
}
349363

350-
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
364+
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
351365
for _, extender := range g.extenders {
352-
if len(filtered) == 0 {
366+
if len(feasibleNodes) == 0 {
353367
break
354368
}
355369
if !extender.IsInterested(pod) {
356370
continue
357371
}
358-
filteredList, failedMap, err := extender.Filter(pod, filtered)
372+
feasibleList, failedMap, err := extender.Filter(pod, feasibleNodes)
359373
if err != nil {
360374
if extender.IsIgnorable() {
361375
klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
@@ -372,9 +386,9 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v
372386
statuses[failedNodeName].AppendReason(failedMsg)
373387
}
374388
}
375-
filtered = filteredList
389+
feasibleNodes = feasibleList
376390
}
377-
return filtered, nil
391+
return feasibleNodes, nil
378392
}
379393

380394
// addNominatedPods adds pods with equal or greater priority which are nominated

pkg/scheduler/core/generic_scheduler_test.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,43 @@ func TestGenericScheduler(t *testing.T) {
677677
expectedHosts: nil,
678678
wErr: nil,
679679
},
680+
{
681+
name: "test prefilter plugin returning Unschedulable status",
682+
registerPlugins: []st.RegisterPluginFunc{
683+
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
684+
st.RegisterPreFilterPlugin(
685+
"FakePreFilter",
686+
st.NewFakePreFilterPlugin(framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status")),
687+
),
688+
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
689+
},
690+
nodes: []string{"1", "2"},
691+
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
692+
expectedHosts: nil,
693+
wErr: &FitError{
694+
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
695+
NumAllNodes: 2,
696+
FilteredNodesStatuses: framework.NodeToStatusMap{
697+
"1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status"),
698+
"2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status"),
699+
},
700+
},
701+
},
702+
{
703+
name: "test prefilter plugin returning error status",
704+
registerPlugins: []st.RegisterPluginFunc{
705+
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
706+
st.RegisterPreFilterPlugin(
707+
"FakePreFilter",
708+
st.NewFakePreFilterPlugin(framework.NewStatus(framework.Error, "injected error status")),
709+
),
710+
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
711+
},
712+
nodes: []string{"1", "2"},
713+
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-prefilter", UID: types.UID("test-prefilter")}},
714+
expectedHosts: nil,
715+
wErr: fmt.Errorf(`prefilter plugin "FakePreFilter" failed for pod "test-prefilter": injected error status`),
716+
},
680717
}
681718
for _, test := range tests {
682719
t.Run(test.name, func(t *testing.T) {
@@ -717,7 +754,7 @@ func TestGenericScheduler(t *testing.T) {
717754
schedulerapi.DefaultPercentageOfNodesToScore)
718755
result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod)
719756
if !reflect.DeepEqual(err, test.wErr) {
720-
t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
757+
t.Errorf("want: %v, got: %v", test.wErr, err)
721758
}
722759
if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) {
723760
t.Errorf("Expected: %s, got: %s", test.expectedHosts, result.SuggestedHost)

pkg/scheduler/framework/runtime/framework.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,11 +395,9 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
395395
status = f.runPreFilterPlugin(ctx, pl, state, pod)
396396
if !status.IsSuccess() {
397397
if status.IsUnschedulable() {
398-
msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message())
399-
klog.V(4).Infof(msg)
400-
return framework.NewStatus(status.Code(), msg)
398+
return status
401399
}
402-
msg := fmt.Sprintf("error while running %q prefilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
400+
msg := fmt.Sprintf("prefilter plugin %q failed for pod %q: %v", pl.Name(), pod.Name, status.Message())
403401
klog.Error(msg)
404402
return framework.NewStatus(framework.Error, msg)
405403
}

pkg/scheduler/testing/fake_plugins.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,32 @@ func (pl *MatchFilterPlugin) Filter(_ context.Context, _ *framework.CycleState,
123123
func NewMatchFilterPlugin(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
124124
return &MatchFilterPlugin{}, nil
125125
}
126+
127+
// FakePreFilterPlugin is a test filter plugin.
128+
type FakePreFilterPlugin struct {
129+
Status *framework.Status
130+
}
131+
132+
// Name returns name of the plugin.
133+
func (pl *FakePreFilterPlugin) Name() string {
134+
return "FakePreFilter"
135+
}
136+
137+
// PreFilter invoked at the PreFilter extension point.
138+
func (pl *FakePreFilterPlugin) PreFilter(_ context.Context, _ *framework.CycleState, pod *v1.Pod) *framework.Status {
139+
return pl.Status
140+
}
141+
142+
// PreFilterExtensions no extensions implemented by this plugin.
143+
func (pl *FakePreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions {
144+
return nil
145+
}
146+
147+
// NewFakePreFilterPlugin initializes a fakePreFilterPlugin and returns it.
148+
func NewFakePreFilterPlugin(status *framework.Status) frameworkruntime.PluginFactory {
149+
return func(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
150+
return &FakePreFilterPlugin{
151+
Status: status,
152+
}, nil
153+
}
154+
}

pkg/scheduler/testing/framework_helpers.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ func RegisterQueueSortPlugin(pluginName string, pluginNewFunc runtime.PluginFact
4242
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "QueueSort")
4343
}
4444

45+
// RegisterPreFilterPlugin returns a function to register a PreFilter Plugin to a given registry.
46+
func RegisterPreFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
47+
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "PreFilter")
48+
}
49+
4550
// RegisterFilterPlugin returns a function to register a Filter Plugin to a given registry.
4651
func RegisterFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
4752
return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Filter")

0 commit comments

Comments
 (0)