Skip to content

Commit 5ed7b1a

Browse files
authored
Merge pull request kubernetes#92012 from Huang-Wei/postfilter-impl-2
[postfilter-impl-2] Introduce a defaultpreemption PostFilter plugin
2 parents 3b466d1 + 196056d commit 5ed7b1a

File tree

12 files changed: 134 additions and 80 deletions.

pkg/scheduler/BUILD

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,6 @@ go_library(
4242
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
4343
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
4444
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
45-
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
4645
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
4746
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
4847
"//vendor/k8s.io/klog/v2:go_default_library",

pkg/scheduler/core/BUILD

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ go_library(
1010
visibility = ["//visibility:public"],
1111
deps = [
1212
"//pkg/api/v1/pod:go_default_library",
13+
"//pkg/features:go_default_library",
1314
"//pkg/scheduler/apis/config:go_default_library",
1415
"//pkg/scheduler/framework/v1alpha1:go_default_library",
1516
"//pkg/scheduler/internal/cache:go_default_library",
@@ -23,8 +24,8 @@ go_library(
2324
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
2425
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
2526
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
27+
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
2628
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
27-
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
2829
"//staging/src/k8s.io/client-go/rest:go_default_library",
2930
"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
3031
"//vendor/k8s.io/klog/v2:go_default_library",

pkg/scheduler/core/extender_test.go

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -283,7 +283,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
283283
emptySnapshot,
284284
extenders,
285285
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
286-
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
287286
false,
288287
schedulerapi.DefaultPercentageOfNodesToScore)
289288
podIgnored := &v1.Pod{}

pkg/scheduler/core/generic_scheduler.go

Lines changed: 27 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -33,10 +33,11 @@ import (
3333
policy "k8s.io/api/policy/v1beta1"
3434
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3535
"k8s.io/apimachinery/pkg/labels"
36+
utilfeature "k8s.io/apiserver/pkg/util/feature"
3637
corelisters "k8s.io/client-go/listers/core/v1"
37-
policylisters "k8s.io/client-go/listers/policy/v1beta1"
3838
extenderv1 "k8s.io/kube-scheduler/extender/v1"
3939
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
40+
kubefeatures "k8s.io/kubernetes/pkg/features"
4041
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
4142
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
4243
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
@@ -100,10 +101,6 @@ func (f *FitError) Error() string {
100101
// TODO: Rename this type.
101102
type ScheduleAlgorithm interface {
102103
Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
103-
// Preempt receives scheduling filter result (NodeToStatusMap) for a pod and tries to create room for
104-
// the pod by preempting lower priority pods if possible.
105-
// It returns the node where preemption happened, and error if any.
106-
Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, framework.NodeToStatusMap) (selectedNode string, err error)
107104
// Extenders returns a slice of extender config. This is exposed for
108105
// testing.
109106
Extenders() []framework.Extender
@@ -126,7 +123,6 @@ type genericScheduler struct {
126123
extenders []framework.Extender
127124
nodeInfoSnapshot *internalcache.Snapshot
128125
pvcLister corelisters.PersistentVolumeClaimLister
129-
pdbLister policylisters.PodDisruptionBudgetLister
130126
disablePreemption bool
131127
percentageOfNodesToScore int32
132128
nextStartNodeIndex int
@@ -236,7 +232,7 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
236232
return selected, nil
237233
}
238234

239-
// preempt finds nodes with pods that can be preempted to make room for "pod" to
235+
// Preempt finds nodes with pods that can be preempted to make room for "pod" to
240236
// schedule. It chooses one of the nodes and preempts the pods on the node and
241237
// returns 1) the node, 2) the list of preempted pods if such a node is found,
242238
// 3) A list of pods whose nominated node name should be cleared, and 4) any
@@ -248,20 +244,20 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
248244
// other pods with the same priority. The nominated pod prevents other pods from
249245
// using the nominated resources and the nominated pod could take a long time
250246
// before it is retried after many other pending pods.
251-
func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
252-
cs := prof.ClientSet()
247+
func Preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
248+
cs := fh.ClientSet()
253249
// TODO(Huang-Wei): get pod from informer cache instead of API server.
254250
pod, err := util.GetUpdatedPod(cs, pod)
255251
if err != nil {
256252
klog.Errorf("Error getting the updated preemptor pod object: %v", err)
257253
return "", err
258254
}
259255

260-
if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos()) {
256+
if !podEligibleToPreemptOthers(pod, fh.SnapshotSharedLister().NodeInfos()) {
261257
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
262258
return "", nil
263259
}
264-
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
260+
allNodes, err := fh.SnapshotSharedLister().NodeInfos().List()
265261
if err != nil {
266262
return "", err
267263
}
@@ -285,22 +281,19 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
285281
}
286282
klog.Infof("%v potential nodes for preemption, first %v are: %v", len(potentialNodes), len(sample), sample)
287283
}
288-
var pdbs []*policy.PodDisruptionBudget
289-
if g.pdbLister != nil {
290-
pdbs, err = g.pdbLister.List(labels.Everything())
291-
if err != nil {
292-
return "", err
293-
}
284+
pdbs, err := getPodDisruptionBudgets(fh)
285+
if err != nil {
286+
return "", err
294287
}
295-
nodeNameToVictims, err := selectNodesForPreemption(ctx, prof, g.podNominator, state, pod, potentialNodes, pdbs)
288+
nodeNameToVictims, err := selectNodesForPreemption(ctx, fh.PreemptHandle(), fh.PreemptHandle(), state, pod, potentialNodes, pdbs)
296289
if err != nil {
297290
return "", err
298291
}
299292

300293
// We will only check nodeNameToVictims with extenders that support preemption.
301294
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
302295
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
303-
nodeNameToVictims, err = g.processPreemptionWithExtenders(pod, nodeNameToVictims)
296+
nodeNameToVictims, err = processPreemptionWithExtenders(fh, pod, nodeNameToVictims)
304297
if err != nil {
305298
return "", err
306299
}
@@ -317,18 +310,18 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
317310
return "", err
318311
}
319312
// If the victim is a WaitingPod, send a reject message to the PermitPlugin
320-
if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod != nil {
313+
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
321314
waitingPod.Reject("preempted")
322315
}
323-
prof.Recorder.Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
316+
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
324317
}
325318
metrics.PreemptionVictims.Observe(float64(len(victims)))
326319

327320
// Lower priority pods nominated to run on this node, may no longer fit on
328321
// this node. So, we should remove their nomination. Removing their
329322
// nomination updates these pods and moves them to the active queue. It
330323
// lets scheduler find another place for them.
331-
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode)
324+
nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, candidateNode)
332325
if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
333326
klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
334327
// We do not return as this error is not critical.
@@ -337,18 +330,22 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
337330
return candidateNode, nil
338331
}
339332

333+
func getPodDisruptionBudgets(fh framework.FrameworkHandle) ([]*policy.PodDisruptionBudget, error) {
334+
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
335+
return fh.SharedInformerFactory().Policy().V1beta1().PodDisruptionBudgets().Lister().List(labels.Everything())
336+
}
337+
return nil, nil
338+
}
339+
340340
// processPreemptionWithExtenders processes preemption with extenders
341-
func (g *genericScheduler) processPreemptionWithExtenders(
342-
pod *v1.Pod,
343-
nodeNameToVictims map[string]*extenderv1.Victims,
344-
) (map[string]*extenderv1.Victims, error) {
341+
func processPreemptionWithExtenders(fh framework.FrameworkHandle, pod *v1.Pod, nodeNameToVictims map[string]*extenderv1.Victims) (map[string]*extenderv1.Victims, error) {
345342
if len(nodeNameToVictims) > 0 {
346-
for _, extender := range g.extenders {
343+
for _, extender := range fh.PreemptHandle().Extenders() {
347344
if extender.SupportsPreemption() && extender.IsInterested(pod) {
348345
newNodeNameToVictims, err := extender.ProcessPreemption(
349346
pod,
350347
nodeNameToVictims,
351-
g.nodeInfoSnapshot.NodeInfos(),
348+
fh.SnapshotSharedLister().NodeInfos(),
352349
)
353350
if err != nil {
354351
if extender.IsIgnorable() {
@@ -381,8 +378,8 @@ func (g *genericScheduler) processPreemptionWithExtenders(
381378
// manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
382379
// worth the complexity, especially because we generally expect to have a very
383380
// small number of nominated pods per node.
384-
func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
385-
pods := g.podNominator.NominatedPodsForNode(nodeName)
381+
func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
382+
pods := pn.NominatedPodsForNode(nodeName)
386383

387384
if len(pods) == 0 {
388385
return nil
@@ -1141,7 +1138,6 @@ func NewGenericScheduler(
11411138
nodeInfoSnapshot *internalcache.Snapshot,
11421139
extenders []framework.Extender,
11431140
pvcLister corelisters.PersistentVolumeClaimLister,
1144-
pdbLister policylisters.PodDisruptionBudgetLister,
11451141
disablePreemption bool,
11461142
percentageOfNodesToScore int32) ScheduleAlgorithm {
11471143
return &genericScheduler{
@@ -1150,7 +1146,6 @@ func NewGenericScheduler(
11501146
extenders: extenders,
11511147
nodeInfoSnapshot: nodeInfoSnapshot,
11521148
pvcLister: pvcLister,
1153-
pdbLister: pdbLister,
11541149
disablePreemption: disablePreemption,
11551150
percentageOfNodesToScore: percentageOfNodesToScore,
11561151
}

pkg/scheduler/core/generic_scheduler_test.go

Lines changed: 8 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -693,9 +693,6 @@ func TestGenericScheduler(t *testing.T) {
693693
}
694694
for _, test := range tests {
695695
t.Run(test.name, func(t *testing.T) {
696-
client := clientsetfake.NewSimpleClientset()
697-
informerFactory := informers.NewSharedInformerFactory(client, 0)
698-
699696
cache := internalcache.New(time.Duration(0), wait.NeverStop)
700697
for _, pod := range test.pods {
701698
cache.AddPod(pod)
@@ -724,7 +721,6 @@ func TestGenericScheduler(t *testing.T) {
724721
snapshot,
725722
[]framework.Extender{},
726723
pvcLister,
727-
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
728724
false,
729725
schedulerapi.DefaultPercentageOfNodesToScore)
730726
result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod)
@@ -752,7 +748,7 @@ func makeScheduler(nodes []*v1.Node) *genericScheduler {
752748
cache,
753749
internalqueue.NewSchedulingQueue(nil),
754750
emptySnapshot,
755-
nil, nil, nil, false,
751+
nil, nil, false,
756752
schedulerapi.DefaultPercentageOfNodesToScore)
757753
cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
758754
return s.(*genericScheduler)
@@ -1045,7 +1041,6 @@ func TestZeroRequest(t *testing.T) {
10451041
emptySnapshot,
10461042
[]framework.Extender{},
10471043
nil,
1048-
nil,
10491044
false,
10501045
schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
10511046
scheduler.nodeInfoSnapshot = snapshot
@@ -1508,9 +1503,6 @@ func TestSelectNodesForPreemption(t *testing.T) {
15081503
labelKeys := []string{"hostname", "zone", "region"}
15091504
for _, test := range tests {
15101505
t.Run(test.name, func(t *testing.T) {
1511-
client := clientsetfake.NewSimpleClientset()
1512-
informerFactory := informers.NewSharedInformerFactory(client, 0)
1513-
15141506
filterFailedNodeReturnCodeMap := map[string]framework.Code{}
15151507
cache := internalcache.New(time.Duration(0), wait.NeverStop)
15161508
for _, pod := range test.pods {
@@ -1558,7 +1550,6 @@ func TestSelectNodesForPreemption(t *testing.T) {
15581550
snapshot,
15591551
[]framework.Extender{},
15601552
nil,
1561-
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
15621553
false,
15631554
schedulerapi.DefaultPercentageOfNodesToScore)
15641555
g := scheduler.(*genericScheduler)
@@ -2306,7 +2297,6 @@ func TestPreempt(t *testing.T) {
23062297
deletedPodNames.Insert(action.(clienttesting.DeleteAction).GetName())
23072298
return true, nil, nil
23082299
})
2309-
informerFactory := informers.NewSharedInformerFactory(client, 0)
23102300

23112301
stop := make(chan struct{})
23122302
cache := internalcache.New(time.Duration(0), stop)
@@ -2344,29 +2334,21 @@ func TestPreempt(t *testing.T) {
23442334
extenders = append(extenders, extender)
23452335
}
23462336

2337+
podNominator := internalqueue.NewPodNominator()
23472338
snapshot := internalcache.NewSnapshot(test.pods, nodes)
23482339
fwk, err := st.NewFramework(
23492340
test.registerPlugins,
23502341
framework.WithClientSet(client),
2342+
framework.WithEventRecorder(&events.FakeRecorder{}),
2343+
framework.WithExtenders(extenders),
2344+
framework.WithPodNominator(podNominator),
23512345
framework.WithSnapshotSharedLister(snapshot),
2346+
framework.WithInformerFactory(informers.NewSharedInformerFactory(client, 0)),
23522347
)
23532348
if err != nil {
23542349
t.Fatal(err)
23552350
}
2356-
prof := &profile.Profile{
2357-
Framework: fwk,
2358-
Recorder: &events.FakeRecorder{},
2359-
}
23602351

2361-
scheduler := NewGenericScheduler(
2362-
cache,
2363-
internalqueue.NewSchedulingQueue(nil),
2364-
snapshot,
2365-
extenders,
2366-
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
2367-
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
2368-
false,
2369-
schedulerapi.DefaultPercentageOfNodesToScore)
23702352
state := framework.NewCycleState()
23712353
// Some tests rely on PreFilter plugin to compute its CycleState.
23722354
preFilterStatus := fwk.RunPreFilterPlugins(context.Background(), state, test.pod)
@@ -2378,7 +2360,7 @@ func TestPreempt(t *testing.T) {
23782360
if test.failedNodeToStatusMap != nil {
23792361
failedNodeToStatusMap = test.failedNodeToStatusMap
23802362
}
2381-
node, err := scheduler.Preempt(context.Background(), prof, state, test.pod, failedNodeToStatusMap)
2363+
node, err := Preempt(context.Background(), fwk, state, test.pod, failedNodeToStatusMap)
23822364
if err != nil {
23832365
t.Errorf("unexpected error in preemption: %v", err)
23842366
}
@@ -2416,7 +2398,7 @@ func TestPreempt(t *testing.T) {
24162398
}
24172399

24182400
// Call preempt again and make sure it doesn't preempt any more pods.
2419-
node, err = scheduler.Preempt(context.Background(), prof, state, test.pod, failedNodeToStatusMap)
2401+
node, err = Preempt(context.Background(), fwk, state, test.pod, failedNodeToStatusMap)
24202402
if err != nil {
24212403
t.Errorf("unexpected error in preemption: %v", err)
24222404
}

pkg/scheduler/eventhandlers.go

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ import (
3030
coreinformers "k8s.io/client-go/informers/core/v1"
3131
"k8s.io/client-go/tools/cache"
3232
"k8s.io/kubernetes/pkg/features"
33+
kubefeatures "k8s.io/kubernetes/pkg/features"
3334
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
3435
"k8s.io/kubernetes/pkg/scheduler/profile"
3536
)
@@ -466,6 +467,11 @@ func addAllEventHandlers(
466467
AddFunc: sched.onStorageClassAdd,
467468
},
468469
)
470+
471+
// TODO(Huang-Wei): remove this hack when defaultpreemption plugin is enabled.
472+
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
473+
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister()
474+
}
469475
}
470476

471477
func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) string {

pkg/scheduler/factory.go

Lines changed: 0 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -30,15 +30,12 @@ import (
3030
"k8s.io/apimachinery/pkg/fields"
3131
"k8s.io/apimachinery/pkg/runtime"
3232
"k8s.io/apimachinery/pkg/util/sets"
33-
utilfeature "k8s.io/apiserver/pkg/util/feature"
3433
"k8s.io/client-go/informers"
3534
coreinformers "k8s.io/client-go/informers/core/v1"
3635
clientset "k8s.io/client-go/kubernetes"
3736
corelisters "k8s.io/client-go/listers/core/v1"
38-
policylisters "k8s.io/client-go/listers/policy/v1beta1"
3937
"k8s.io/client-go/tools/cache"
4038
"k8s.io/klog/v2"
41-
kubefeatures "k8s.io/kubernetes/pkg/features"
4239
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
4340
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
4441
"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
@@ -192,7 +189,6 @@ func (c *Configurator) create() (*Scheduler, error) {
192189
c.nodeInfoSnapshot,
193190
extenders,
194191
c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
195-
GetPodDisruptionBudgetLister(c.informerFactory),
196192
c.disablePreemption,
197193
c.percentageOfNodesToScore,
198194
)
@@ -478,11 +474,3 @@ func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodL
478474
}
479475
}
480476
}
481-
482-
// GetPodDisruptionBudgetLister returns pdb lister from the given informer factory. Returns nil if PodDisruptionBudget feature is disabled.
483-
func GetPodDisruptionBudgetLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister {
484-
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
485-
return informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister()
486-
}
487-
return nil
488-
}

pkg/scheduler/framework/plugins/BUILD

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@ filegroup(
4848
":package-srcs",
4949
"//pkg/scheduler/framework/plugins/defaultbinder:all-srcs",
5050
"//pkg/scheduler/framework/plugins/defaultpodtopologyspread:all-srcs",
51+
"//pkg/scheduler/framework/plugins/defaultpreemption:all-srcs",
5152
"//pkg/scheduler/framework/plugins/examples:all-srcs",
5253
"//pkg/scheduler/framework/plugins/helper:all-srcs",
5354
"//pkg/scheduler/framework/plugins/imagelocality:all-srcs",

0 commit comments

Comments
 (0)