
Commit 27caa6e

merge pod condition update with setting nominated node name in the scheduler
1 parent 56ad0ce commit 27caa6e
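In effect, a failed scheduling attempt previously needed two API round trips: one to set the PodScheduled condition, and (after preemption) a second to set status.nominatedNodeName. This commit folds both into a single status patch. Below is a minimal, self-contained sketch — not part of the commit, with hypothetical names like oldPod and "some-node" — of why one strategic merge patch can carry both fields: they are just two paths in the same Pod status.

package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	oldPod := &v1.Pod{}
	newPod := oldPod.DeepCopy()
	// Both the condition and the nomination are plain status fields.
	newPod.Status.Conditions = append(newPod.Status.Conditions, v1.PodCondition{
		Type:   v1.PodScheduled,
		Status: v1.ConditionFalse,
		Reason: v1.PodReasonUnschedulable,
	})
	newPod.Status.NominatedNodeName = "some-node" // hypothetical node name

	oldData, _ := json.Marshal(oldPod)
	newData, _ := json.Marshal(newPod)
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
	if err != nil {
		panic(err)
	}
	// Prints one patch touching both status.conditions and
	// status.nominatedNodeName.
	fmt.Println(string(patch))
}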

File tree

2 files changed: +101 -124 lines


pkg/scheduler/scheduler.go

Lines changed: 55 additions & 82 deletions
@@ -56,20 +56,12 @@ const (
 	pluginMetricsSamplePercent = 10
 )
 
-// podConditionUpdater updates the condition of a pod based on the passed
-// PodCondition
-// TODO (ahmad-diaa): Remove type and replace it with scheduler methods
-type podConditionUpdater interface {
-	update(pod *v1.Pod, podCondition *v1.PodCondition) error
-}
-
 // PodPreemptor has methods needed to delete a pod and to update 'NominatedPod'
 // field of the preemptor pod.
 // TODO (ahmad-diaa): Remove type and replace it with scheduler methods
 type podPreemptor interface {
 	getUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
 	deletePod(pod *v1.Pod) error
-	setNominatedNodeName(pod *v1.Pod, nominatedNode string) error
 	removeNominatedNodeName(pod *v1.Pod) error
 }
 
@@ -81,10 +73,6 @@ type Scheduler struct {
 	SchedulerCache internalcache.Cache
 
 	Algorithm core.ScheduleAlgorithm
-	// PodConditionUpdater is used only in case of scheduling errors. If we succeed
-	// with scheduling, PodScheduled condition will be updated in apiserver in /bind
-	// handler so that binding and setting PodCondition it is atomic.
-	podConditionUpdater podConditionUpdater
 	// PodPreemptor is used to evict pods and update 'NominatedNode' field of
 	// the preemptor pod.
 	podPreemptor podPreemptor
@@ -112,6 +100,8 @@ type Scheduler struct {
 	Profiles profile.Map
 
 	scheduledPodsHasSynced func() bool
+
+	client clientset.Interface
 }
 
 // Cache returns the cache in scheduler for test to check the data in scheduler.
@@ -312,7 +302,7 @@ func New(client clientset.Interface,
 	// Additional tweaks to the config produced by the configurator.
 	sched.DisablePreemption = options.disablePreemption
 	sched.StopEverything = stopEverything
-	sched.podConditionUpdater = &podConditionUpdaterImpl{client}
+	sched.client = client
 	sched.podPreemptor = &podPreemptorImpl{client}
 	sched.scheduledPodsHasSynced = podInformer.Informer().HasSynced
 
@@ -366,21 +356,44 @@ func (sched *Scheduler) Run(ctx context.Context) {
 	sched.SchedulingQueue.Close()
 }
 
-// recordFailedSchedulingEvent records an event for the pod that indicates the
-// pod has failed to schedule.
-// NOTE: This function modifies "pod". "pod" should be copied before being passed.
-func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, message string) {
+// recordSchedulingFailure records an event for the pod that indicates the
+// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
+func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
 	sched.Error(podInfo, err)
+
+	// Update the scheduling queue with the nominated pod information. Without
+	// this, there would be a race condition between the next scheduling cycle
+	// and the time the scheduler receives a Pod Update for the nominated pod.
+	// Here we check for nil only for tests.
+	if sched.SchedulingQueue != nil {
+		sched.SchedulingQueue.AddNominatedPod(podInfo.Pod, nominatedNode)
+	}
+
 	pod := podInfo.Pod
-	prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
-	if err := sched.podConditionUpdater.update(pod, &v1.PodCondition{
+	prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", err.Error())
+	if err := updatePod(sched.client, pod, &v1.PodCondition{
 		Type:    v1.PodScheduled,
 		Status:  v1.ConditionFalse,
 		Reason:  reason,
 		Message: err.Error(),
-	}); err != nil {
-		klog.Errorf("Error updating the condition of the pod %s/%s: %v", pod.Namespace, pod.Name, err)
+	}, nominatedNode); err != nil {
+		klog.Errorf("Error updating pod %s/%s: %v", pod.Namespace, pod.Name, err)
+	}
+}
+
+func updatePod(client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatedNode string) error {
+	klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
+	podCopy := pod.DeepCopy()
+	// NominatedNodeName is updated only if we are trying to set it, and the value is
+	// different from the existing one.
+	if !podutil.UpdatePodCondition(&podCopy.Status, condition) &&
+		(len(nominatedNode) == 0 || pod.Status.NominatedNodeName == nominatedNode) {
+		return nil
+	}
+	if nominatedNode != "" {
+		podCopy.Status.NominatedNodeName = nominatedNode
 	}
+	return patchPod(client, pod, podCopy)
 }
 
 // preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
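One subtlety in updatePod above: the guard skips the patch entirely when nothing would change. The compound condition reads more easily when rewritten equivalently (by De Morgan). A sketch with the same imports as scheduler.go; the variable names conditionChanged and settingNewNominee are mine, not the commit's:

	// podutil.UpdatePodCondition is always evaluated first: it mutates
	// podCopy and reports whether the condition actually changed.
	conditionChanged := podutil.UpdatePodCondition(&podCopy.Status, condition)
	settingNewNominee := len(nominatedNode) != 0 && pod.Status.NominatedNodeName != nominatedNode
	if !conditionChanged && !settingNewNominee {
		return nil // no-op: skip the patch and the API round trip
	}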
@@ -399,19 +412,6 @@ func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, stat
 		return "", err
 	}
 	if len(nodeName) != 0 {
-		// Update the scheduling queue with the nominated pod information. Without
-		// this, there would be a race condition between the next scheduling cycle
-		// and the time the scheduler receives a Pod Update for the nominated pod.
-		sched.SchedulingQueue.AddNominatedPod(preemptor, nodeName)
-
-		// Make a call to update nominated node name of the pod on the API server.
-		err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName)
-		if err != nil {
-			klog.Errorf("Error in preemption process. Cannot set 'NominatedNodeName' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
-			sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
-			return "", err
-		}
-
 		for _, victim := range victims {
 			if err := sched.podPreemptor.deletePod(victim); err != nil {
 				klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
@@ -549,13 +549,14 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		// preempt, with the expectation that the next time the pod is tried for scheduling it
 		// will fit due to the preemption. It is also possible that a different pod will schedule
 		// into the resources that were preempted, but this is harmless.
+		nominatedNode := ""
 		if fitError, ok := err.(*core.FitError); ok {
 			if sched.DisablePreemption {
 				klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
 					" No preemption is performed.")
 			} else {
 				preemptionStartTime := time.Now()
-				sched.preempt(schedulingCycleCtx, prof, state, pod, fitError)
+				nominatedNode, _ = sched.preempt(schedulingCycleCtx, prof, state, pod, fitError)
 				metrics.PreemptionAttempts.Inc()
 				metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
 				metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
@@ -571,7 +572,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			klog.Errorf("error selecting node for pod: %v", err)
 			metrics.PodScheduleErrors.Inc()
 		}
-		sched.recordSchedulingFailure(prof, podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
+		sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
 		return
 	}
 	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
@@ -582,7 +583,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 
 	// Run "reserve" plugins.
 	if sts := prof.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
-		sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
+		sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, "")
 		metrics.PodScheduleErrors.Inc()
 		return
 	}
@@ -595,7 +596,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		// This relies on the fact that Error will check if the pod has been bound
 		// to a node and if so will not add it back to the unscheduled pods queue
 		// (otherwise this would cause an infinite loop).
-		sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
+		sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, "")
 		metrics.PodScheduleErrors.Inc()
 		// trigger un-reserve plugins to clean up state associated with the reserved Pod
 		prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
@@ -618,7 +619,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 		}
 		// One of the plugins returned status different than success or wait.
 		prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-		sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message())
+		sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, "")
 		return
 	}
 
@@ -644,7 +645,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			}
 			// trigger un-reserve plugins to clean up state associated with the reserved Pod
 			prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message())
+			sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
 			return
 		}
 
@@ -659,7 +660,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			}
 			// trigger un-reserve plugins to clean up state associated with the reserved Pod
 			prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
+			sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, "")
 			return
 		}
 
@@ -669,7 +670,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
 			metrics.PodScheduleErrors.Inc()
 			// trigger un-reserve plugins to clean up state associated with the reserved Pod
 			prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
-			sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
+			sched.recordSchedulingFailure(prof, assumedPodInfo, fmt.Errorf("Binding rejected: %v", err), SchedulerError, "")
 		} else {
 			// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
 			if klog.V(2).Enabled() {
@@ -713,31 +714,6 @@ func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool
 	return false
 }
 
-type podConditionUpdaterImpl struct {
-	Client clientset.Interface
-}
-
-func (p *podConditionUpdaterImpl) update(pod *v1.Pod, condition *v1.PodCondition) error {
-	klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s, Reason=%s)", pod.Namespace, pod.Name, condition.Type, condition.Status, condition.Reason)
-	oldData, err := json.Marshal(pod)
-	if err != nil {
-		return err
-	}
-	if !podutil.UpdatePodCondition(&pod.Status, condition) {
-		return nil
-	}
-	newData, err := json.Marshal(pod)
-	if err != nil {
-		return err
-	}
-	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
-	if err != nil {
-		return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", pod.Namespace, pod.Name, err)
-	}
-	_, err = p.Client.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
-	return err
-}
-
 type podPreemptorImpl struct {
 	Client clientset.Interface
 }
@@ -750,36 +726,33 @@ func (p *podPreemptorImpl) deletePod(pod *v1.Pod) error {
 	return p.Client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
 }
 
-func (p *podPreemptorImpl) setNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error {
-	klog.V(3).Infof("Setting nominated node name for %s/%s to \"%s\"", pod.Namespace, pod.Name, nominatedNodeName)
-	if pod.Status.NominatedNodeName == nominatedNodeName {
+func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error {
+	if len(pod.Status.NominatedNodeName) == 0 {
 		return nil
 	}
 	podCopy := pod.DeepCopy()
-	oldData, err := json.Marshal(podCopy)
+	podCopy.Status.NominatedNodeName = ""
+	return patchPod(p.Client, pod, podCopy)
+}
+
+func patchPod(client clientset.Interface, old *v1.Pod, new *v1.Pod) error {
+	oldData, err := json.Marshal(old)
 	if err != nil {
 		return err
 	}
-	podCopy.Status.NominatedNodeName = nominatedNodeName
-	newData, err := json.Marshal(podCopy)
+
+	newData, err := json.Marshal(new)
 	if err != nil {
 		return err
 	}
 	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
 	if err != nil {
-		return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", pod.Namespace, pod.Name, err)
+		return fmt.Errorf("failed to create merge patch for pod %q/%q: %v", old.Namespace, old.Name, err)
 	}
-	_, err = p.Client.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	_, err = client.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
 	return err
 }
 
-func (p *podPreemptorImpl) removeNominatedNodeName(pod *v1.Pod) error {
-	if len(pod.Status.NominatedNodeName) == 0 {
-		return nil
-	}
-	return p.setNominatedNodeName(pod, "")
-}
-
 func defaultAlgorithmSourceProviderName() *string {
 	provider := schedulerapi.SchedulerDefaultProviderName
 	return &provider
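For completeness, a hedged test-style sketch (not in this commit) of the patch flow against client-go's fake clientset. patchPodSketch is a hypothetical mirror of patchPod above, taking kubernetes.Interface (scheduler.go's clientset alias) so it accepts the fake; whether the fake tracker reflects the "status" subresource patch on a subsequent Get is an assumption worth verifying:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// patchPodSketch mirrors patchPod above: marshal old and new, compute a
// strategic merge patch, and send it to the pod's status subresource.
func patchPodSketch(client kubernetes.Interface, old, new *v1.Pod) error {
	oldData, err := json.Marshal(old)
	if err != nil {
		return err
	}
	newData, err := json.Marshal(new)
	if err != nil {
		return err
	}
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Pod{})
	if err != nil {
		return err
	}
	_, err = client.CoreV1().Pods(old.Namespace).Patch(context.TODO(), old.Name,
		types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
	return err
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "default"}}
	client := fake.NewSimpleClientset(pod)

	updated := pod.DeepCopy()
	updated.Status.NominatedNodeName = "node-1" // hypothetical nominee
	updated.Status.Conditions = []v1.PodCondition{
		{Type: v1.PodScheduled, Status: v1.ConditionFalse, Reason: v1.PodReasonUnschedulable},
	}
	if err := patchPodSketch(client, pod, updated); err != nil {
		panic(err)
	}
	got, _ := client.CoreV1().Pods("default").Get(context.TODO(), "p", metav1.GetOptions{})
	// Expect: node-1 1 (one patch carried both status fields).
	fmt.Println(got.Status.NominatedNodeName, len(got.Status.Conditions))
}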
