Skip to content

Commit 36c8ecc

Browse files
committed
Refactor scheduler preempt interface
- replace error with NodeToStatusMap in Preempt() signature
- eliminate podPreemptor interface and expose its functions statelessly
- move logic in scheduler.go#preempt to generic_scheduler.go#Preempt()
1 parent 98f250f commit 36c8ecc

File tree

9 files changed

+224
-219
lines changed

9 files changed

+224
-219
lines changed

pkg/scheduler/BUILD

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,15 @@ go_library(
2727
"//pkg/scheduler/internal/queue:go_default_library",
2828
"//pkg/scheduler/metrics:go_default_library",
2929
"//pkg/scheduler/profile:go_default_library",
30+
"//pkg/scheduler/util:go_default_library",
3031
"//staging/src/k8s.io/api/core/v1:go_default_library",
3132
"//staging/src/k8s.io/api/storage/v1:go_default_library",
3233
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
3334
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
3435
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
3536
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
36-
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
3737
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
3838
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
39-
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
4039
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
4140
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
4241
"//staging/src/k8s.io/client-go/informers:go_default_library",

pkg/scheduler/core/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ go_test(
7373
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
7474
"//staging/src/k8s.io/client-go/informers:go_default_library",
7575
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
76+
"//staging/src/k8s.io/client-go/testing:go_default_library",
77+
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
7678
"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
7779
],
7880
)

pkg/scheduler/core/generic_scheduler.go

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,10 @@ func (f *FitError) Error() string {
100100
// TODO: Rename this type.
101101
type ScheduleAlgorithm interface {
102102
Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
103-
// Preempt receives scheduling errors for a pod and tries to create room for
103+
// Preempt receives scheduling filter result (NodeToStatusMap) for a pod and tries to create room for
104104
// the pod by preempting lower priority pods if possible.
105-
// It returns the node where preemption happened, a list of preempted pods, a
106-
// list of pods whose nominated node name should be removed, and error if any.
107-
Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, error) (selectedNode string, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
105+
// It returns the node where preemption happened, and error if any.
106+
Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, framework.NodeToStatusMap) (selectedNode string, err error)
108107
// Extenders returns a slice of extender config. This is exposed for
109108
// testing.
110109
Extenders() []framework.Extender
@@ -249,29 +248,35 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
249248
// other pods with the same priority. The nominated pod prevents other pods from
250249
// using the nominated resources and the nominated pod could take a long time
251250
// before it is retried after many other pending pods.
252-
func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
253-
// Scheduler may return various types of errors. Consider preemption only if
254-
// the error is of type FitError.
255-
fitError, ok := scheduleErr.(*FitError)
256-
if !ok || fitError == nil {
257-
return "", nil, nil, nil
251+
func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, error) {
252+
cs := prof.ClientSet()
253+
// TODO(Huang-Wei): get pod from informer cache instead of API server.
254+
pod, err := util.GetUpdatedPod(cs, pod)
255+
if err != nil {
256+
klog.Errorf("Error getting the updated preemptor pod object: %v", err)
257+
return "", err
258258
}
259+
259260
if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos()) {
260261
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
261-
return "", nil, nil, nil
262+
return "", nil
262263
}
263264
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
264265
if err != nil {
265-
return "", nil, nil, err
266+
return "", err
266267
}
267268
if len(allNodes) == 0 {
268-
return "", nil, nil, ErrNoNodesAvailable
269+
return "", ErrNoNodesAvailable
269270
}
270-
potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
271+
potentialNodes := nodesWherePreemptionMightHelp(allNodes, m)
271272
if len(potentialNodes) == 0 {
272273
klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
273274
// In this case, we should clean-up any existing nominated node name of the pod.
274-
return "", nil, []*v1.Pod{pod}, nil
275+
if err := util.ClearNominatedNodeName(cs, pod); err != nil {
276+
klog.Errorf("Cannot clear 'NominatedNodeName' field of pod %v/%v: %v", pod.Namespace, pod.Name, err)
277+
// We do not return as this error is not critical.
278+
}
279+
return "", nil
275280
}
276281
if klog.V(5).Enabled() {
277282
var sample []string
@@ -284,33 +289,52 @@ func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, s
284289
if g.pdbLister != nil {
285290
pdbs, err = g.pdbLister.List(labels.Everything())
286291
if err != nil {
287-
return "", nil, nil, err
292+
return "", err
288293
}
289294
}
290295
nodeNameToVictims, err := selectNodesForPreemption(ctx, prof, g.podNominator, state, pod, potentialNodes, pdbs)
291296
if err != nil {
292-
return "", nil, nil, err
297+
return "", err
293298
}
294299

295300
// We will only check nodeNameToVictims with extenders that support preemption.
296301
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
297302
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
298303
nodeNameToVictims, err = g.processPreemptionWithExtenders(pod, nodeNameToVictims)
299304
if err != nil {
300-
return "", nil, nil, err
305+
return "", err
301306
}
302307

303308
candidateNode := pickOneNodeForPreemption(nodeNameToVictims)
304309
if len(candidateNode) == 0 {
305-
return "", nil, nil, nil
310+
return "", nil
306311
}
307312

313+
victims := nodeNameToVictims[candidateNode].Pods
314+
for _, victim := range victims {
315+
if err := util.DeletePod(cs, victim); err != nil {
316+
klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
317+
return "", err
318+
}
319+
// If the victim is a WaitingPod, send a reject message to the PermitPlugin
320+
if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod != nil {
321+
waitingPod.Reject("preempted")
322+
}
323+
prof.Recorder.Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, candidateNode)
324+
}
325+
metrics.PreemptionVictims.Observe(float64(len(victims)))
326+
308327
// Lower priority pods nominated to run on this node, may no longer fit on
309328
// this node. So, we should remove their nomination. Removing their
310329
// nomination updates these pods and moves them to the active queue. It
311330
// lets scheduler find another place for them.
312331
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode)
313-
return candidateNode, nodeNameToVictims[candidateNode].Pods, nominatedPods, nil
332+
if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
333+
klog.Errorf("Cannot clear 'NominatedNodeName' field: %v", err)
334+
// We do not return as this error is not critical.
335+
}
336+
337+
return candidateNode, nil
314338
}
315339

316340
// processPreemptionWithExtenders processes preemption with extenders
@@ -1041,13 +1065,13 @@ func selectVictimsOnNode(
10411065

10421066
// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
10431067
// that may be satisfied by removing pods from the node.
1044-
func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, fitErr *FitError) []*framework.NodeInfo {
1068+
func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) []*framework.NodeInfo {
10451069
var potentialNodes []*framework.NodeInfo
10461070
for _, node := range nodes {
10471071
name := node.Node().Name
10481072
// We rely on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable'
10491073
// to determine whether preemption may help or not on the node.
1050-
if fitErr.FilteredNodesStatuses[name].Code() == framework.UnschedulableAndUnresolvable {
1074+
if m[name].Code() == framework.UnschedulableAndUnresolvable {
10511075
continue
10521076
}
10531077
potentialNodes = append(potentialNodes, node)

pkg/scheduler/core/generic_scheduler_test.go

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import (
3737
"k8s.io/apimachinery/pkg/util/wait"
3838
"k8s.io/client-go/informers"
3939
clientsetfake "k8s.io/client-go/kubernetes/fake"
40+
clienttesting "k8s.io/client-go/testing"
41+
"k8s.io/client-go/tools/events"
4042
extenderv1 "k8s.io/kube-scheduler/extender/v1"
4143
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
4244
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -2021,16 +2023,13 @@ func TestNodesWherePreemptionMightHelp(t *testing.T) {
20212023

20222024
for _, test := range tests {
20232025
t.Run(test.name, func(t *testing.T) {
2024-
fitErr := FitError{
2025-
FilteredNodesStatuses: test.nodesStatuses,
2026-
}
20272026
var nodeInfos []*framework.NodeInfo
20282027
for _, n := range makeNodeList(nodeNames) {
20292028
ni := framework.NewNodeInfo()
20302029
ni.SetNode(n)
20312030
nodeInfos = append(nodeInfos, ni)
20322031
}
2033-
nodes := nodesWherePreemptionMightHelp(nodeInfos, &fitErr)
2032+
nodes := nodesWherePreemptionMightHelp(nodeInfos, test.nodesStatuses)
20342033
if len(test.expected) != len(nodes) {
20352034
t.Errorf("number of nodes is not the same as expected. exptectd: %d, got: %d. Nodes: %v", len(test.expected), len(nodes), nodes)
20362035
}
@@ -2359,7 +2358,13 @@ func TestPreempt(t *testing.T) {
23592358
labelKeys := []string{"hostname", "zone", "region"}
23602359
for _, test := range tests {
23612360
t.Run(test.name, func(t *testing.T) {
2362-
client := clientsetfake.NewSimpleClientset()
2361+
apiObjs := mergeObjs(test.pod, test.pods)
2362+
client := clientsetfake.NewSimpleClientset(apiObjs...)
2363+
deletedPodNames := make(sets.String)
2364+
client.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
2365+
deletedPodNames.Insert(action.(clienttesting.DeleteAction).GetName())
2366+
return true, nil, nil
2367+
})
23632368
informerFactory := informers.NewSharedInformerFactory(client, 0)
23642369

23652370
stop := make(chan struct{})
@@ -2399,11 +2404,18 @@ func TestPreempt(t *testing.T) {
23992404
}
24002405

24012406
snapshot := internalcache.NewSnapshot(test.pods, nodes)
2402-
fwk, err := st.NewFramework(test.registerPlugins, framework.WithSnapshotSharedLister(snapshot))
2407+
fwk, err := st.NewFramework(
2408+
test.registerPlugins,
2409+
framework.WithClientSet(client),
2410+
framework.WithSnapshotSharedLister(snapshot),
2411+
)
24032412
if err != nil {
24042413
t.Fatal(err)
24052414
}
2406-
prof := &profile.Profile{Framework: fwk}
2415+
prof := &profile.Profile{
2416+
Framework: fwk,
2417+
Recorder: &events.FakeRecorder{},
2418+
}
24072419

24082420
scheduler := NewGenericScheduler(
24092421
cache,
@@ -2425,7 +2437,7 @@ func TestPreempt(t *testing.T) {
24252437
if test.failedNodeToStatusMap != nil {
24262438
failedNodeToStatusMap = test.failedNodeToStatusMap
24272439
}
2428-
node, victims, _, err := scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
2440+
node, err := scheduler.Preempt(context.Background(), prof, state, test.pod, failedNodeToStatusMap)
24292441
if err != nil {
24302442
t.Errorf("unexpected error in preemption: %v", err)
24312443
}
@@ -2435,31 +2447,39 @@ func TestPreempt(t *testing.T) {
24352447
if len(node) == 0 && len(test.expectedNode) != 0 {
24362448
t.Errorf("expected node: %v, got: nothing", test.expectedNode)
24372449
}
2438-
if len(victims) != len(test.expectedPods) {
2439-
t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(victims))
2450+
if len(deletedPodNames) != len(test.expectedPods) {
2451+
t.Errorf("expected %v pods, got %v.", len(test.expectedPods), len(deletedPodNames))
24402452
}
2441-
for _, victim := range victims {
2453+
for victimName := range deletedPodNames {
24422454
found := false
24432455
for _, expPod := range test.expectedPods {
2444-
if expPod == victim.Name {
2456+
if expPod == victimName {
24452457
found = true
24462458
break
24472459
}
24482460
}
24492461
if !found {
2450-
t.Errorf("pod %v is not expected to be a victim.", victim.Name)
2462+
t.Fatalf("pod %v is not expected to be a victim.", victimName)
24512463
}
2452-
// Mark the victims for deletion and record the preemptor's nominated node name.
2453-
now := metav1.Now()
2454-
victim.DeletionTimestamp = &now
2455-
test.pod.Status.NominatedNodeName = node
24562464
}
2465+
test.pod.Status.NominatedNodeName = node
2466+
client.CoreV1().Pods(test.pod.Namespace).Update(context.TODO(), test.pod, metav1.UpdateOptions{})
2467+
2468+
// Manually set the deleted Pods' deletionTimestamp to non-nil.
2469+
for _, pod := range test.pods {
2470+
if deletedPodNames.Has(pod.Name) {
2471+
now := metav1.Now()
2472+
pod.DeletionTimestamp = &now
2473+
deletedPodNames.Delete(pod.Name)
2474+
}
2475+
}
2476+
24572477
// Call preempt again and make sure it doesn't preempt any more pods.
2458-
node, victims, _, err = scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
2478+
node, err = scheduler.Preempt(context.Background(), prof, state, test.pod, failedNodeToStatusMap)
24592479
if err != nil {
24602480
t.Errorf("unexpected error in preemption: %v", err)
24612481
}
2462-
if len(node) != 0 && len(victims) > 0 {
2482+
if len(node) != 0 && len(deletedPodNames) > 0 {
24632483
t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", node)
24642484
}
24652485
close(stop)
@@ -2576,3 +2596,14 @@ func nodesToNodeInfos(nodes []*v1.Node, snapshot *internalcache.Snapshot) ([]*fr
25762596
}
25772597
return nodeInfos, nil
25782598
}
2599+
2600+
func mergeObjs(pod *v1.Pod, pods []*v1.Pod) []runtime.Object {
2601+
var objs []runtime.Object
2602+
if pod != nil {
2603+
objs = append(objs, pod)
2604+
}
2605+
for i := range pods {
2606+
objs = append(objs, pods[i])
2607+
}
2608+
return objs
2609+
}

0 commit comments

Comments
 (0)