Commit c7d0ed5

add integration test for job failure event delay and remove the unit test
1 parent 1b8bbca commit c7d0ed5

File tree (2 files changed: +92, −113 lines)

pkg/controller/job/job_controller_test.go
test/integration/job/job_test.go

2 files changed

+92
-113
lines changed

pkg/controller/job/job_controller_test.go

Lines changed: 0 additions & 109 deletions
@@ -56,7 +56,6 @@ import (
 	_ "k8s.io/kubernetes/pkg/apis/core/install"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/job/metrics"
-	"k8s.io/kubernetes/pkg/controller/job/util"
 	"k8s.io/kubernetes/pkg/controller/testutil"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/utils/clock"
@@ -2880,114 +2879,6 @@ func TestSingleJobFailedCondition(t *testing.T) {
 
 }
 
-func TestJobControllerMissingJobSucceedEvent(t *testing.T) {
-	t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
-	logger, ctx := ktesting.NewTestContext(t)
-	job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
-	job1.Name = "job1"
-	clientSet := fake.NewSimpleClientset(job1)
-	fakeClock := clocktesting.NewFakeClock(time.Now())
-	jm, informer := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock)
-	jm.podControl = &controller.RealPodControl{
-		KubeClient: clientSet,
-		Recorder:   testutil.NewFakeRecorder(),
-	}
-	jm.podStoreSynced = alwaysReady
-	jm.jobStoreSynced = alwaysReady
-
-	err := informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
-	if err != nil {
-		t.Fatalf("Unexpected error when adding job to indexer %v", err)
-	}
-	// 1st reconcile should create a new pod
-	err = jm.syncJob(ctx, testutil.GetKey(job1, t))
-	if err != nil {
-		t.Fatalf("Unexpected error when syncing jobs %v", err)
-	}
-
-	podIndexer := informer.Core().V1().Pods().Informer().GetIndexer()
-	podList, err := clientSet.Tracker().List(
-		schema.GroupVersionResource{Version: "v1", Resource: "pods"},
-		schema.GroupVersionKind{Version: "v1", Kind: "Pod"},
-		"default")
-	if err != nil {
-		t.Fatalf("Unexpected error when fetching pods %v", err)
-	}
-	// manually adding the just-created pod from fake clientset memory to informer cache because informer is not started.
-	// we are updating the pod status to succeeded which should update the job status to succeeded and remove the finalizer of the pod.
-	justCreatedPod := podList.(*v1.PodList).Items[0]
-	t.Logf("pod is %v\n", podList.(*v1.PodList).Items[0])
-	justCreatedPod.Status.Phase = v1.PodSucceeded
-	err = podIndexer.Add(&justCreatedPod)
-	if err != nil {
-		t.Fatalf("Unexpected error when adding pod to indexer %v", err)
-	}
-	jm.addPod(logger, &justCreatedPod)
-	err = jm.syncJob(ctx, testutil.GetKey(job1, t))
-	if err != nil {
-		t.Fatalf("Unexpected error when syncing jobs %v", err)
-	}
-
-	// Verify that the job is updated as succeeded in the client set. However this status is not updated yet in the
-	// informer is not started
-	jobList, err := clientSet.Tracker().List(
-		schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"},
-		schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"},
-		"default")
-	if err != nil {
-		t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
-	}
-	updatedJob := jobList.(*batch.JobList).Items[0]
-	if !util.IsJobSucceeded(&updatedJob) {
-		t.Fatalf("job status is not succeeded: %v", updatedJob)
-	}
-
-	// add the updated pod from the fake clientset memory to informer cache because informer is not started. This is to make
-	// sure the job controller informer cache has the latest pod status.
-	podList, err = clientSet.Tracker().List(
-		schema.GroupVersionResource{Version: "v1", Resource: "pods"},
-		schema.GroupVersionKind{Version: "v1", Kind: "Pod"},
-		"default")
-	if err != nil {
-		t.Fatalf("Unexpected error when fetching pods %v", err)
-	}
-	t.Logf("pod is %v\n", podList.(*v1.PodList).Items[0])
-	updatedPod := podList.(*v1.PodList).Items[0]
-	updatedPod.Status.Phase = v1.PodSucceeded
-	err = podIndexer.Add(&updatedPod)
-	if err != nil {
-		t.Fatalf("Unexpected error when adding pod to indexer %v", err)
-	}
-
-	// removing the just created pod from fake clientset memory but the pod will remain inside informer cache
-	// of the job controller. We are removing from the client set because in case of a bug if the job controller
-	// is trying to create the pod again it can succeed because it creates using the same name again.
-	err = clientSet.Tracker().Delete(
-		schema.GroupVersionResource{Version: "v1", Resource: "pods"},
-		"default", "")
-	if err != nil {
-		t.Fatalf("Unexpected error when deleting pod to indexer %v", err)
-	}
-
-	err = jm.syncJob(ctx, testutil.GetKey(job1, t))
-	if err != nil {
-		t.Fatalf("Unexpected error when syncing jobs %v", err)
-	}
-	time.Sleep(100 * time.Millisecond)
-
-	podList, err = clientSet.Tracker().List(
-		schema.GroupVersionResource{Version: "v1", Resource: "pods"},
-		schema.GroupVersionKind{Version: "v1", Kind: "Pod"},
-		"default")
-	if err != nil {
-		t.Fatalf("Unexpected error when syncing jobs %v", err)
-	}
-	// no pod should be created. Here it is 0 because we had deleted the pod from the client set.
-	if len(podList.(*v1.PodList).Items) != 0 {
-		t.Errorf("expect no pods to be created but %v pods are created", len(podList.(*v1.PodList).Items))
-	}
-}
-
 func TestSyncJobComplete(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
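
The removed unit test drove syncJob by hand against unstarted informers to simulate a cache that still reports the job as unfinished. The mechanism it exercised, the controller's in-memory record of already-finished jobs, can be pictured as a set of Job UIDs consulted before creating replacement pods. Below is a minimal, hypothetical Go sketch of that idea only; the names finishedJobStore, markFinished, and isFinished are illustrative and are not the job controller's actual API.

package main

import (
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/types"
)

// finishedJobStore is an illustrative stand-in for the controller's
// in-memory record of jobs it has already observed to finish. It lets
// a sync ignore a stale cached Job object that lacks the terminal
// condition the controller has already written to the API server.
type finishedJobStore struct {
	mu  sync.Mutex
	uid map[types.UID]bool
}

func newFinishedJobStore() *finishedJobStore {
	return &finishedJobStore{uid: map[types.UID]bool{}}
}

// markFinished records that the job with this UID reached a terminal
// (Complete or Failed) condition.
func (s *finishedJobStore) markFinished(uid types.UID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.uid[uid] = true
}

// isFinished reports whether a terminal condition was already observed,
// even if the informer cache does not reflect it yet.
func (s *finishedJobStore) isFinished(uid types.UID) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.uid[uid]
}

func main() {
	store := newFinishedJobStore()
	jobUID := types.UID("job-1-uid")

	// A sync observed the job finish and recorded it.
	store.markFinished(jobUID)

	// A later sync sees a stale cached job with no terminal condition;
	// the store tells it not to create replacement pods.
	if store.isFinished(jobUID) {
		fmt.Println("job already finished; skip pod creation")
	}
}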

test/integration/job/job_test.go

Lines changed: 92 additions & 4 deletions
@@ -4050,11 +4050,11 @@ func TestNodeSelectorUpdate(t *testing.T) {
 
 }
 
-// TestDelayedJobUpdateEvent tests that a Job that only executes one Pod even when
-// the job events are delayed. This test verfies the finishedJobStore is working
-// correctly and preventing from job controller creating a new pod if the job complete
+// TestDelayedJobSucceededUpdateEvent tests that a Job only creates one Pod even when
+// the job success events are delayed. This test verifies the finishedJobStore is working
+// correctly and prevents the job controller from creating a new pod if the job success
 // event is delayed.
-func TestDelayedJobUpdateEvent(t *testing.T) {
+func TestDelayedJobSucceededUpdateEvent(t *testing.T) {
 	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
 	t.Cleanup(setDurationDuringTest(&jobcontroller.SyncJobBatchPeriod, fastSyncJobBatchPeriod))
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
@@ -4113,6 +4113,94 @@
 	}
 }
 
+// TestDelayedJobFailedUpdateEvent tests that a Job only creates one Pod even when
+// the job failed events are delayed. This test verifies the finishedJobStore is working
+// correctly and prevents the job controller from creating a new pod if the job failed
+// event is delayed.
+func TestDelayedJobFailedUpdateEvent(t *testing.T) {
+	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
+	t.Cleanup(setDurationDuringTest(&jobcontroller.SyncJobBatchPeriod, fastSyncJobBatchPeriod))
+	closeFn, restConfig, clientSet, ns := setup(t, "pod-failure-policy")
+	t.Cleanup(closeFn)
+	// The transform is used to introduce a delay for the job events. Since every object has to go
+	// through the transform func before being added to the informer cache, this serves as an
+	// indirect way to introduce a watch event delay.
+	transformOpt := informers.WithTransform(cache.TransformFunc(func(obj interface{}) (interface{}, error) {
+		_, ok := obj.(*batchv1.Job)
+		if ok {
+			// This makes sure pod events are processed before the job events occur.
+			time.Sleep(2 * fastSyncJobBatchPeriod)
+		}
+		return obj, nil
+	}))
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig, transformOpt)
+	t.Cleanup(func() {
+		cancel()
+	})
+	resetMetrics()
+
+	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+		Spec: batchv1.JobSpec{
+			Template: v1.PodTemplateSpec{
+				Spec: v1.PodSpec{
+					Containers: []v1.Container{
+						{
+							Name:                     "main-container",
+							Image:                    "foo",
+							ImagePullPolicy:          v1.PullIfNotPresent,
+							TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
+						},
+					},
+				},
+			},
+			BackoffLimit: ptr.To[int32](0),
+		},
+	})
+	if err != nil {
+		t.Fatalf("Failed to create Job: %v", err)
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active:      1,
+		Ready:       ptr.To[int32](0),
+		Terminating: ptr.To[int32](0),
+	})
+
+	op := func(p *v1.Pod) bool {
+		p.Status = v1.PodStatus{
+			Phase: v1.PodFailed,
+			ContainerStatuses: []v1.ContainerStatus{
+				{
+					Name: "main-container",
+					State: v1.ContainerState{
+						Terminated: &v1.ContainerStateTerminated{
+							ExitCode: 5,
+						},
+					},
+				},
+			},
+		}
+		return true
+	}
+	if _, err := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
+		t.Fatalf("Error %q while updating pod status for Job: %v", err, jobObj.Name)
+	}
+	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
+		Active:      0,
+		Failed:      1,
+		Ready:       ptr.To[int32](0),
+		Terminating: ptr.To[int32](0),
+	})
+
+	validateJobFailed(ctx, t, clientSet, jobObj)
+	jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
+	if err != nil {
+		t.Fatalf("Error %v getting the list of pods for job %q", err, klog.KObj(jobObj))
+	}
+	if len(jobPods) != 1 {
+		t.Errorf("Found %d Pods for the job %q, want 1", len(jobPods), klog.KObj(jobObj))
+	}
+}
+
 type podsByStatus struct {
 	Active int
 	Ready  *int32
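
The delay trick used by both integration tests is general-purpose: informers.WithTransform runs a cache.TransformFunc on every object before it enters the informer's store, so sleeping inside the transform for one type postpones when event handlers observe that type. Here is a standalone sketch of the same trick against a fake clientset, assuming only client-go; the 500 ms delay value is arbitrary and for illustration.

package main

import (
	"context"
	"fmt"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Delay only Job objects on their way into the informer cache. Every
	// listed or watched object passes through the transform before it is
	// stored, so sleeping here postpones when handlers see Job updates,
	// while Pod events flow through unchanged.
	delayJobs := informers.WithTransform(cache.TransformFunc(func(obj interface{}) (interface{}, error) {
		if _, ok := obj.(*batchv1.Job); ok {
			time.Sleep(500 * time.Millisecond) // illustrative delay
		}
		return obj, nil
	}))

	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactoryWithOptions(client, 0, delayJobs)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobInformer := factory.Batch().V1().Jobs().Informer()
	factory.Start(ctx.Done())
	cache.WaitForCacheSync(ctx.Done(), jobInformer.HasSynced)
	fmt.Println("informer synced; Job events now arrive ~500ms late")
}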
