Skip to content

Commit d06ff65

Browse files
authored
Merge pull request kubernetes#92876 from Huang-Wei/pdbLister
Add pdbLister as a member field of struct DefaultPreemption
2 parents c23a4b0 + 9d377eb commit d06ff65

File tree

3 files changed

+41
-22
lines changed

3 files changed

+41
-22
lines changed

pkg/scheduler/framework/plugins/defaultpreemption/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ go_library(
1919
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
2020
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
2121
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
22+
"//staging/src/k8s.io/client-go/informers:go_default_library",
23+
"//staging/src/k8s.io/client-go/listers/policy/v1beta1:go_default_library",
2224
"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
2325
"//vendor/k8s.io/klog/v2:go_default_library",
2426
],

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131
"k8s.io/apimachinery/pkg/labels"
3232
"k8s.io/apimachinery/pkg/runtime"
3333
utilfeature "k8s.io/apiserver/pkg/util/feature"
34+
"k8s.io/client-go/informers"
35+
policylisters "k8s.io/client-go/listers/policy/v1beta1"
3436
extenderv1 "k8s.io/kube-scheduler/extender/v1"
3537
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
3638
kubefeatures "k8s.io/kubernetes/pkg/features"
@@ -48,7 +50,8 @@ const (
4850

4951
// DefaultPreemption is a PostFilter plugin implements the preemption logic.
5052
type DefaultPreemption struct {
51-
fh framework.FrameworkHandle
53+
fh framework.FrameworkHandle
54+
pdbLister policylisters.PodDisruptionBudgetLister
5255
}
5356

5457
var _ framework.PostFilterPlugin = &DefaultPreemption{}
@@ -60,10 +63,9 @@ func (pl *DefaultPreemption) Name() string {
6063

6164
// New initializes a new plugin and returns it.
6265
func New(_ runtime.Object, fh framework.FrameworkHandle) (framework.Plugin, error) {
63-
pl := DefaultPreemption{fh}
64-
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
65-
// A hack to initialize pdbLister in sharedInformerFactory.
66-
fh.SharedInformerFactory().Policy().V1beta1().PodDisruptionBudgets().Lister()
66+
pl := DefaultPreemption{
67+
fh: fh,
68+
pdbLister: getPDBLister(fh.SharedInformerFactory()),
6769
}
6870
return &pl, nil
6971
}
@@ -77,7 +79,7 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
7779
metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
7880
}()
7981

80-
nnn, err := preempt(ctx, pl.fh, state, pod, m)
82+
nnn, err := pl.preempt(ctx, state, pod, m)
8183
if err != nil {
8284
return nil, framework.NewStatus(framework.Error, err.Error())
8385
}
@@ -97,20 +99,20 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
9799
// other pods with the same priority. The nominated pod prevents other pods from
98100
// using the nominated resources and the nominated pod could take a long time
99101
// before it is retried after many other pending pods.
100-
func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
101-
cs := fh.ClientSet()
102+
func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
103+
cs := pl.fh.ClientSet()
102104
// TODO(Huang-Wei): get pod from informer cache instead of API server.
103105
pod, err := util.GetUpdatedPod(cs, pod)
104106
if err != nil {
105107
klog.Errorf("Error getting the updated preemptor pod object: %v", err)
106108
return "", err
107109
}
108110

109-
if !podEligibleToPreemptOthers(pod, fh.SnapshotSharedLister().NodeInfos(), m[pod.Status.NominatedNodeName]) {
111+
if !podEligibleToPreemptOthers(pod, pl.fh.SnapshotSharedLister().NodeInfos(), m[pod.Status.NominatedNodeName]) {
110112
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
111113
return "", nil
112114
}
113-
allNodes, err := fh.SnapshotSharedLister().NodeInfos().List()
115+
allNodes, err := pl.fh.SnapshotSharedLister().NodeInfos().List()
114116
if err != nil {
115117
return "", err
116118
}
@@ -134,19 +136,19 @@ func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework
134136
}
135137
klog.Infof("%v potential nodes for preemption, first %v are: %v", len(potentialNodes), len(sample), sample)
136138
}
137-
pdbs, err := getPodDisruptionBudgets(fh)
139+
pdbs, err := getPodDisruptionBudgets(pl.pdbLister)
138140
if err != nil {
139141
return "", err
140142
}
141-
nodeNameToVictims, err := selectNodesForPreemption(ctx, fh.PreemptHandle(), state, pod, potentialNodes, pdbs)
143+
nodeNameToVictims, err := selectNodesForPreemption(ctx, pl.fh.PreemptHandle(), state, pod, potentialNodes, pdbs)
142144
if err != nil {
143145
return "", err
144146
}
145147

146148
// We will only check nodeNameToVictims with extenders that support preemption.
147149
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
148150
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
149-
nodeNameToVictims, err = processPreemptionWithExtenders(fh, pod, nodeNameToVictims)
151+
nodeNameToVictims, err = processPreemptionWithExtenders(pl.fh, pod, nodeNameToVictims)
150152
if err != nil {
151153
return "", err
152154
}
@@ -163,18 +165,18 @@ func preempt(ctx context.Context, fh framework.FrameworkHandle, state *framework
163165
return "", err
164166
}
165167
// If the victim is a WaitingPod, send a reject message to the PermitPlugin
166-
if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
168+
if waitingPod := pl.fh.GetWaitingPod(victim.UID); waitingPod != nil {
167169
waitingPod.Reject("preempted")
168170
}
169-
fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
171+
pl.fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
170172
}
171173
metrics.PreemptionVictims.Observe(float64(len(victims)))
172174

173175
// Lower priority pods nominated to run on this node, may no longer fit on
174176
// this node. So, we should remove their nomination. Removing their
175177
// nomination updates these pods and moves them to the active queue. It
176178
// lets scheduler find another place for them.
177-
nominatedPods := getLowerPriorityNominatedPods(fh.PreemptHandle(), pod, candidateNode)
179+
nominatedPods := getLowerPriorityNominatedPods(pl.fh.PreemptHandle(), pod, candidateNode)
178180
if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
179181
klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
180182
// We do not return as this error is not critical.
@@ -615,9 +617,16 @@ func filterPodsWithPDBViolation(pods []*v1.Pod, pdbs []*policy.PodDisruptionBudg
615617
return violatingPods, nonViolatingPods
616618
}
617619

618-
func getPodDisruptionBudgets(fh framework.FrameworkHandle) ([]*policy.PodDisruptionBudget, error) {
620+
func getPDBLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister {
619621
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodDisruptionBudget) {
620-
return fh.SharedInformerFactory().Policy().V1beta1().PodDisruptionBudgets().Lister().List(labels.Everything())
622+
return informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister()
623+
}
624+
return nil
625+
}
626+
627+
func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) {
628+
if pdbLister != nil {
629+
return pdbLister.List(labels.Everything())
621630
}
622631
return nil, nil
623632
}

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption_test.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,14 +221,17 @@ func TestPostFilter(t *testing.T) {
221221
if err != nil {
222222
t.Fatal(err)
223223
}
224-
p := DefaultPreemption{fh: f}
225224

226225
state := framework.NewCycleState()
227226
// Ensure <state> is populated.
228227
if status := f.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() {
229228
t.Errorf("Unexpected PreFilter Status: %v", status)
230229
}
231230

231+
p := DefaultPreemption{
232+
fh: f,
233+
pdbLister: getPDBLister(informerFactory),
234+
}
232235
gotResult, gotStatus := p.PostFilter(context.TODO(), state, tt.pod, tt.filteredNodesStatuses)
233236
if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
234237
t.Errorf("Status does not match: %v, want: %v", gotStatus, tt.wantStatus)
@@ -1236,6 +1239,7 @@ func TestPreempt(t *testing.T) {
12361239
extenders = append(extenders, extender)
12371240
}
12381241

1242+
informerFactory := informers.NewSharedInformerFactory(client, 0)
12391243
fwk, err := st.NewFramework(
12401244
[]st.RegisterPluginFunc{
12411245
test.registerPlugin,
@@ -1247,7 +1251,7 @@ func TestPreempt(t *testing.T) {
12471251
frameworkruntime.WithExtenders(extenders),
12481252
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
12491253
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
1250-
frameworkruntime.WithInformerFactory(informers.NewSharedInformerFactory(client, 0)),
1254+
frameworkruntime.WithInformerFactory(informerFactory),
12511255
)
12521256
if err != nil {
12531257
t.Fatal(err)
@@ -1260,7 +1264,11 @@ func TestPreempt(t *testing.T) {
12601264
t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)
12611265
}
12621266
// Call preempt and check the expected results.
1263-
node, err := preempt(context.Background(), fwk, state, test.pod, make(framework.NodeToStatusMap))
1267+
pl := DefaultPreemption{
1268+
fh: fwk,
1269+
pdbLister: getPDBLister(informerFactory),
1270+
}
1271+
node, err := pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap))
12641272
if err != nil {
12651273
t.Errorf("unexpected error in preemption: %v", err)
12661274
}
@@ -1298,7 +1306,7 @@ func TestPreempt(t *testing.T) {
12981306
}
12991307

13001308
// Call preempt again and make sure it doesn't preempt any more pods.
1301-
node, err = preempt(context.Background(), fwk, state, test.pod, make(framework.NodeToStatusMap))
1309+
node, err = pl.preempt(context.Background(), state, test.pod, make(framework.NodeToStatusMap))
13021310
if err != nil {
13031311
t.Errorf("unexpected error in preemption: %v", err)
13041312
}

0 commit comments

Comments
 (0)