Skip to content

Commit 59eff29

Browse files
committed
- Add Extenders() and PluginsRunner interface to PreemptHandle
- Make some private functions stateless - make addNominatedPods() not dependent on genericScheduler - make addNominatedPods() not dependent on genericScheduler - make selectVictimsOnNode() not dependent on genericScheduler - make selectNodesForPreemption() not dependent on genericScheduler
1 parent 3a95b11 commit 59eff29

File tree

5 files changed

+70
-23
lines changed

5 files changed

+70
-23
lines changed

pkg/scheduler/core/generic_scheduler.go

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
280280
return "", nil, nil, err
281281
}
282282
}
283-
nodeNameToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs)
283+
nodeNameToVictims, err := selectNodesForPreemption(ctx, prof, g.podNominator, state, pod, potentialNodes, pdbs)
284284
if err != nil {
285285
return "", nil, nil, err
286286
}
@@ -442,7 +442,7 @@ func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *p
442442
// We check the nodes starting from where we left off in the previous scheduling cycle,
443443
// this is to make sure all nodes have the same chance of being examined across pods.
444444
nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)]
445-
fits, status, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
445+
fits, status, err := podPassesFiltersOnNode(ctx, prof, g.podNominator, state, pod, nodeInfo)
446446
if err != nil {
447447
errCh.SendErrorWithCancel(err, cancel)
448448
return
@@ -520,12 +520,12 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v
520520
// addNominatedPods adds pods with equal or greater priority which are nominated
521521
// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
522522
// 3) augmented nodeInfo.
523-
func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.Profile, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
524-
if g.podNominator == nil || nodeInfo == nil || nodeInfo.Node() == nil {
523+
func addNominatedPods(ctx context.Context, pr framework.PluginsRunner, nominator framework.PodNominator, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
524+
if nominator == nil || nodeInfo == nil || nodeInfo.Node() == nil {
525525
// This may happen only in tests.
526526
return false, state, nodeInfo, nil
527527
}
528-
nominatedPods := g.podNominator.NominatedPodsForNode(nodeInfo.Node().Name)
528+
nominatedPods := nominator.NominatedPodsForNode(nodeInfo.Node().Name)
529529
if len(nominatedPods) == 0 {
530530
return false, state, nodeInfo, nil
531531
}
@@ -535,7 +535,7 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.P
535535
for _, p := range nominatedPods {
536536
if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
537537
nodeInfoOut.AddPod(p)
538-
status := prof.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
538+
status := pr.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
539539
if !status.IsSuccess() {
540540
return false, state, nodeInfo, status.AsError()
541541
}
@@ -555,9 +555,10 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, prof *profile.P
555555
// and add the nominated pods. Removal of the victims is done by
556556
// SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
557557
// NodeInfo before calling this function.
558-
func (g *genericScheduler) podPassesFiltersOnNode(
558+
func podPassesFiltersOnNode(
559559
ctx context.Context,
560-
prof *profile.Profile,
560+
pr framework.PluginsRunner,
561+
nominator framework.PodNominator,
561562
state *framework.CycleState,
562563
pod *v1.Pod,
563564
info *framework.NodeInfo,
@@ -588,15 +589,15 @@ func (g *genericScheduler) podPassesFiltersOnNode(
588589
nodeInfoToUse := info
589590
if i == 0 {
590591
var err error
591-
podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, prof, pod, state, info)
592+
podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, pr, nominator, pod, state, info)
592593
if err != nil {
593594
return false, nil, err
594595
}
595596
} else if !podsAdded || !status.IsSuccess() {
596597
break
597598
}
598599

599-
statusMap := prof.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
600+
statusMap := pr.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
600601
status = statusMap.Merge()
601602
if !status.IsSuccess() && !status.IsUnschedulable() {
602603
return false, status, status.AsError()
@@ -847,9 +848,10 @@ func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) str
847848

848849
// selectNodesForPreemption finds all the nodes with possible victims for
849850
// preemption in parallel.
850-
func (g *genericScheduler) selectNodesForPreemption(
851+
func selectNodesForPreemption(
851852
ctx context.Context,
852-
prof *profile.Profile,
853+
pr framework.PluginsRunner,
854+
nominator framework.PodNominator,
853855
state *framework.CycleState,
854856
pod *v1.Pod,
855857
potentialNodes []*framework.NodeInfo,
@@ -861,7 +863,7 @@ func (g *genericScheduler) selectNodesForPreemption(
861863
checkNode := func(i int) {
862864
nodeInfoCopy := potentialNodes[i].Clone()
863865
stateCopy := state.Clone()
864-
pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, prof, stateCopy, pod, nodeInfoCopy, pdbs)
866+
pods, numPDBViolations, fits := selectVictimsOnNode(ctx, pr, nominator, stateCopy, pod, nodeInfoCopy, pdbs)
865867
if fits {
866868
resultLock.Lock()
867869
victims := extenderv1.Victims{
@@ -937,9 +939,10 @@ func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudg
937939
// NOTE: This function assumes that it is never called if "pod" cannot be scheduled
938940
// due to pod affinity, node affinity, or node anti-affinity reasons. None of
939941
// these predicates can be satisfied by removing more pods from the node.
940-
func (g *genericScheduler) selectVictimsOnNode(
942+
func selectVictimsOnNode(
941943
ctx context.Context,
942-
prof *profile.Profile,
944+
pr framework.PluginsRunner,
945+
nominator framework.PodNominator,
943946
state *framework.CycleState,
944947
pod *v1.Pod,
945948
nodeInfo *framework.NodeInfo,
@@ -951,15 +954,15 @@ func (g *genericScheduler) selectVictimsOnNode(
951954
if err := nodeInfo.RemovePod(rp); err != nil {
952955
return err
953956
}
954-
status := prof.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
957+
status := pr.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
955958
if !status.IsSuccess() {
956959
return status.AsError()
957960
}
958961
return nil
959962
}
960963
addPod := func(ap *v1.Pod) error {
961964
nodeInfo.AddPod(ap)
962-
status := prof.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
965+
status := pr.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
963966
if !status.IsSuccess() {
964967
return status.AsError()
965968
}
@@ -982,7 +985,7 @@ func (g *genericScheduler) selectVictimsOnNode(
982985
// inter-pod affinity to one or more victims, but we have decided not to
983986
// support this case for performance reasons. Having affinity to lower
984987
// priority pods is not a recommended configuration anyway.
985-
if fits, _, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo); !fits {
988+
if fits, _, err := podPassesFiltersOnNode(ctx, pr, nominator, state, pod, nodeInfo); !fits {
986989
if err != nil {
987990
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
988991
}
@@ -1000,7 +1003,7 @@ func (g *genericScheduler) selectVictimsOnNode(
10001003
if err := addPod(p); err != nil {
10011004
return false, err
10021005
}
1003-
fits, _, _ := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo)
1006+
fits, _, _ := podPassesFiltersOnNode(ctx, pr, nominator, state, pod, nodeInfo)
10041007
if !fits {
10051008
if err := removePod(p); err != nil {
10061009
return false, err

pkg/scheduler/core/generic_scheduler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1632,7 +1632,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
16321632
if err != nil {
16331633
t.Fatal(err)
16341634
}
1635-
nodeToPods, err := g.selectNodesForPreemption(context.Background(), prof, state, test.pod, nodeInfos, test.pdbs)
1635+
nodeToPods, err := selectNodesForPreemption(context.Background(), prof, g.podNominator, state, test.pod, nodeInfos, test.pdbs)
16361636
if err != nil {
16371637
t.Error(err)
16381638
}
@@ -1912,7 +1912,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
19121912
if !preFilterStatus.IsSuccess() {
19131913
t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
19141914
}
1915-
candidateNodes, _ := g.selectNodesForPreemption(context.Background(), prof, state, test.pod, nodeInfos, nil)
1915+
candidateNodes, _ := selectNodesForPreemption(context.Background(), prof, g.podNominator, state, test.pod, nodeInfos, nil)
19161916
node := pickOneNodeForPreemption(candidateNodes)
19171917
found := false
19181918
for _, nodeName := range test.expected {

pkg/scheduler/framework/v1alpha1/framework.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ type frameworkOptions struct {
121121
snapshotSharedLister SharedLister
122122
metricsRecorder *metricsRecorder
123123
podNominator PodNominator
124+
extenders []Extender
124125
runAllFilters bool
125126
}
126127

@@ -170,10 +171,31 @@ func WithPodNominator(nominator PodNominator) Option {
170171
}
171172
}
172173

174+
// WithExtenders sets extenders for the scheduling framework.
175+
func WithExtenders(extenders []Extender) Option {
176+
return func(o *frameworkOptions) {
177+
o.extenders = extenders
178+
}
179+
}
180+
173181
var defaultFrameworkOptions = frameworkOptions{
174182
metricsRecorder: newMetricsRecorder(1000, time.Second),
175183
}
176184

185+
// TODO(#91029): move this to framework runtime package.
186+
var _ PreemptHandle = &preemptHandle{}
187+
188+
type preemptHandle struct {
189+
extenders []Extender
190+
PodNominator
191+
PluginsRunner
192+
}
193+
194+
// Extenders returns the registered extenders.
195+
func (ph *preemptHandle) Extenders() []Extender {
196+
return ph.extenders
197+
}
198+
177199
var _ Framework = &framework{}
178200

179201
// NewFramework initializes plugins given the configuration and the registry.
@@ -191,9 +213,13 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
191213
clientSet: options.clientSet,
192214
informerFactory: options.informerFactory,
193215
metricsRecorder: options.metricsRecorder,
194-
preemptHandle: options.podNominator,
195216
runAllFilters: options.runAllFilters,
196217
}
218+
f.preemptHandle = &preemptHandle{
219+
extenders: options.extenders,
220+
PodNominator: options.podNominator,
221+
PluginsRunner: f,
222+
}
197223
if plugins == nil {
198224
return f, nil
199225
}

pkg/scheduler/framework/v1alpha1/interface.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,12 @@ type FrameworkHandle interface {
494494

495495
// PreemptHandle incorporates all needed logic to run preemption logic.
496496
type PreemptHandle interface {
497+
// PodNominator abstracts operations to maintain nominated Pods.
497498
PodNominator
499+
// PluginsRunner abstracts operations to run some plugins.
500+
PluginsRunner
501+
// Extenders returns registered scheduler extenders.
502+
Extenders() []Extender
498503
}
499504

500505
// PodNominator abstracts operations to maintain nominated Pods.
@@ -509,3 +514,15 @@ type PodNominator interface {
509514
// NominatedPodsForNode returns nominatedPods on the given node.
510515
NominatedPodsForNode(nodeName string) []*v1.Pod
511516
}
517+
518+
// PluginsRunner abstracts operations to run some plugins.
519+
// This is used by preemption PostFilter plugins when evaluating the feasibility of
520+
// scheduling the pod on nodes when certain running pods get evicted.
521+
type PluginsRunner interface {
522+
// RunFilterPlugins runs the set of configured filter plugins for pod on the given node.
523+
RunFilterPlugins(context.Context, *CycleState, *v1.Pod, *NodeInfo) PluginToStatus
524+
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured PreFilter plugins.
525+
RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToAdd *v1.Pod, nodeInfo *NodeInfo) *Status
526+
// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured PreFilter plugins.
527+
RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podToRemove *v1.Pod, nodeInfo *NodeInfo) *Status
528+
}

pkg/scheduler/scheduler_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile,
145145
func (es mockScheduler) Extenders() []framework.Extender {
146146
return nil
147147
}
148-
func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
148+
149+
func (es mockScheduler) Preempt(ctx context.Context, profile *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
149150
return "", nil, nil, nil
150151
}
151152

0 commit comments

Comments
 (0)