Skip to content

Commit a0d8278

Browse files
authored
Merge pull request kubernetes#76301 from xiuqiaoli/master
Remove FIFO scheduling queue and old pod backoff logic
2 parents 20b76a2 + 36effb4 commit a0d8278

File tree

7 files changed

+117
-190
lines changed

7 files changed

+117
-190
lines changed

pkg/scheduler/factory/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ go_library(
2121
"//pkg/scheduler/internal/queue:go_default_library",
2222
"//pkg/scheduler/plugins:go_default_library",
2323
"//pkg/scheduler/plugins/v1alpha1:go_default_library",
24-
"//pkg/scheduler/util:go_default_library",
2524
"//pkg/scheduler/volumebinder:go_default_library",
2625
"//staging/src/k8s.io/api/core/v1:go_default_library",
2726
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
@@ -68,6 +67,7 @@ go_test(
6867
"//staging/src/k8s.io/api/core/v1:go_default_library",
6968
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
7069
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
70+
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
7171
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
7272
"//staging/src/k8s.io/client-go/informers:go_default_library",
7373
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",

pkg/scheduler/factory/factory.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ import (
5656
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
5757
"k8s.io/kubernetes/pkg/scheduler/plugins"
5858
pluginsv1alpha1 "k8s.io/kubernetes/pkg/scheduler/plugins/v1alpha1"
59-
"k8s.io/kubernetes/pkg/scheduler/util"
6059
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
6160
)
6261

@@ -456,7 +455,6 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
456455
c.percentageOfNodesToScore,
457456
)
458457

459-
podBackoff := internalqueue.NewPodBackoffMap(1*time.Second, 60*time.Second)
460458
return &Config{
461459
SchedulerCache: c.schedulerCache,
462460
// The scheduler only needs to consider schedulable nodes.
@@ -470,7 +468,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
470468
return cache.WaitForCacheSync(c.StopEverything, c.scheduledPodsHasSynced)
471469
},
472470
NextPod: internalqueue.MakeNextPodFunc(c.podQueue),
473-
Error: MakeDefaultErrorFunc(c.client, podBackoff, c.podQueue, c.schedulerCache, c.StopEverything),
471+
Error: MakeDefaultErrorFunc(c.client, c.podQueue, c.schedulerCache, c.StopEverything),
474472
StopEverything: c.StopEverything,
475473
VolumeBinder: c.volumeBinder,
476474
SchedulingQueue: c.podQueue,
@@ -639,7 +637,7 @@ func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) core
639637
}
640638

641639
// MakeDefaultErrorFunc construct a function to handle pod scheduler error
642-
func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.PodBackoffMap, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) {
640+
func MakeDefaultErrorFunc(client clientset.Interface, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache, stopEverything <-chan struct{}) func(pod *v1.Pod, err error) {
643641
return func(pod *v1.Pod, err error) {
644642
if err == core.ErrNoNodesAvailable {
645643
klog.V(4).Infof("Unable to schedule %v/%v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)
@@ -662,7 +660,6 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.Pod
662660
}
663661
}
664662

665-
backoff.CleanupPodsCompletesBackingoff()
666663
podSchedulingCycle := podQueue.SchedulingCycle()
667664
// Retry asynchronously.
668665
// Note that this is extremely rudimentary and we need a more real error handling path.
@@ -673,16 +670,9 @@ func MakeDefaultErrorFunc(client clientset.Interface, backoff *internalqueue.Pod
673670
Name: pod.Name,
674671
}
675672

676-
// When pod priority is enabled, we would like to place an unschedulable
677-
// pod in the unschedulable queue. This ensures that if the pod is nominated
678-
// to run on a node, scheduler takes the pod into account when running
679-
// predicates for the node.
680-
if !util.PodPriorityEnabled() {
681-
if !backoff.TryBackoffAndWait(podID, stopEverything) {
682-
klog.Warningf("Request for pod %v already in flight, abandoning", podID)
683-
return
684-
}
685-
}
673+
// An unschedulable pod will be placed in the unschedulable queue.
674+
// This ensures that if the pod is nominated to run on a node,
675+
// scheduler takes the pod into account when running predicates for the node.
686676
// Get the pod again; it may have changed/been scheduled already.
687677
getBackoff := initialGetBackoff
688678
for {

pkg/scheduler/factory/factory_test.go

Lines changed: 107 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"k8s.io/api/core/v1"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/runtime"
29+
"k8s.io/apimachinery/pkg/util/clock"
2930
"k8s.io/apimachinery/pkg/util/sets"
3031
"k8s.io/client-go/informers"
3132
clientset "k8s.io/client-go/kubernetes"
@@ -252,48 +253,125 @@ func TestDefaultErrorFunc(t *testing.T) {
252253
client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
253254
stopCh := make(chan struct{})
254255
defer close(stopCh)
255-
queue := &internalqueue.FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
256+
257+
timestamp := time.Now()
258+
queue := internalqueue.NewPriorityQueueWithClock(nil, clock.NewFakeClock(timestamp))
256259
schedulerCache := internalcache.New(30*time.Second, stopCh)
257-
podBackoff := internalqueue.NewPodBackoffMap(1*time.Second, 60*time.Second)
258-
errFunc := MakeDefaultErrorFunc(client, podBackoff, queue, schedulerCache, stopCh)
260+
errFunc := MakeDefaultErrorFunc(client, queue, schedulerCache, stopCh)
259261

262+
// Trigger error handling again to put the pod in unschedulable queue
260263
errFunc(testPod, nil)
261264

262-
for {
263-
// This is a terrible way to do this but I plan on replacing this
264-
// whole error handling system in the future. The test will time
265-
// out if something doesn't work.
266-
time.Sleep(10 * time.Millisecond)
267-
got, exists, _ := queue.Get(testPod)
268-
if !exists {
265+
// Try up to a minute to retrieve the error pod from priority queue
266+
foundPodFlag := false
267+
maxIterations := 10 * 60
268+
for i := 0; i < maxIterations; i++ {
269+
time.Sleep(100 * time.Millisecond)
270+
got := getPodfromPriorityQueue(queue, testPod)
271+
if got == nil {
269272
continue
270273
}
271-
requestReceived := false
272-
actions := client.Actions()
273-
for _, a := range actions {
274-
if a.GetVerb() == "get" {
275-
getAction, ok := a.(clienttesting.GetAction)
276-
if !ok {
277-
t.Errorf("Can't cast action object to GetAction interface")
278-
break
279-
}
280-
name := getAction.GetName()
281-
ns := a.GetNamespace()
282-
if name != "foo" || ns != "bar" {
283-
t.Errorf("Expected name %s namespace %s, got %s %s",
284-
"foo", "bar", name, ns)
285-
}
286-
requestReceived = true
287-
}
274+
275+
testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
276+
277+
if e, a := testPod, got; !reflect.DeepEqual(e, a) {
278+
t.Errorf("Expected %v, got %v", e, a)
288279
}
289-
if !requestReceived {
290-
t.Errorf("Get pod request not received")
280+
281+
foundPodFlag = true
282+
break
283+
}
284+
285+
if !foundPodFlag {
286+
t.Errorf("Failed to get pod from the unschedulable queue after waiting for a minute: %v", testPod)
287+
}
288+
289+
// Remove the pod from priority queue to test putting error
290+
// pod in backoff queue.
291+
queue.Delete(testPod)
292+
293+
// Trigger a move request
294+
queue.MoveAllToActiveQueue()
295+
296+
// Trigger error handling again to put the pod in backoff queue
297+
errFunc(testPod, nil)
298+
299+
foundPodFlag = false
300+
for i := 0; i < maxIterations; i++ {
301+
time.Sleep(100 * time.Millisecond)
302+
// The pod should be found from backoff queue at this time
303+
got := getPodfromPriorityQueue(queue, testPod)
304+
if got == nil {
305+
continue
291306
}
307+
308+
testClientGetPodRequest(client, t, testPod.Namespace, testPod.Name)
309+
292310
if e, a := testPod, got; !reflect.DeepEqual(e, a) {
293311
t.Errorf("Expected %v, got %v", e, a)
294312
}
313+
314+
foundPodFlag = true
295315
break
296316
}
317+
318+
if !foundPodFlag {
319+
t.Errorf("Failed to get pod from the backoff queue after waiting for a minute: %v", testPod)
320+
}
321+
}
322+
323+
// getPodfromPriorityQueue is the function used in the TestDefaultErrorFunc test to get
324+
// the specific pod from the given priority queue. It returns the found pod in the priority queue.
325+
func getPodfromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
326+
podList := queue.PendingPods()
327+
if len(podList) == 0 {
328+
return nil
329+
}
330+
331+
queryPodKey, err := cache.MetaNamespaceKeyFunc(pod)
332+
if err != nil {
333+
return nil
334+
}
335+
336+
for _, foundPod := range podList {
337+
foundPodKey, err := cache.MetaNamespaceKeyFunc(foundPod)
338+
if err != nil {
339+
return nil
340+
}
341+
342+
if foundPodKey == queryPodKey {
343+
return foundPod
344+
}
345+
}
346+
347+
return nil
348+
}
349+
350+
// testClientGetPodRequest function provides a routine used by TestDefaultErrorFunc test.
351+
// It tests whether the fake client can receive request and correctly "get" the namespace
352+
// and name of the error pod.
353+
func testClientGetPodRequest(client *fake.Clientset, t *testing.T, podNs string, podName string) {
354+
requestReceived := false
355+
actions := client.Actions()
356+
for _, a := range actions {
357+
if a.GetVerb() == "get" {
358+
getAction, ok := a.(clienttesting.GetAction)
359+
if !ok {
360+
t.Errorf("Can't cast action object to GetAction interface")
361+
break
362+
}
363+
name := getAction.GetName()
364+
ns := a.GetNamespace()
365+
if name != podName || ns != podNs {
366+
t.Errorf("Expected name %s namespace %s, got %s %s",
367+
podName, podNs, name, ns)
368+
}
369+
requestReceived = true
370+
}
371+
}
372+
if !requestReceived {
373+
t.Errorf("Get pod request not received")
374+
}
297375
}
298376

299377
func TestBind(t *testing.T) {

pkg/scheduler/internal/queue/pod_backoff.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,20 +60,6 @@ func (pbm *PodBackoffMap) GetBackoffTime(nsPod ktypes.NamespacedName) (time.Time
6060
return backoffTime, true
6161
}
6262

63-
// TryBackoffAndWait tries to perform backoff for a non-preempting pod.
64-
// it is invoked from factory.go if util.PodPriorityEnabled() returns false.
65-
func (pbm *PodBackoffMap) TryBackoffAndWait(nsPod ktypes.NamespacedName, stop <-chan struct{}) bool {
66-
pbm.lock.RLock()
67-
defer pbm.lock.RUnlock()
68-
backoffDuration := pbm.calculateBackoffDuration(nsPod)
69-
select {
70-
case <-time.After(backoffDuration):
71-
return true
72-
case <-stop:
73-
return false
74-
}
75-
}
76-
7763
// calculateBackoffDuration is a helper function for calculating the backoffDuration
7864
// based on the number of attempts the pod has made.
7965
func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration {

pkg/scheduler/internal/queue/pod_backoff_test.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -92,18 +92,3 @@ func TestClearPodBackoff(t *testing.T) {
9292
t.Errorf("Expected backoff of 1s for pod %s, got %s", podID, duration.String())
9393
}
9494
}
95-
96-
func TestTryBackoffAndWait(t *testing.T) {
97-
bpm := NewPodBackoffMap(1*time.Second, 60*time.Second)
98-
99-
stopCh := make(chan struct{})
100-
podID := ktypes.NamespacedName{Namespace: "ns", Name: "pod"}
101-
if !bpm.TryBackoffAndWait(podID, stopCh) {
102-
t.Error("Expected TryBackoffAndWait success for new pod, got failure.")
103-
}
104-
105-
close(stopCh)
106-
if bpm.TryBackoffAndWait(podID, stopCh) {
107-
t.Error("Expected TryBackoffAndWait failure with closed stopCh, got success.")
108-
}
109-
}

0 commit comments

Comments
 (0)