
Commit 4083c7d

Fix a flaky scheduler preemption e2e
- Use preemptor pod's Status.NominatedNodeName to signal success of the Preemption behavior
- Optimize the test to eliminate unnecessary Pods creation
- Increase timeout from 1 minute to 2 minutes
1 parent 1c51c44 commit 4083c7d
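
At its core, the rewritten test no longer waits for a fourth ReplicaSet to become available. It creates a single high-priority preemptor pod and polls until that pod has been bound to a node (the waitForPreemptingWithTimeout helper added below checks Spec.NodeName). The following is a condensed, self-contained sketch of that signal; the package name, function name, and clientset parameter are illustrative and not part of the commit:

// Sketch of the "preemption succeeded" signal used by the test, assuming the
// same client-go vintage as this commit (Pods().Get takes no context argument).
package sketch

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
)

// waitUntilBound polls every 2 seconds until the preemptor pod has been
// assigned a node, or the timeout (2 minutes in the test) expires.
func waitUntilBound(cs clientset.Interface, ns, name string, timeout time.Duration) error {
	return wait.Poll(2*time.Second, timeout, func() (bool, error) {
		p, err := cs.CoreV1().Pods(ns).Get(name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		return len(p.Spec.NodeName) > 0, nil
	})
}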

2 files changed (+102, -54 lines)


test/e2e/framework/replicaset/wait.go

Lines changed: 8 additions & 1 deletion
@@ -18,6 +18,7 @@ package replicaset
 
 import (
 	"fmt"
+	"time"
 
 	appsv1 "k8s.io/api/apps/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -43,8 +44,14 @@ func WaitForReadyReplicaSet(c clientset.Interface, ns, name string) error {
 
 // WaitForReplicaSetTargetAvailableReplicas waits for .status.availableReplicas of a RS to equal targetReplicaNum
 func WaitForReplicaSetTargetAvailableReplicas(c clientset.Interface, replicaSet *appsv1.ReplicaSet, targetReplicaNum int32) error {
+	return WaitForReplicaSetTargetAvailableReplicasWithTimeout(c, replicaSet, targetReplicaNum, framework.PollShortTimeout)
+}
+
+// WaitForReplicaSetTargetAvailableReplicasWithTimeout waits for .status.availableReplicas of a RS to equal targetReplicaNum
+// with given timeout.
+func WaitForReplicaSetTargetAvailableReplicasWithTimeout(c clientset.Interface, replicaSet *appsv1.ReplicaSet, targetReplicaNum int32, timeout time.Duration) error {
 	desiredGeneration := replicaSet.Generation
-	err := wait.PollImmediate(framework.Poll, framework.PollShortTimeout, func() (bool, error) {
+	err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
 		rs, err := c.AppsV1().ReplicaSets(replicaSet.Namespace).Get(replicaSet.Name, metav1.GetOptions{})
 		if err != nil {
 			return false, err
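
For reference, the preemption test below is the first caller of the new timeout-aware variant, while existing callers keep the previous behavior through the unchanged wrapper. A minimal usage sketch, assuming the surrounding e2e test context (f *framework.Framework, rs *appsv1.ReplicaSet, conf pauseRSConfig); framework.PodGetTimeout is the longer two-minute window the commit message refers to:

// Previous behavior, still available to existing callers (fixed framework.PollShortTimeout):
framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicas(f.ClientSet, rs, conf.Replicas))

// New call used by runPauseRS in the preemption test: same check, explicit longer timeout.
framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicasWithTimeout(
	f.ClientSet, rs, conf.Replicas, framework.PodGetTimeout))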

test/e2e/scheduling/preemption.go

Lines changed: 94 additions & 53 deletions
@@ -19,8 +19,10 @@ package scheduling
 import (
 	"fmt"
 	"strings"
+	"sync/atomic"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/cache"
 
 	appsv1 "k8s.io/api/apps/v1"
@@ -289,7 +291,7 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
 		nodeCopy := node.DeepCopy()
 		// force it to update
 		nodeCopy.ResourceVersion = "0"
-		nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("800")
+		nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000")
 		node, err = cs.CoreV1().Nodes().UpdateStatus(nodeCopy)
 		framework.ExpectNoError(err)
 
@@ -307,8 +309,8 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
 		}
 	})
 
-	ginkgo.It("runs ReplicaSets to verify preemption running path [Flaky]", func() {
-		podNamesSeen := make(map[string]struct{})
+	ginkgo.It("runs ReplicaSets to verify preemption running path", func() {
+		podNamesSeen := []int32{0, 0, 0}
 		stopCh := make(chan struct{})
 
 		// create a pod controller to list/watch pod events from the test framework namespace
@@ -327,108 +329,121 @@ var _ = SIGDescribe("PreemptionExecutionPath", func() {
 			cache.ResourceEventHandlerFuncs{
 				AddFunc: func(obj interface{}) {
 					if pod, ok := obj.(*v1.Pod); ok {
-						podNamesSeen[pod.Name] = struct{}{}
+						if strings.HasPrefix(pod.Name, "rs-pod1") {
+							atomic.AddInt32(&podNamesSeen[0], 1)
+						} else if strings.HasPrefix(pod.Name, "rs-pod2") {
+							atomic.AddInt32(&podNamesSeen[1], 1)
+						} else if strings.HasPrefix(pod.Name, "rs-pod3") {
+							atomic.AddInt32(&podNamesSeen[2], 1)
+						}
 					}
 				},
 			},
 		)
 		go podController.Run(stopCh)
 		defer close(stopCh)
 
-		// prepare four ReplicaSet
+		// prepare three ReplicaSet
 		rsConfs := []pauseRSConfig{
 			{
-				Replicas: int32(5),
+				Replicas: int32(1),
 				PodConfig: pausePodConfig{
 					Name:              "pod1",
 					Namespace:         ns,
 					Labels:            map[string]string{"name": "pod1"},
 					PriorityClassName: "p1",
 					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
 					Resources: &v1.ResourceRequirements{
-						Requests: v1.ResourceList{fakecpu: resource.MustParse("40")},
-						Limits:   v1.ResourceList{fakecpu: resource.MustParse("40")},
+						Requests: v1.ResourceList{fakecpu: resource.MustParse("200")},
+						Limits:   v1.ResourceList{fakecpu: resource.MustParse("200")},
 					},
 				},
 			},
 			{
-				Replicas: int32(4),
+				Replicas: int32(1),
 				PodConfig: pausePodConfig{
 					Name:              "pod2",
 					Namespace:         ns,
 					Labels:            map[string]string{"name": "pod2"},
 					PriorityClassName: "p2",
 					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
 					Resources: &v1.ResourceRequirements{
-						Requests: v1.ResourceList{fakecpu: resource.MustParse("50")},
-						Limits:   v1.ResourceList{fakecpu: resource.MustParse("50")},
+						Requests: v1.ResourceList{fakecpu: resource.MustParse("300")},
+						Limits:   v1.ResourceList{fakecpu: resource.MustParse("300")},
 					},
 				},
 			},
 			{
-				Replicas: int32(4),
+				Replicas: int32(1),
 				PodConfig: pausePodConfig{
 					Name:              "pod3",
 					Namespace:         ns,
 					Labels:            map[string]string{"name": "pod3"},
 					PriorityClassName: "p3",
 					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
 					Resources: &v1.ResourceRequirements{
-						Requests: v1.ResourceList{fakecpu: resource.MustParse("95")},
-						Limits:   v1.ResourceList{fakecpu: resource.MustParse("95")},
-					},
-				},
-			},
-			{
-				Replicas: int32(1),
-				PodConfig: pausePodConfig{
-					Name:              "pod4",
-					Namespace:         ns,
-					Labels:            map[string]string{"name": "pod4"},
-					PriorityClassName: "p4",
-					NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
-					Resources: &v1.ResourceRequirements{
-						Requests: v1.ResourceList{fakecpu: resource.MustParse("400")},
-						Limits:   v1.ResourceList{fakecpu: resource.MustParse("400")},
+						Requests: v1.ResourceList{fakecpu: resource.MustParse("450")},
+						Limits:   v1.ResourceList{fakecpu: resource.MustParse("450")},
 					},
 				},
 			},
 		}
-		// create ReplicaSet{1,2,3} so as to occupy 780/800 fake resource
-		rsNum := len(rsConfs)
-		for i := 0; i < rsNum-1; i++ {
+		// create ReplicaSet{1,2,3} so as to occupy 950/1000 fake resource
+		for i := range rsConfs {
 			runPauseRS(f, rsConfs[i])
 		}
 
 		framework.Logf("pods created so far: %v", podNamesSeen)
 		framework.Logf("length of pods created so far: %v", len(podNamesSeen))
 
-		// create ReplicaSet4
-		// if runPauseRS failed, it means ReplicaSet4 cannot be scheduled even after 1 minute
-		// which is unacceptable
-		runPauseRS(f, rsConfs[rsNum-1])
+		// create a Preemptor Pod
+		preemptorPodConf := pausePodConfig{
+			Name:              "pod4",
+			Namespace:         ns,
+			Labels:            map[string]string{"name": "pod4"},
+			PriorityClassName: "p4",
+			NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
+			Resources: &v1.ResourceRequirements{
+				Requests: v1.ResourceList{fakecpu: resource.MustParse("500")},
+				Limits:   v1.ResourceList{fakecpu: resource.MustParse("500")},
+			},
+		}
+		preemptorPod := createPod(f, preemptorPodConf)
+		waitForPreemptingWithTimeout(f, preemptorPod, framework.PodGetTimeout)
 
 		framework.Logf("pods created so far: %v", podNamesSeen)
-		framework.Logf("length of pods created so far: %v", len(podNamesSeen))
 
-		// count pods number of ReplicaSet{1,2,3}, if it's more than expected replicas
-		// then it denotes its pods have been over-preempted
-		// "*2" means pods of ReplicaSet{1,2} are expected to be only preempted once
-		maxRSPodsSeen := []int{5 * 2, 4 * 2, 4}
-		rsPodsSeen := []int{0, 0, 0}
-		for podName := range podNamesSeen {
-			if strings.HasPrefix(podName, "rs-pod1") {
-				rsPodsSeen[0]++
-			} else if strings.HasPrefix(podName, "rs-pod2") {
-				rsPodsSeen[1]++
-			} else if strings.HasPrefix(podName, "rs-pod3") {
-				rsPodsSeen[2]++
+		// count pods number of ReplicaSet{1,2,3}:
+		// - if it's more than expected replicas, it denotes its pods have been over-preempted
+		// - if it's less than expected replicas, it denotes its pods are under-preempted
+		// "*2" means pods of ReplicaSet{1,2} are expected to be only preempted once.
+		expectedRSPods := []int32{1 * 2, 1 * 2, 1}
+		err := wait.Poll(framework.Poll, framework.PollShortTimeout, func() (bool, error) {
+			for i := 0; i < len(podNamesSeen); i++ {
+				got := atomic.LoadInt32(&podNamesSeen[i])
+				if got < expectedRSPods[i] {
+					framework.Logf("waiting for rs%d to observe %d pod creations, got %d", i+1, expectedRSPods[i], got)
+					return false, nil
+				} else if got > expectedRSPods[i] {
+					return false, fmt.Errorf("rs%d had more than %d pods created: %d", i+1, expectedRSPods[i], got)
+				}
 			}
+			return true, nil
+		})
+		if err != nil {
+			framework.Logf("pods created so far: %v", podNamesSeen)
+			framework.Failf("failed pod observation expectations: %v", err)
 		}
-		for i, got := range rsPodsSeen {
-			expected := maxRSPodsSeen[i]
-			if got > expected {
-				framework.Failf("pods of ReplicaSet%d have been over-preempted: expect %v pod names, but got %d", i+1, expected, got)
+
+		// If logic continues to here, we should do a final check to ensure within a time period,
+		// the state is stable; otherwise, pods may be over-preempted.
+		time.Sleep(5 * time.Second)
+		for i := 0; i < len(podNamesSeen); i++ {
+			got := atomic.LoadInt32(&podNamesSeen[i])
+			if got < expectedRSPods[i] {
+				framework.Failf("pods of ReplicaSet%d have been under-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got)
+			} else if got > expectedRSPods[i] {
+				framework.Failf("pods of ReplicaSet%d have been over-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got)
 			}
 		}
 	})
@@ -472,6 +487,32 @@ func createPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSe
 
 func runPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
 	rs := createPauseRS(f, conf)
-	framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicas(f.ClientSet, rs, conf.Replicas))
+	framework.ExpectNoError(replicaset.WaitForReplicaSetTargetAvailableReplicasWithTimeout(f.ClientSet, rs, conf.Replicas, framework.PodGetTimeout))
 	return rs
 }
+
+func createPod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
+	namespace := conf.Namespace
+	if len(namespace) == 0 {
+		namespace = f.Namespace.Name
+	}
+	pod, err := f.ClientSet.CoreV1().Pods(namespace).Create(initPausePod(f, conf))
+	framework.ExpectNoError(err)
+	return pod
+}
+
+// waitForPreemptingWithTimeout verifies if 'pod' is preempting within 'timeout', specifically it checks
+// if the 'spec.NodeName' field of preemptor 'pod' has been set.
+func waitForPreemptingWithTimeout(f *framework.Framework, pod *v1.Pod, timeout time.Duration) {
+	err := wait.Poll(2*time.Second, timeout, func() (bool, error) {
+		pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		if len(pod.Spec.NodeName) > 0 {
+			return true, nil
+		}
+		return false, err
+	})
+	framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name)
+}
