Skip to content

Commit e601eb7

Browse files
committed
fix: run activate() only when fail
1 parent 4a084d5 commit e601eb7

File tree

2 files changed

+35
-8
lines changed

2 files changed

+35
-8
lines changed

pkg/scheduler/framework/preemption/preemption.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -492,10 +492,11 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
492492
defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime))
493493
defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()
494494
defer func() {
495-
ev.mu.Lock()
496-
ev.preempting.Delete(pod.UID)
497-
ev.mu.Unlock()
498-
ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
495+
if result == metrics.GoroutineResultError {
496+
// When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
497+
// So, we should move the Pod to the activeQ.
498+
ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
499+
}
499500
}()
500501
defer cancel()
501502
logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))
@@ -512,15 +513,34 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
512513
}
513514

514515
if len(c.Victims().Pods) == 0 {
516+
ev.mu.Lock()
517+
delete(ev.preempting, pod.UID)
518+
ev.mu.Unlock()
519+
515520
return
516521
}
517522

518-
ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods), preemptPod, ev.PluginName)
523+
// We can evict all victims in parallel, but the last one.
524+
// We have to remove the pod from the preempting map before the last one is evicted
525+
// because, otherwise, the pod removal might be notified to the scheduling queue before
526+
// we remove this pod from the preempting map,
527+
// and the pod could end up stucking at the unschedulable pod pool
528+
// by all the pod removal events being ignored.
529+
ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
519530
if err := errCh.ReceiveError(); err != nil {
520531
logger.Error(err, "Error occurred during async preemption")
521532
result = metrics.GoroutineResultError
522533
}
523534

535+
ev.mu.Lock()
536+
delete(ev.preempting, pod.UID)
537+
ev.mu.Unlock()
538+
539+
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
540+
logger.Error(err, "Error occurred during async preemption")
541+
result = metrics.GoroutineResultError
542+
}
543+
524544
logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result)
525545
}()
526546
}

pkg/scheduler/framework/preemption/preemption_test.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ func TestPrepareCandidate(t *testing.T) {
453453
expectedStatus *framework.Status
454454
// Only compared when async preemption is enabled.
455455
expectedPreemptingMap sets.Set[types.UID]
456+
expectedActivatedPods map[string]*v1.Pod
456457
}{
457458
{
458459
name: "no victims",
@@ -543,6 +544,7 @@ func TestPrepareCandidate(t *testing.T) {
543544
nodeNames: []string{node1Name},
544545
expectedStatus: framework.AsStatus(errors.New("delete pod failed")),
545546
expectedPreemptingMap: sets.New(types.UID("preemptor")),
547+
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
546548
},
547549
{
548550
name: "one victim, not-found victim error is ignored when deleting",
@@ -579,6 +581,7 @@ func TestPrepareCandidate(t *testing.T) {
579581
nodeNames: []string{node1Name},
580582
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
581583
expectedPreemptingMap: sets.New(types.UID("preemptor")),
584+
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
582585
},
583586
{
584587
name: "two victims without condition, one passes successfully and the second fails",
@@ -601,6 +604,7 @@ func TestPrepareCandidate(t *testing.T) {
601604
expectedDeletedPods: []string{"victim2"},
602605
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
603606
expectedPreemptingMap: sets.New(types.UID("preemptor")),
607+
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
604608
},
605609
}
606610

@@ -730,9 +734,12 @@ func TestPrepareCandidate(t *testing.T) {
730734
}
731735

732736
if asyncPreemptionEnabled {
733-
// Make sure the preemptor is activated regardless of the preemption result.
734-
if !reflect.DeepEqual(map[string]*v1.Pod{tt.preemptor.Name: tt.preemptor}, fakeActivator.activatedPods) {
735-
lastErrMsg = fmt.Sprintf("expected activated pods %v, got %v", map[string]*v1.Pod{tt.preemptor.Name: tt.preemptor}, fakeActivator.activatedPods)
737+
if tt.expectedActivatedPods != nil && !reflect.DeepEqual(tt.expectedActivatedPods, fakeActivator.activatedPods) {
738+
lastErrMsg = fmt.Sprintf("expected activated pods %v, got %v", tt.expectedActivatedPods, fakeActivator.activatedPods)
739+
return false, nil
740+
}
741+
if tt.expectedActivatedPods == nil && len(fakeActivator.activatedPods) != 0 {
742+
lastErrMsg = fmt.Sprintf("expected no activated pods, got %v", fakeActivator.activatedPods)
736743
return false, nil
737744
}
738745
}

0 commit comments

Comments
 (0)