Skip to content

Commit 133a025

Browse files
committed
Move unschedulable Pod to internal schedulingQ synchronously
- Use the in-cache Pod instead of the real-time Pod (fetched by calling the API server) to mark it as unschedulable in the internal schedulingQ.
- Remove the backoff logic, as we no longer call the API server.
- The whole logic is changed to a synchronous call.
1 parent 9388755 commit 133a025

File tree

4 files changed

+142
-182
lines changed

4 files changed

+142
-182
lines changed

pkg/scheduler/factory.go

Lines changed: 13 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -29,8 +29,6 @@ import (
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3030
"k8s.io/apimachinery/pkg/fields"
3131
"k8s.io/apimachinery/pkg/runtime"
32-
"k8s.io/apimachinery/pkg/types"
33-
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3432
"k8s.io/apimachinery/pkg/util/sets"
3533
utilfeature "k8s.io/apiserver/pkg/util/feature"
3634
"k8s.io/client-go/informers"
@@ -57,11 +55,6 @@ import (
5755
"k8s.io/kubernetes/pkg/scheduler/profile"
5856
)
5957

60-
const (
61-
initialGetBackoff = 100 * time.Millisecond
62-
maximalGetBackoff = time.Minute
63-
)
64-
6558
// Binder knows how to write a binding.
6659
type Binder interface {
6760
Bind(binding *v1.Binding) error
@@ -205,7 +198,7 @@ func (c *Configurator) create() (*Scheduler, error) {
205198
Algorithm: algo,
206199
Profiles: profiles,
207200
NextPod: internalqueue.MakeNextPodFunc(podQueue),
208-
Error: MakeDefaultErrorFunc(c.client, podQueue, c.schedulerCache),
201+
Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
209202
StopEverything: c.StopEverything,
210203
SchedulingQueue: podQueue,
211204
}, nil
@@ -475,7 +468,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core
475468
}
476469

477470
// MakeDefaultErrorFunc construct a function to handle pod scheduler error
478-
func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
471+
func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
479472
return func(podInfo *framework.QueuedPodInfo, err error) {
480473
pod := podInfo.Pod
481474
if err == core.ErrNoNodesAvailable {
@@ -500,40 +493,17 @@ func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.Sch
500493
klog.Errorf("Error scheduling %v/%v: %v; retrying", pod.Namespace, pod.Name, err)
501494
}
502495

503-
podSchedulingCycle := podQueue.SchedulingCycle()
504-
// Retry asynchronously.
505-
// Note that this is extremely rudimentary and we need a more real error handling path.
506-
go func() {
507-
defer utilruntime.HandleCrash()
508-
podID := types.NamespacedName{
509-
Namespace: pod.Namespace,
510-
Name: pod.Name,
511-
}
512-
513-
// Get the pod again; it may have changed/been scheduled already.
514-
getBackoff := initialGetBackoff
515-
for {
516-
pod, err := client.CoreV1().Pods(podID.Namespace).Get(context.TODO(), podID.Name, metav1.GetOptions{})
517-
if err == nil {
518-
if len(pod.Spec.NodeName) == 0 {
519-
podInfo.Pod = pod
520-
if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podSchedulingCycle); err != nil {
521-
klog.Error(err)
522-
}
523-
}
524-
break
525-
}
526-
if apierrors.IsNotFound(err) {
527-
klog.Warningf("A pod %v no longer exists", podID)
528-
return
529-
}
530-
klog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)
531-
if getBackoff = getBackoff * 2; getBackoff > maximalGetBackoff {
532-
getBackoff = maximalGetBackoff
533-
}
534-
time.Sleep(getBackoff)
535-
}
536-
}()
496+
// Check if the Pod exists in informer cache.
497+
cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
498+
if err != nil {
499+
klog.Warningf("Pod %v/%v doesn't exist in informer cache: %v", pod.Namespace, pod.Name, err)
500+
return
501+
}
502+
// As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
503+
podInfo.Pod = cachedPod.DeepCopy()
504+
if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
505+
klog.Error(err)
506+
}
537507
}
538508
}
539509

0 commit comments

Comments
 (0)