Skip to content

Commit fb03382

Browse files
authored
Merge pull request kubernetes#128170 from sanposhiho/async-preemption
feature(KEP-4832): asynchronous preemption
2 parents 9660e5c + 105d489 commit fb03382

File tree

24 files changed

+2010
-727
lines changed

24 files changed

+2010
-727
lines changed

pkg/features/kube_features.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,14 @@ const (
580580
// which benefits to reduce the useless requeueing.
581581
SchedulerQueueingHints featuregate.Feature = "SchedulerQueueingHints"
582582

583+
// owner: @sanposhiho
584+
// kep: http://kep.k8s.io/4832
585+
// alpha: v1.32
586+
//
587+
// Running some expensive operation within the scheduler's preemption asynchronously,
588+
// which improves the scheduling latency when the preemption involves in.
589+
SchedulerAsyncPreemption featuregate.Feature = "SchedulerAsyncPreemption"
590+
583591
// owner: @atosatto @yuanchen8911
584592
// kep: http://kep.k8s.io/3902
585593
//

pkg/features/versioned_kube_features.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
633633
{Version: version.MustParse("1.29"), Default: false, PreRelease: featuregate.Alpha},
634634
},
635635

636+
SchedulerAsyncPreemption: {
637+
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Alpha},
638+
},
639+
636640
SchedulerQueueingHints: {
637641
{Version: version.MustParse("1.28"), Default: false, PreRelease: featuregate.Beta},
638642
{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.Beta},

pkg/scheduler/apis/config/testing/defaults/defaults.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ var ExpandedPluginsV1 = &config.Plugins{
5252
PreEnqueue: config.PluginSet{
5353
Enabled: []config.Plugin{
5454
{Name: names.SchedulingGates},
55+
{Name: names.DefaultPreemption},
5556
},
5657
},
5758
QueueSort: config.PluginSet{

pkg/scheduler/backend/queue/scheduling_queue.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,11 @@ type PreEnqueueCheck func(pod *v1.Pod) bool
9292
type SchedulingQueue interface {
9393
framework.PodNominator
9494
Add(logger klog.Logger, pod *v1.Pod)
95-
// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
96-
// The passed-in pods are originally compiled from plugins that want to activate Pods,
97-
// by injecting the pods through a reserved CycleState struct (PodsToActivate).
95+
// Activate moves the given pods to activeQ.
96+
// If a pod isn't found in unschedulablePods or backoffQ and it's in-flight,
97+
// the wildcard event is registered so that the pod will be requeued when it comes back.
98+
// But, if a pod isn't found in unschedulablePods or backoffQ and it's not in-flight (i.e., completely unknown pod),
99+
// Activate would ignore the pod.
98100
Activate(logger klog.Logger, pods map[string]*v1.Pod)
99101
// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
100102
// The podSchedulingCycle represents the current scheduling cycle number which can be
@@ -411,9 +413,22 @@ func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework
411413
}
412414

413415
if event.IsWildCard() {
416+
// If the wildcard event has a Pod in newObj,
417+
// that indicates that the event wants to be effective for the Pod only.
418+
// Specifically, EventForceActivate could have a target Pod in newObj.
419+
if newObj != nil {
420+
if pod, ok := newObj.(*v1.Pod); !ok || pod.UID != pInfo.Pod.UID {
421+
// This wildcard event is not for this Pod.
422+
if ok {
423+
logger.V(6).Info("Not worth requeuing because the event is wildcard, but for another pod", "pod", klog.KObj(pInfo.Pod), "event", event.Label(), "newObj", klog.KObj(pod))
424+
}
425+
return queueSkip
426+
}
427+
}
428+
414429
// If the wildcard event is special one as someone wants to force all Pods to move to activeQ/backoffQ.
415430
// We return queueAfterBackoff in this case, while resetting all blocked plugins.
416-
logger.V(6).Info("Worth requeuing because the event is wildcard", "pod", klog.KObj(pInfo.Pod))
431+
logger.V(6).Info("Worth requeuing because the event is wildcard", "pod", klog.KObj(pInfo.Pod), "event", event.Label())
417432
return queueAfterBackoff
418433
}
419434

@@ -590,7 +605,11 @@ func (p *PriorityQueue) Add(logger klog.Logger, pod *v1.Pod) {
590605
}
591606
}
592607

593-
// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
608+
// Activate moves the given pods to activeQ.
609+
// If a pod isn't found in unschedulablePods or backoffQ and it's in-flight,
610+
// the wildcard event is registered so that the pod will be requeued when it comes back.
611+
// But, if a pod isn't found in unschedulablePods or backoffQ and it's not in-flight (i.e., completely unknown pod),
612+
// Activate would ignore the pod.
594613
func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
595614
p.lock.Lock()
596615
defer p.lock.Unlock()
@@ -599,7 +618,15 @@ func (p *PriorityQueue) Activate(logger klog.Logger, pods map[string]*v1.Pod) {
599618
for _, pod := range pods {
600619
if p.activate(logger, pod) {
601620
activated = true
621+
continue
602622
}
623+
624+
// If this pod is in-flight, register the activation event (for when QHint is enabled) or update moveRequestCycle (for when QHints is disabled)
625+
// so that the pod will be requeued when it comes back.
626+
// Specifically in the in-tree plugins, this is for the scenario with the preemption plugin
627+
// where the async preemption API calls are all done or fail at some point before the Pod comes back to the queue.
628+
p.activeQ.addEventsIfPodInFlight(nil, pod, []framework.ClusterEvent{framework.EventForceActivate})
629+
p.moveRequestCycle = p.activeQ.schedulingCycle()
603630
}
604631

605632
if activated {

pkg/scheduler/backend/queue/scheduling_queue_test.go

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,13 +1294,17 @@ func TestPriorityQueue_Delete(t *testing.T) {
12941294
}
12951295

12961296
func TestPriorityQueue_Activate(t *testing.T) {
1297+
metrics.Register()
12971298
tests := []struct {
12981299
name string
12991300
qPodInfoInUnschedulablePods []*framework.QueuedPodInfo
13001301
qPodInfoInPodBackoffQ []*framework.QueuedPodInfo
13011302
qPodInActiveQ []*v1.Pod
13021303
qPodInfoToActivate *framework.QueuedPodInfo
1304+
qPodInInFlightPod *v1.Pod
1305+
expectedInFlightEvent *clusterEvent
13031306
want []*framework.QueuedPodInfo
1307+
qHintEnabled bool
13041308
}{
13051309
{
13061310
name: "pod already in activeQ",
@@ -1313,6 +1317,21 @@ func TestPriorityQueue_Activate(t *testing.T) {
13131317
qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo},
13141318
want: []*framework.QueuedPodInfo{},
13151319
},
1320+
{
1321+
name: "[QHint] pod not in unschedulablePods/podBackoffQ but in-flight",
1322+
qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo},
1323+
qPodInInFlightPod: highPriNominatedPodInfo.Pod,
1324+
expectedInFlightEvent: &clusterEvent{oldObj: (*v1.Pod)(nil), newObj: highPriNominatedPodInfo.Pod, event: framework.EventForceActivate},
1325+
want: []*framework.QueuedPodInfo{},
1326+
qHintEnabled: true,
1327+
},
1328+
{
1329+
name: "[QHint] pod not in unschedulablePods/podBackoffQ and not in-flight",
1330+
qPodInfoToActivate: &framework.QueuedPodInfo{PodInfo: highPriNominatedPodInfo},
1331+
qPodInInFlightPod: medPriorityPodInfo.Pod, // different pod is in-flight
1332+
want: []*framework.QueuedPodInfo{},
1333+
qHintEnabled: true,
1334+
},
13161335
{
13171336
name: "pod in unschedulablePods",
13181337
qPodInfoInUnschedulablePods: []*framework.QueuedPodInfo{{PodInfo: highPriNominatedPodInfo}},
@@ -1329,12 +1348,30 @@ func TestPriorityQueue_Activate(t *testing.T) {
13291348

13301349
for _, tt := range tests {
13311350
t.Run(tt.name, func(t *testing.T) {
1351+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, tt.qHintEnabled)
13321352
var objs []runtime.Object
13331353
logger, ctx := ktesting.NewTestContext(t)
13341354
ctx, cancel := context.WithCancel(ctx)
13351355
defer cancel()
13361356
q := NewTestQueueWithObjects(ctx, newDefaultQueueSort(), objs)
13371357

1358+
if tt.qPodInInFlightPod != nil {
1359+
// Put -> Pop the Pod to make it registered in inFlightPods.
1360+
q.activeQ.underLock(func(unlockedActiveQ unlockedActiveQueuer) {
1361+
unlockedActiveQ.AddOrUpdate(newQueuedPodInfoForLookup(tt.qPodInInFlightPod))
1362+
})
1363+
p, err := q.activeQ.pop(logger)
1364+
if err != nil {
1365+
t.Fatalf("Pop failed: %v", err)
1366+
}
1367+
if p.Pod.Name != tt.qPodInInFlightPod.Name {
1368+
t.Errorf("Unexpected popped pod: %v", p.Pod.Name)
1369+
}
1370+
if len(q.activeQ.listInFlightEvents()) != 1 {
1371+
t.Fatal("Expected the pod to be recorded in in-flight events, but it doesn't")
1372+
}
1373+
}
1374+
13381375
// Prepare activeQ/unschedulablePods/podBackoffQ according to the table
13391376
for _, qPod := range tt.qPodInActiveQ {
13401377
q.Add(logger, qPod)
@@ -1353,7 +1390,29 @@ func TestPriorityQueue_Activate(t *testing.T) {
13531390

13541391
// Check the result after activation by the length of activeQ
13551392
if wantLen := len(tt.want); q.activeQ.len() != wantLen {
1356-
t.Errorf("length compare: want %v, got %v", wantLen, q.activeQ.len())
1393+
t.Fatalf("length compare: want %v, got %v", wantLen, q.activeQ.len())
1394+
}
1395+
1396+
if tt.expectedInFlightEvent != nil {
1397+
if len(q.activeQ.listInFlightEvents()) != 2 {
1398+
t.Fatalf("Expected two in-flight event to be recorded, but got %v events", len(q.activeQ.listInFlightEvents()))
1399+
}
1400+
found := false
1401+
for _, e := range q.activeQ.listInFlightEvents() {
1402+
event, ok := e.(*clusterEvent)
1403+
if !ok {
1404+
continue
1405+
}
1406+
1407+
if d := cmp.Diff(tt.expectedInFlightEvent, event, cmpopts.EquateComparable(clusterEvent{})); d != "" {
1408+
t.Fatalf("Unexpected in-flight event (-want, +got):\n%s", d)
1409+
}
1410+
found = true
1411+
}
1412+
1413+
if !found {
1414+
t.Fatalf("Expected in-flight event to be recorded, but it wasn't.")
1415+
}
13571416
}
13581417

13591418
// Check if the specific pod exists in activeQ
@@ -3779,6 +3838,7 @@ func mustNewPodInfo(pod *v1.Pod) *framework.PodInfo {
37793838

37803839
// Test_isPodWorthRequeuing tests isPodWorthRequeuing function.
37813840
func Test_isPodWorthRequeuing(t *testing.T) {
3841+
metrics.Register()
37823842
count := 0
37833843
queueHintReturnQueue := func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
37843844
count++
@@ -3857,11 +3917,37 @@ func Test_isPodWorthRequeuing(t *testing.T) {
38573917
},
38583918
event: framework.EventUnschedulableTimeout,
38593919
oldObj: nil,
3860-
newObj: st.MakeNode().Obj(),
3920+
newObj: nil,
38613921
expected: queueAfterBackoff,
38623922
expectedExecutionCount: 0,
38633923
queueingHintMap: QueueingHintMapPerProfile{},
38643924
},
3925+
{
3926+
name: "return Queue when the event is wildcard and the wildcard targets the pod to be requeued right now",
3927+
podInfo: &framework.QueuedPodInfo{
3928+
UnschedulablePlugins: sets.New("fooPlugin1"),
3929+
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
3930+
},
3931+
event: framework.EventForceActivate,
3932+
oldObj: nil,
3933+
newObj: st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj(),
3934+
expected: queueAfterBackoff,
3935+
expectedExecutionCount: 0,
3936+
queueingHintMap: QueueingHintMapPerProfile{},
3937+
},
3938+
{
3939+
name: "return Skip when the event is wildcard, but the wildcard targets a different pod",
3940+
podInfo: &framework.QueuedPodInfo{
3941+
UnschedulablePlugins: sets.New("fooPlugin1"),
3942+
PodInfo: mustNewPodInfo(st.MakePod().Name("pod1").Namespace("ns1").UID("1").Obj()),
3943+
},
3944+
event: framework.EventForceActivate,
3945+
oldObj: nil,
3946+
newObj: st.MakePod().Name("pod-different").Namespace("ns2").UID("2").Obj(),
3947+
expected: queueSkip,
3948+
expectedExecutionCount: 0,
3949+
queueingHintMap: QueueingHintMapPerProfile{},
3950+
},
38653951
{
38663952
name: "interprets Queue from the Pending plugin as queueImmediately",
38673953
podInfo: &framework.QueuedPodInfo{

pkg/scheduler/framework/events.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ const (
3434
// ForceActivate is the event when a pod is moved from unschedulablePods/backoffQ
3535
// to activeQ. Usually it's triggered by plugin implementations.
3636
ForceActivate = "ForceActivate"
37+
// UnschedulableTimeout is the event when a pod is moved from unschedulablePods
38+
// due to the timeout specified at pod-max-in-unschedulable-pods-duration.
39+
UnschedulableTimeout = "UnschedulableTimeout"
3740
)
3841

3942
var (
@@ -50,7 +53,9 @@ var (
5053
// EventUnscheduledPodDelete is the event when an unscheduled pod is deleted.
5154
EventUnscheduledPodDelete = ClusterEvent{Resource: unschedulablePod, ActionType: Delete}
5255
// EventUnschedulableTimeout is the event when a pod stays in unschedulable for longer than timeout.
53-
EventUnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, label: "UnschedulableTimeout"}
56+
EventUnschedulableTimeout = ClusterEvent{Resource: WildCard, ActionType: All, label: UnschedulableTimeout}
57+
// EventForceActivate is the event when a pod is moved from unschedulablePods/backoffQ to activeQ.
58+
EventForceActivate = ClusterEvent{Resource: WildCard, ActionType: All, label: ForceActivate}
5459
)
5560

5661
// PodSchedulingPropertiesChange interprets the update of a pod and returns corresponding UpdatePodXYZ event(s).

pkg/scheduler/framework/interface.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,6 +770,8 @@ type Framework interface {
770770

771771
// SetPodNominator sets the PodNominator
772772
SetPodNominator(nominator PodNominator)
773+
// SetPodActivator sets the PodActivator
774+
SetPodActivator(activator PodActivator)
773775

774776
// Close calls Close method of each plugin.
775777
Close() error
@@ -783,6 +785,8 @@ type Handle interface {
783785
PodNominator
784786
// PluginsRunner abstracts operations to run some plugins.
785787
PluginsRunner
788+
// PodActivator abstracts operations in the scheduling queue.
789+
PodActivator
786790
// SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot
787791
// is taken at the beginning of a scheduling cycle and remains unchanged until
788792
// a pod finishes "Permit" point.
@@ -896,6 +900,16 @@ func (ni *NominatingInfo) Mode() NominatingMode {
896900
return ni.NominatingMode
897901
}
898902

903+
// PodActivator abstracts operations in the scheduling queue.
904+
type PodActivator interface {
905+
// Activate moves the given pods to activeQ.
906+
// If a pod isn't found in unschedulablePods or backoffQ and it's in-flight,
907+
// the wildcard event is registered so that the pod will be requeued when it comes back.
908+
// But, if a pod isn't found in unschedulablePods or backoffQ and it's not in-flight (i.e., completely unknown pod),
909+
// Activate would ignore the pod.
910+
Activate(logger klog.Logger, pods map[string]*v1.Pod)
911+
}
912+
899913
// PodNominator abstracts operations to maintain nominated Pods.
900914
type PodNominator interface {
901915
// AddNominatedPod adds the given pod to the nominator or

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,11 @@ type DefaultPreemption struct {
5353
args config.DefaultPreemptionArgs
5454
podLister corelisters.PodLister
5555
pdbLister policylisters.PodDisruptionBudgetLister
56+
Evaluator *preemption.Evaluator
5657
}
5758

5859
var _ framework.PostFilterPlugin = &DefaultPreemption{}
60+
var _ framework.PreEnqueuePlugin = &DefaultPreemption{}
5961

6062
// Name returns name of the plugin. It is used in logs, etc.
6163
func (pl *DefaultPreemption) Name() string {
@@ -71,13 +73,19 @@ func New(_ context.Context, dpArgs runtime.Object, fh framework.Handle, fts feat
7173
if err := validation.ValidateDefaultPreemptionArgs(nil, args); err != nil {
7274
return nil, err
7375
}
76+
77+
podLister := fh.SharedInformerFactory().Core().V1().Pods().Lister()
78+
pdbLister := getPDBLister(fh.SharedInformerFactory())
79+
7480
pl := DefaultPreemption{
7581
fh: fh,
7682
fts: fts,
7783
args: *args,
78-
podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister(),
79-
pdbLister: getPDBLister(fh.SharedInformerFactory()),
84+
podLister: podLister,
85+
pdbLister: pdbLister,
8086
}
87+
pl.Evaluator = preemption.NewEvaluator(Name, fh, &pl, fts.EnableAsyncPreemption)
88+
8189
return &pl, nil
8290
}
8391

@@ -87,23 +95,32 @@ func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.Cy
8795
metrics.PreemptionAttempts.Inc()
8896
}()
8997

90-
pe := preemption.Evaluator{
91-
PluginName: names.DefaultPreemption,
92-
Handler: pl.fh,
93-
PodLister: pl.podLister,
94-
PdbLister: pl.pdbLister,
95-
State: state,
96-
Interface: pl,
97-
}
98-
99-
result, status := pe.Preempt(ctx, pod, m)
98+
result, status := pl.Evaluator.Preempt(ctx, state, pod, m)
10099
msg := status.Message()
101100
if len(msg) > 0 {
102101
return result, framework.NewStatus(status.Code(), "preemption: "+msg)
103102
}
104103
return result, status
105104
}
106105

106+
func (pl *DefaultPreemption) PreEnqueue(ctx context.Context, p *v1.Pod) *framework.Status {
107+
if !pl.fts.EnableAsyncPreemption {
108+
return nil
109+
}
110+
if pl.Evaluator.IsPodRunningPreemption(p.GetUID()) {
111+
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "waiting for the preemption for this pod to be finished")
112+
}
113+
return nil
114+
}
115+
116+
// EventsToRegister returns the possible events that may make a Pod
117+
// failed by this plugin schedulable.
118+
func (pl *DefaultPreemption) EventsToRegister(_ context.Context) ([]framework.ClusterEventWithHint, error) {
119+
// The plugin moves the preemptor Pod to acviteQ/backoffQ once the preemption API calls are all done,
120+
// and we don't need to move the Pod with any events.
121+
return nil, nil
122+
}
123+
107124
// calculateNumCandidates returns the number of candidates the FindCandidates
108125
// method must produce from dry running based on the constraints given by
109126
// <minCandidateNodesPercentage> and <minCandidateNodesAbsolute>. The number of

0 commit comments

Comments
 (0)