
Commit eeb8a5b

Added back the flag to trigger examining all Filters in the scheduler.
1 parent 183a3e5 commit eeb8a5b

File tree (5 files changed, +109 −34 lines):

pkg/scheduler/factory.go
pkg/scheduler/framework/v1alpha1/framework.go
pkg/scheduler/framework/v1alpha1/framework_test.go
pkg/scheduler/framework/v1alpha1/interface.go
pkg/scheduler/scheduler_test.go

pkg/scheduler/factory.go

Lines changed: 9 additions & 0 deletions
@@ -89,6 +89,9 @@ type Configurator struct {
 	// Disable pod preemption or not.
 	disablePreemption bool
 
+	// Always check all predicates even if the middle of one predicate fails.
+	alwaysCheckAllPredicates bool
+
 	// percentageOfNodesToScore specifies percentage of all nodes to score in each scheduling cycle.
 	percentageOfNodesToScore int32
 
@@ -202,6 +205,11 @@ func (c *Configurator) CreateFromConfig(policy schedulerapi.Policy) (*Scheduler,
 		c.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
 	}
 
+	// When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
+	// predicates even after one or more of them fails.
+	if policy.AlwaysCheckAllPredicates {
+		c.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
+	}
 	return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
 }
 
@@ -250,6 +258,7 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
 		framework.WithClientSet(c.client),
 		framework.WithInformerFactory(c.informerFactory),
 		framework.WithSnapshotSharedLister(c.nodeInfoSnapshot),
+		framework.WithRunAllFilters(c.alwaysCheckAllPredicates),
 	)
 	if err != nil {
 		klog.Fatalf("error initializing the scheduling framework: %v", err)
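For reference, the alwaysCheckAllPredicates flag that CreateFromConfig consumes above comes from the scheduler's Policy API. Below is a minimal sketch of opting in from Go; only the Policy type and the AlwaysCheckAllPredicates field are taken from this diff, while the import path and omitted fields are assumptions to be checked against the imports in pkg/scheduler/factory.go at this commit.

package main

// The alias matches the one used in factory.go; the path itself is assumed.
import schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"

// examplePolicy returns a Policy that asks the scheduler to evaluate every
// configured predicate/Filter instead of stopping at the first failure.
func examplePolicy() schedulerapi.Policy {
	return schedulerapi.Policy{
		// Predicates, Priorities, extenders, etc. would be listed here as usual.
		AlwaysCheckAllPredicates: true,
	}
}

Passing such a policy through Configurator.CreateFromConfig sets c.alwaysCheckAllPredicates, which CreateFromKeys then forwards to the framework via framework.WithRunAllFilters, as shown in the hunks above.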

pkg/scheduler/framework/v1alpha1/framework.go

Lines changed: 35 additions & 11 deletions
@@ -78,6 +78,10 @@ type framework struct {
 	informerFactory informers.SharedInformerFactory
 
 	metricsRecorder *metricsRecorder
+
+	// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
+	// after the first failure.
+	runAllFilters bool
 }
 
 // extensionPoint encapsulates desired and applied set of plugins at a specific extension
@@ -112,6 +116,7 @@ type frameworkOptions struct {
 	informerFactory      informers.SharedInformerFactory
 	snapshotSharedLister schedulerlisters.SharedLister
 	metricsRecorder      *metricsRecorder
+	runAllFilters        bool
 }
 
 // Option for the framework.
@@ -138,6 +143,14 @@ func WithSnapshotSharedLister(snapshotSharedLister schedulerlisters.SharedLister
 	}
 }
 
+// WithRunAllFilters sets the runAllFilters flag, which means RunFilterPlugins accumulates
+// all failure Statuses.
+func WithRunAllFilters(runAllFilters bool) Option {
+	return func(o *frameworkOptions) {
+		o.runAllFilters = runAllFilters
+	}
+}
+
 // withMetricsRecorder is only used in tests.
 func withMetricsRecorder(recorder *metricsRecorder) Option {
 	return func(o *frameworkOptions) {
@@ -166,6 +179,7 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
 		clientSet:       options.clientSet,
 		informerFactory: options.informerFactory,
 		metricsRecorder: options.metricsRecorder,
+		runAllFilters:   options.runAllFilters,
 	}
 	if plugins == nil {
 		return f, nil
@@ -395,27 +409,37 @@ func (f *framework) RunFilterPlugins(
 	state *CycleState,
 	pod *v1.Pod,
 	nodeInfo *schedulernodeinfo.NodeInfo,
-) (status *Status) {
+) (finalStatus *Status) {
 	if state.ShouldRecordFrameworkMetrics() {
 		startTime := time.Now()
 		defer func() {
-			f.metricsRecorder.observeExtensionPointDurationAsync(filter, status, metrics.SinceInSeconds(startTime))
+			f.metricsRecorder.observeExtensionPointDurationAsync(filter, finalStatus, metrics.SinceInSeconds(startTime))
 		}()
 	}
 	for _, pl := range f.filterPlugins {
-		status = f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
-		if !status.IsSuccess() {
-			if !status.IsUnschedulable() {
-				errMsg := fmt.Sprintf("error while running %q filter plugin for pod %q: %v",
-					pl.Name(), pod.Name, status.Message())
-				klog.Error(errMsg)
-				return NewStatus(Error, errMsg)
+		pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
+		if !pluginStatus.IsSuccess() {
+			if !pluginStatus.IsUnschedulable() {
+				// Filter plugins are not supposed to return any status other than
+				// Success or Unschedulable.
+				return NewStatus(Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message()))
+			}
+			if !f.runAllFilters {
+				// Exit early if we don't need to run all filters.
+				return pluginStatus
 			}
-			return status
+			// We need to continue and run all filters.
+			if finalStatus.IsSuccess() {
+				// This is the first failed plugin.
+				finalStatus = pluginStatus
+				continue
+			}
+			// We get here only if more than one Filter return unschedulable and runAllFilters is true.
+			finalStatus.reasons = append(finalStatus.reasons, pluginStatus.reasons...)
 		}
 	}
 
-	return nil
+	return finalStatus
 }
 
 func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
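The merge semantics in RunFilterPlugins above are: the first failing Filter fixes the returned status code, and later failures only append their reasons. The following is a minimal, self-contained sketch of that pattern using simplified stand-in types, not the scheduler's actual Status implementation, purely to illustrate the behavior when runAllFilters is true.

package main

import "fmt"

// Simplified stand-ins for the framework's Code/Status types, for illustration only.
type Code int

const (
	Success Code = iota
	Unschedulable
	UnschedulableAndUnresolvable
)

type Status struct {
	code    Code
	reasons []string
}

// IsSuccess is nil-safe, mirroring how the framework treats a nil *Status.
func (s *Status) IsSuccess() bool { return s == nil || s.code == Success }

// mergeAll mimics the runAllFilters branch: the first failure fixes the code,
// later failures only contribute their reasons.
func mergeAll(statuses []*Status) *Status {
	var final *Status
	for _, st := range statuses {
		if st.IsSuccess() {
			continue
		}
		if final.IsSuccess() {
			final = st // first failed plugin
			continue
		}
		final.reasons = append(final.reasons, st.reasons...)
	}
	return final
}

func main() {
	got := mergeAll([]*Status{
		{code: UnschedulableAndUnresolvable, reasons: []string{"reason from plugin 1"}},
		{code: Unschedulable, reasons: []string{"reason from plugin 2"}},
	})
	// Prints: 2 [reason from plugin 1 reason from plugin 2]
	fmt.Println(got.code, got.reasons)
}

This mirrors the last test case added below, where the combined status keeps UnschedulableAndUnresolvable as its code but carries both plugins' reasons.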

pkg/scheduler/framework/v1alpha1/framework_test.go

Lines changed: 60 additions & 16 deletions
@@ -165,7 +165,7 @@ func (pl *TestPlugin) PreFilterExtensions() PreFilterExtensions {
 }
 
 func (pl *TestPlugin) Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status {
-	return NewStatus(Code(pl.inj.FilterStatus), "injected status")
+	return NewStatus(Code(pl.inj.FilterStatus), "injected filter status")
 }
 
 func (pl *TestPlugin) PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status {
@@ -598,9 +598,10 @@ func TestPreFilterPlugins(t *testing.T) {
 
 func TestFilterPlugins(t *testing.T) {
 	tests := []struct {
-		name     string
-		plugins  []*TestPlugin
-		wantCode Code
+		name          string
+		plugins       []*TestPlugin
+		wantStatus    *Status
+		runAllFilters bool
 	}{
 		{
 			name: "SuccessFilter",
@@ -610,7 +611,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Success)},
 				},
 			},
-			wantCode: Success,
+			wantStatus: nil,
 		},
 		{
 			name: "ErrorFilter",
@@ -620,7 +621,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Error)},
 				},
 			},
-			wantCode: Error,
+			wantStatus: NewStatus(Error, `running "TestPlugin" filter plugin for pod "": injected filter status`),
 		},
 		{
 			name: "UnschedulableFilter",
@@ -630,7 +631,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Unschedulable)},
 				},
 			},
-			wantCode: Unschedulable,
+			wantStatus: NewStatus(Unschedulable, "injected filter status"),
 		},
 		{
 			name: "UnschedulableAndUnresolvableFilter",
@@ -641,7 +642,7 @@ func TestFilterPlugins(t *testing.T) {
 						FilterStatus: int(UnschedulableAndUnresolvable)},
 				},
 			},
-			wantCode: UnschedulableAndUnresolvable,
+			wantStatus: NewStatus(UnschedulableAndUnresolvable, "injected filter status"),
 		},
 		// followings tests cover multiple-plugins scenarios
 		{
@@ -657,7 +658,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Error)},
 				},
 			},
-			wantCode: Error,
+			wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`),
 		},
 		{
 			name: "SuccessAndSuccessFilters",
@@ -672,7 +673,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Success)},
 				},
 			},
-			wantCode: Success,
+			wantStatus: nil,
 		},
 		{
 			name: "ErrorAndSuccessFilters",
@@ -686,7 +687,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Success)},
 				},
 			},
-			wantCode: Error,
+			wantStatus: NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`),
 		},
 		{
 			name: "SuccessAndErrorFilters",
@@ -701,7 +702,7 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Error)},
 				},
 			},
-			wantCode: Error,
+			wantStatus: NewStatus(Error, `running "TestPlugin2" filter plugin for pod "": injected filter status`),
 		},
 		{
 			name: "SuccessAndUnschedulableFilters",
@@ -716,7 +717,50 @@ func TestFilterPlugins(t *testing.T) {
 					inj:  injectedResult{FilterStatus: int(Unschedulable)},
 				},
 			},
-			wantCode: Unschedulable,
+			wantStatus: NewStatus(Unschedulable, "injected filter status"),
+		},
+		{
+			name: "SuccessFilterWithRunAllFilters",
+			plugins: []*TestPlugin{
+				{
+					name: "TestPlugin",
+					inj:  injectedResult{FilterStatus: int(Success)},
+				},
+			},
+			runAllFilters: true,
+			wantStatus:    nil,
+		},
+		{
+			name: "ErrorAndErrorFilters",
+			plugins: []*TestPlugin{
+				{
+					name: "TestPlugin1",
+					inj:  injectedResult{FilterStatus: int(Error)},
+				},

+				{
+					name: "TestPlugin2",
+					inj:  injectedResult{FilterStatus: int(Error)},
+				},
+			},
+			runAllFilters: true,
+			wantStatus:    NewStatus(Error, `running "TestPlugin1" filter plugin for pod "": injected filter status`),
+		},
+		{
+			name: "ErrorAndErrorFilters",
+			plugins: []*TestPlugin{
+				{
+					name: "TestPlugin1",
+					inj:  injectedResult{FilterStatus: int(UnschedulableAndUnresolvable)},
+				},

+				{
+					name: "TestPlugin2",
+					inj:  injectedResult{FilterStatus: int(Unschedulable)},
+				},
+			},
+			runAllFilters: true,
+			wantStatus:    NewStatus(UnschedulableAndUnresolvable, "injected filter status", "injected filter status"),
 		},
 	}
 
@@ -739,13 +783,13 @@ func TestFilterPlugins(t *testing.T) {
 					config.Plugin{Name: pl.name})
 			}
 
-			f, err := NewFramework(registry, cfgPls, emptyArgs)
+			f, err := NewFramework(registry, cfgPls, emptyArgs, WithRunAllFilters(tt.runAllFilters))
 			if err != nil {
 				t.Fatalf("fail to create framework: %s", err)
 			}
 			status := f.RunFilterPlugins(context.TODO(), nil, pod, nil)
-			if status.Code() != tt.wantCode {
-				t.Errorf("Wrong status code. got: %v, want:%v", status.Code(), tt.wantCode)
+			if !reflect.DeepEqual(status, tt.wantStatus) {
+				t.Errorf("Wrong status code. got: %v, want:%v", status, tt.wantStatus)
 			}
 		})
 	}
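To exercise just these cases locally, something like `go test ./pkg/scheduler/framework/v1alpha1/ -run TestFilterPlugins` from the repository root should work; the package path is taken from the file tree above.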

pkg/scheduler/framework/v1alpha1/interface.go

Lines changed: 4 additions & 6 deletions
@@ -379,12 +379,10 @@ type Framework interface {
 	RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) *Status
 
 	// RunFilterPlugins runs the set of configured filter plugins for pod on
-	// the given node. It returns directly if any of the filter plugins
-	// return any status other than "Success". Note that for the node being
-	// evaluated, the passed nodeInfo reference could be different from the
-	// one in NodeInfoSnapshot map (e.g., pods considered to be running on
-	// the node could be different). For example, during preemption, we may
-	// pass a copy of the original nodeInfo object that has some pods
+	// the given node. Note that for the node being evaluated, the passed nodeInfo
+	// reference could be different from the one in NodeInfoSnapshot map (e.g., pods
+	// considered to be running on the node could be different). For example, during
+	// preemption, we may pass a copy of the original nodeInfo object that has some pods
 	// removed from it to evaluate the possibility of preempting them to
 	// schedule the target pod.
 	RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status

pkg/scheduler/scheduler_test.go

Lines changed: 1 addition & 1 deletion
@@ -897,7 +897,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
 				FindErr: findErr,
 			},
 			eventReason: "FailedScheduling",
-			expectError: fmt.Errorf("error while running %q filter plugin for pod %q: %v", volumebinding.Name, "foo", findErr),
+			expectError: fmt.Errorf("running %q filter plugin for pod %q: %v", volumebinding.Name, "foo", findErr),
 		},
 		{
 			name: "assume error",
