Skip to content

Commit 69b9ba6

Browse files
committed
Introduce PostFilter extension point
1 parent b8b4186 commit 69b9ba6

File tree

5 files changed

+308
-1
lines changed

5 files changed

+308
-1
lines changed

pkg/scheduler/framework/v1alpha1/framework.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ const (
4444
preFilter = "PreFilter"
4545
preFilterExtensionAddPod = "PreFilterExtensionAddPod"
4646
preFilterExtensionRemovePod = "PreFilterExtensionRemovePod"
47+
postFilter = "PostFilter"
4748
preScore = "PreScore"
4849
score = "Score"
4950
scoreExtensionNormalize = "ScoreExtensionNormalize"
@@ -67,6 +68,7 @@ type framework struct {
6768
queueSortPlugins []QueueSortPlugin
6869
preFilterPlugins []PreFilterPlugin
6970
filterPlugins []FilterPlugin
71+
postFilterPlugins []PostFilterPlugin
7072
preScorePlugins []PreScorePlugin
7173
scorePlugins []ScorePlugin
7274
reservePlugins []ReservePlugin
@@ -103,6 +105,7 @@ func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint
103105
return []extensionPoint{
104106
{plugins.PreFilter, &f.preFilterPlugins},
105107
{plugins.Filter, &f.filterPlugins},
108+
{plugins.PostFilter, &f.postFilterPlugins},
106109
{plugins.Reserve, &f.reservePlugins},
107110
{plugins.PreScore, &f.preScorePlugins},
108111
{plugins.Score, &f.scorePlugins},
@@ -508,6 +511,33 @@ func (f *framework) runFilterPlugin(ctx context.Context, pl FilterPlugin, state
508511
return status
509512
}
510513

514+
// RunPostFilterPlugins runs the set of configured PostFilter plugins until the first
515+
// Success or Error is met, otherwise continues to execute all plugins.
516+
func (f *framework) RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status) {
517+
statuses := make(PluginToStatus)
518+
for _, pl := range f.postFilterPlugins {
519+
r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)
520+
if s.IsSuccess() {
521+
return r, s
522+
} else if !s.IsUnschedulable() {
523+
// Any status other than Success or Unschedulable is Error.
524+
return nil, NewStatus(Error, s.Message())
525+
}
526+
statuses[pl.Name()] = s
527+
}
528+
return nil, statuses.Merge()
529+
}
530+
531+
func (f *framework) runPostFilterPlugin(ctx context.Context, pl PostFilterPlugin, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status) {
532+
if !state.ShouldRecordPluginMetrics() {
533+
return pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
534+
}
535+
startTime := time.Now()
536+
r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
537+
f.metricsRecorder.observePluginDurationAsync(postFilter, pl.Name(), s, metrics.SinceInSeconds(startTime))
538+
return r, s
539+
}
540+
511541
// RunPreScorePlugins runs the set of configured pre-score plugins. If any
512542
// of these plugins returns any status other than "Success", the given pod is rejected.
513543
func (f *framework) RunPreScorePlugins(
@@ -957,3 +987,8 @@ func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plu
957987
}
958988
return pgMap
959989
}
990+
991+
// PreemptHandle returns the internal preemptHandle object.
992+
func (f *framework) PreemptHandle() PreemptHandle {
993+
return f.preemptHandle
994+
}

pkg/scheduler/framework/v1alpha1/framework_test.go

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ func (pl *TestPlugin) Filter(ctx context.Context, state *CycleState, pod *v1.Pod
169169
return NewStatus(Code(pl.inj.FilterStatus), "injected filter status")
170170
}
171171

172+
func (pl *TestPlugin) PostFilter(_ context.Context, _ *CycleState, _ *v1.Pod, _ NodeToStatusMap) (*PostFilterResult, *Status) {
173+
return nil, NewStatus(Code(pl.inj.PostFilterStatus), "injected status")
174+
}
175+
172176
func (pl *TestPlugin) PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status {
173177
return NewStatus(Code(pl.inj.PreScoreStatus), "injected status")
174178
}
@@ -1000,7 +1004,6 @@ func TestFilterPlugins(t *testing.T) {
10001004
name: "TestPlugin1",
10011005
inj: injectedResult{FilterStatus: int(UnschedulableAndUnresolvable)},
10021006
},
1003-
10041007
{
10051008
name: "TestPlugin2",
10061009
inj: injectedResult{FilterStatus: int(Unschedulable)},
@@ -1051,6 +1054,84 @@ func TestFilterPlugins(t *testing.T) {
10511054
}
10521055
}
10531056

1057+
func TestPostFilterPlugins(t *testing.T) {
1058+
tests := []struct {
1059+
name string
1060+
plugins []*TestPlugin
1061+
wantStatus *Status
1062+
}{
1063+
{
1064+
name: "a single plugin makes a Pod schedulable",
1065+
plugins: []*TestPlugin{
1066+
{
1067+
name: "TestPlugin",
1068+
inj: injectedResult{PostFilterStatus: int(Success)},
1069+
},
1070+
},
1071+
wantStatus: NewStatus(Success, "injected status"),
1072+
},
1073+
{
1074+
name: "plugin1 failed to make a Pod schedulable, followed by plugin2 which makes the Pod schedulable",
1075+
plugins: []*TestPlugin{
1076+
{
1077+
name: "TestPlugin1",
1078+
inj: injectedResult{PostFilterStatus: int(Unschedulable)},
1079+
},
1080+
{
1081+
name: "TestPlugin2",
1082+
inj: injectedResult{PostFilterStatus: int(Success)},
1083+
},
1084+
},
1085+
wantStatus: NewStatus(Success, "injected status"),
1086+
},
1087+
{
1088+
name: "plugin1 makes a Pod schedulable, followed by plugin2 which cannot make the Pod schedulable",
1089+
plugins: []*TestPlugin{
1090+
{
1091+
name: "TestPlugin1",
1092+
inj: injectedResult{PostFilterStatus: int(Success)},
1093+
},
1094+
{
1095+
name: "TestPlugin2",
1096+
inj: injectedResult{PostFilterStatus: int(Unschedulable)},
1097+
},
1098+
},
1099+
wantStatus: NewStatus(Success, "injected status"),
1100+
},
1101+
}
1102+
1103+
for _, tt := range tests {
1104+
t.Run(tt.name, func(t *testing.T) {
1105+
registry := Registry{}
1106+
cfgPls := &config.Plugins{PostFilter: &config.PluginSet{}}
1107+
for _, pl := range tt.plugins {
1108+
// register all plugins
1109+
tmpPl := pl
1110+
if err := registry.Register(pl.name,
1111+
func(_ runtime.Object, _ FrameworkHandle) (Plugin, error) {
1112+
return tmpPl, nil
1113+
}); err != nil {
1114+
t.Fatalf("fail to register postFilter plugin (%s)", pl.name)
1115+
}
1116+
// append plugins to postFilter pluginset
1117+
cfgPls.PostFilter.Enabled = append(
1118+
cfgPls.PostFilter.Enabled,
1119+
config.Plugin{Name: pl.name},
1120+
)
1121+
}
1122+
1123+
f, err := newFrameworkWithQueueSortAndBind(registry, cfgPls, emptyArgs)
1124+
if err != nil {
1125+
t.Fatalf("fail to create framework: %s", err)
1126+
}
1127+
_, gotStatus := f.RunPostFilterPlugins(context.TODO(), nil, pod, nil)
1128+
if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
1129+
t.Errorf("Unexpected status. got: %v, want: %v", gotStatus, tt.wantStatus)
1130+
}
1131+
})
1132+
}
1133+
}
1134+
10541135
func TestPreBindPlugins(t *testing.T) {
10551136
tests := []struct {
10561137
name string
@@ -1932,6 +2013,7 @@ type injectedResult struct {
19322013
PreFilterAddPodStatus int `json:"preFilterAddPodStatus,omitempty"`
19332014
PreFilterRemovePodStatus int `json:"preFilterRemovePodStatus,omitempty"`
19342015
FilterStatus int `json:"filterStatus,omitempty"`
2016+
PostFilterStatus int `json:"postFilterStatus,omitempty"`
19352017
PreScoreStatus int `json:"preScoreStatus,omitempty"`
19362018
ReserveStatus int `json:"reserveStatus,omitempty"`
19372019
PreBindStatus int `json:"preBindStatus,omitempty"`

pkg/scheduler/framework/v1alpha1/interface.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,23 @@ type FilterPlugin interface {
273273
Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
274274
}
275275

276+
// PostFilterPlugin is an interface for PostFilter plugins. These plugins are called
277+
// after a pod cannot be scheduled.
278+
type PostFilterPlugin interface {
279+
Plugin
280+
// PostFilter is called by the scheduling framework.
281+
// A PostFilter plugin should return one of the following statuses:
282+
// - Unschedulable: the plugin gets executed successfully but the pod cannot be made schedulable.
283+
// - Success: the plugin gets executed successfully and the pod can be made schedulable.
284+
// - Error: the plugin aborts due to some internal error.
285+
//
286+
// Informational plugins should be configured ahead of other ones, and always return Unschedulable status.
287+
// Optionally, a non-nil PostFilterResult may be returned along with a Success status. For example,
288+
// a preemption plugin may choose to return nominatedNodeName, so that framework can reuse that to update the
289+
// preemptor pod's .status.nominatedNodeName field.
290+
PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
291+
}
292+
276293
// PreScorePlugin is an interface for Pre-score plugin. Pre-score is an
277294
// informational extension point. Plugins will be called with a list of nodes
278295
// that passed the filtering phase. A plugin may use this data to update internal
@@ -398,6 +415,12 @@ type Framework interface {
398415
// schedule the target pod.
399416
RunFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) PluginToStatus
400417

418+
// RunPostFilterPlugins runs the set of configured PostFilter plugins.
419+
// PostFilter plugins can either be informational, in which case they should be configured
420+
// to execute first and return Unschedulable status, or ones that try to change the
421+
// cluster state to make the pod potentially schedulable in a future scheduling cycle.
422+
RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
423+
401424
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
402425
// PreFilter plugins. It returns directly if any of the plugins return any
403426
// status other than Success.
@@ -490,6 +513,14 @@ type FrameworkHandle interface {
490513
ClientSet() clientset.Interface
491514

492515
SharedInformerFactory() informers.SharedInformerFactory
516+
517+
// TODO: unroll the wrapped interfaces to FrameworkHandle.
518+
PreemptHandle() PreemptHandle
519+
}
520+
521+
// PostFilterResult wraps needed info for scheduler framework to act upon PostFilter phase.
522+
type PostFilterResult struct {
523+
NominatedNodeName string
493524
}
494525

495526
// PreemptHandle incorporates all the logic needed to run preemption.

pkg/scheduler/scheduler.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,10 +545,22 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
545545
" No preemption is performed.")
546546
} else {
547547
preemptionStartTime := time.Now()
548+
// TODO(Huang-Wei): implement the preemption logic as a PostFilter plugin.
548549
nominatedNode, _ = sched.preempt(schedulingCycleCtx, prof, state, pod, fitError)
549550
metrics.PreemptionAttempts.Inc()
550551
metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
551552
metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
553+
554+
// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
555+
result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
556+
if status.Code() == framework.Error {
557+
klog.Errorf("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
558+
} else {
559+
klog.V(5).Infof("Status after running PostFilter plugins for pod %v/%v: %v", pod.Namespace, pod.Name, status)
560+
}
561+
if status.IsSuccess() && result != nil {
562+
nominatedNode = result.NominatedNodeName
563+
}
552564
}
553565
// Pod did not fit anywhere, so it is counted as a failure. If preemption
554566
// succeeds, the pod should get counted as a success the next time we try to

0 commit comments

Comments
 (0)