
Commit 1b8bbca

Add integration test
1 parent 73afab1 commit 1b8bbca

File tree

3 files changed: +90 -21 lines changed


pkg/controller/job/job_controller.go

Lines changed: 5 additions & 5 deletions
@@ -60,8 +60,8 @@ import (
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
 
 var (
-    // syncJobBatchPeriod is the batch period for controller sync invocations for a Job.
-    syncJobBatchPeriod = time.Second
+    // SyncJobBatchPeriod is the batch period for controller sync invocations for a Job. Exported for tests.
+    SyncJobBatchPeriod = time.Second
     // DefaultJobApiBackOff is the default API backoff period. Exported for tests.
     DefaultJobApiBackOff = time.Second
     // MaxJobApiBackOff is the max API backoff period. Exported for tests.
@@ -574,16 +574,16 @@ func (jm *Controller) enqueueSyncJobImmediately(logger klog.Logger, obj interfac
 // - Job status update
 // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
 func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{}) {
-    jm.enqueueSyncJobInternal(logger, obj, syncJobBatchPeriod)
+    jm.enqueueSyncJobInternal(logger, obj, SyncJobBatchPeriod)
 }
 
 // enqueueSyncJobWithDelay tells the controller to invoke syncJob with a
 // custom delay, but not smaller than the batching delay.
 // It is used when pod recreations are delayed due to pod failures.
 // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
 func (jm *Controller) enqueueSyncJobWithDelay(logger klog.Logger, obj interface{}, delay time.Duration) {
-    if delay < syncJobBatchPeriod {
-        delay = syncJobBatchPeriod
+    if delay < SyncJobBatchPeriod {
+        delay = SyncJobBatchPeriod
     }
     jm.enqueueSyncJobInternal(logger, obj, delay)
 }
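
The export matters because the tests shrink or zero this batch period for a single test via t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, ...)). The helper itself is not part of this commit; the sketch below is only an assumption about how such an override-and-restore helper is typically written, not the repository's actual code.

```go
package job

import "time"

// setDurationDuringTest swaps a package-level duration for a test-specific value and
// returns a function that restores the original; callers register it with t.Cleanup.
func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() {
    origVal := *val
    *val = newVal
    return func() {
        *val = origVal
    }
}
```

With the variable exported, the integration test can apply the same override through the jobcontroller package, as seen in test/integration/job/job_test.go below.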

pkg/controller/job/job_controller_test.go

Lines changed: 18 additions & 14 deletions
@@ -2881,7 +2881,7 @@ func TestSingleJobFailedCondition(t *testing.T) {
 }
 
 func TestJobControllerMissingJobSucceedEvent(t *testing.T) {
-    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+    t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
     logger, ctx := ktesting.NewTestContext(t)
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -2928,6 +2928,8 @@ func TestJobControllerMissingJobSucceedEvent(t *testing.T) {
         t.Fatalf("Unexpected error when syncing jobs %v", err)
     }
 
+    // Verify that the job is recorded as succeeded in the fake clientset. This status is not yet
+    // reflected in the informer cache because the informer has not been started.
     jobList, err := clientSet.Tracker().List(
         schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"},
         schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"},
@@ -2940,7 +2942,8 @@ func TestJobControllerMissingJobSucceedEvent(t *testing.T) {
         t.Fatalf("job status is not succeeded: %v", updatedJob)
     }
 
-    // add the updated pod from the fake clientset memory to informer cache because informer is not started.
+    // add the updated pod from the fake clientset memory to informer cache because informer is not started. This is to make
+    // sure the job controller informer cache has the latest pod status.
     podList, err = clientSet.Tracker().List(
         schema.GroupVersionResource{Version: "v1", Resource: "pods"},
         schema.GroupVersionKind{Version: "v1", Kind: "Pod"},
@@ -2956,8 +2959,9 @@ func TestJobControllerMissingJobSucceedEvent(t *testing.T) {
         t.Fatalf("Unexpected error when adding pod to indexer %v", err)
     }
 
-    // removing the just created pod from fake clientset memory inorder for the sync job to succeed if creating a new pod because of bug
-    // but the pod will remain inside informer cache
+    // Remove the just-created pod from the fake clientset memory; it remains in the job controller's
+    // informer cache. Removing it from the clientset ensures that, if a bug makes the job controller
+    // create the pod again under the same name, the create call succeeds and the extra pod is observable.
     err = clientSet.Tracker().Delete(
         schema.GroupVersionResource{Version: "v1", Resource: "pods"},
         "default", "")
@@ -2978,7 +2982,7 @@ func TestJobControllerMissingJobSucceedEvent(t *testing.T) {
     if err != nil {
         t.Fatalf("Unexpected error when syncing jobs %v", err)
     }
-    // no pod should be created
+    // No pod should be created; the expected count is 0 because the pod was deleted from the clientset above.
     if len(podList.(*v1.PodList).Items) != 0 {
         t.Errorf("expect no pods to be created but %v pods are created", len(podList.(*v1.PodList).Items))
     }
@@ -6338,7 +6342,7 @@ func TestGetPodsForJob(t *testing.T) {
 }
 
 func TestAddPod(t *testing.T) {
-    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+    t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
     _, ctx := ktesting.NewTestContext(t)
     logger := klog.FromContext(ctx)
 
@@ -6384,7 +6388,7 @@
 }
 
 func TestAddPodOrphan(t *testing.T) {
-    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+    t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
     logger, ctx := ktesting.NewTestContext(t)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
     fakeClock := clocktesting.NewFakeClock(time.Now())
@@ -6413,7 +6417,7 @@
 }
 
 func TestUpdatePod(t *testing.T) {
-    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+    t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
     _, ctx := ktesting.NewTestContext(t)
     logger := klog.FromContext(ctx)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
@@ -6462,7 +6466,7 @@
 }
 
 func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
-    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+    t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
     logger, ctx := ktesting.NewTestContext(t)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
     fakeClock := clocktesting.NewFakeClock(time.Now())
@@ -6490,7 +6494,7 @@
 }
 
 func TestUpdatePodChangeControllerRef(t *testing.T) {
-    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+    t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
     _, ctx := ktesting.NewTestContext(t)
     logger := klog.FromContext(ctx)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
@@ -6518,7 +6522,7 @@
 }
 
 func TestUpdatePodRelease(t *testing.T) {
-    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+    t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
     _, ctx := ktesting.NewTestContext(t)
     logger := klog.FromContext(ctx)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
@@ -6546,7 +6550,7 @@
 }
 
 func TestDeletePod(t *testing.T) {
-    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+    t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
     logger, ctx := ktesting.NewTestContext(t)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
     fakeClock := clocktesting.NewFakeClock(time.Now())
@@ -6591,7 +6595,7 @@
 
 func TestDeletePodOrphan(t *testing.T) {
     // Disable batching of pod updates to show it does not get requeued at all
-    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, 0))
+    t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, 0))
     logger, ctx := ktesting.NewTestContext(t)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
     jm, informer := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
@@ -7113,7 +7117,7 @@ func TestJobBackoff(t *testing.T) {
     "failure with pod updates batching": {
         requeues:    0,
         phase:       v1.PodFailed,
-        wantBackoff: syncJobBatchPeriod,
+        wantBackoff: SyncJobBatchPeriod,
     },
 }
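
TestJobControllerMissingJobSucceedEvent drives the controller without ever starting its informers, so it keeps the informer cache in step with the fake clientset by hand: objects known to the clientset are pushed into the informer's indexer, and pod deletions are applied to the clientset only. A self-contained sketch of that pattern follows; the helper name and the seeded pod are illustrative, not the controller's actual test fixtures.

```go
package job

import (
    "context"
    "testing"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
)

// populatePodCache copies every pod known to the fake clientset into the pod informer's
// indexer. With a started informer this would happen through watch events; here it is
// done manually because the informer is never started.
func populatePodCache(t *testing.T) {
    clientSet := fake.NewSimpleClientset(
        &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-a", Namespace: "default"}},
    )
    sharedInformers := informers.NewSharedInformerFactory(clientSet, 0)
    podIndexer := sharedInformers.Core().V1().Pods().Informer().GetIndexer()

    pods, err := clientSet.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        t.Fatalf("listing pods from the fake clientset: %v", err)
    }
    for i := range pods.Items {
        // Feed the informer's local store directly, bypassing the watch machinery.
        if err := podIndexer.Add(&pods.Items[i]); err != nil {
            t.Fatalf("adding pod to the informer indexer: %v", err)
        }
    }
}
```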

test/integration/job/job_test.go

Lines changed: 67 additions & 2 deletions
@@ -44,6 +44,7 @@ import (
     clientset "k8s.io/client-go/kubernetes"
     typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
     restclient "k8s.io/client-go/rest"
+    cache "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
     "k8s.io/client-go/util/retry"
     featuregatetesting "k8s.io/component-base/featuregate/testing"
@@ -64,6 +65,7 @@ import (
 
 const waitInterval = time.Second
 const fastPodFailureBackoff = 100 * time.Millisecond
+const fastSyncJobBatchPeriod = 100 * time.Millisecond
 
 // Time duration used to account for controller latency in tests in which it is
 // expected the Job controller does not make a change. In that cases we wait a
@@ -4048,6 +4050,69 @@ func TestNodeSelectorUpdate(t *testing.T) {
 
 }
 
+// TestDelayedJobUpdateEvent tests that a Job executes only one Pod even when the job
+// events are delayed. It verifies that the finishedJobStore works correctly and prevents
+// the job controller from creating a new pod when the event signalling job completion
+// is delayed.
+func TestDelayedJobUpdateEvent(t *testing.T) {
+    t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
+    t.Cleanup(setDurationDuringTest(&jobcontroller.SyncJobBatchPeriod, fastSyncJobBatchPeriod))
+    closeFn, restConfig, clientSet, ns := setup(t, "simple")
+    t.Cleanup(closeFn)
+    // The transform is used to introduce a delay for the job events. Since every object has to go
+    // through the transform func before being added to the informer cache, this serves as an
+    // indirect way to introduce a watch event delay.
+    transformOpt := informers.WithTransform(cache.TransformFunc(func(obj interface{}) (interface{}, error) {
+        _, ok := obj.(*batchv1.Job)
+        if ok {
+            // This makes sure pod events are processed before the job events occur.
+            time.Sleep(2 * fastSyncJobBatchPeriod)
+        }
+        return obj, nil
+    }))
+    ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig, transformOpt)
+    t.Cleanup(func() {
+        cancel()
+    })
+    resetMetrics()
+
+    jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
+    if err != nil {
+        t.Fatalf("Failed to create Job: %v", err)
+    }
+    validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+        Active:      1,
+        Ready:       ptr.To[int32](0),
+        Terminating: ptr.To[int32](0),
+    })
+
+    if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
+        t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
+    }
+    validateJobComplete(ctx, t, clientSet, jobObj)
+    validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
+        Failed:      0,
+        Succeeded:   1,
+        Ready:       ptr.To[int32](0),
+        Terminating: ptr.To[int32](0),
+    })
+    validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{
+        Labels: []string{"NonIndexed", "succeeded", "CompletionsReached"},
+        Value:  1,
+    })
+    validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
+        Labels: []string{"NonIndexed", "succeeded"},
+        Value:  1,
+    })
+    jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
+    if err != nil {
+        t.Fatalf("Error %v getting the list of pods for job %q", err, klog.KObj(jobObj))
+    }
+    if len(jobPods) != 1 {
+        t.Errorf("Found %d Pods for the job %q, want 1", len(jobPods), klog.KObj(jobObj))
+    }
+}
+
 type podsByStatus struct {
     Active int
     Ready  *int32
@@ -4469,9 +4534,9 @@ func setup(t testing.TB, nsBaseName string) (framework.TearDownFunc, *restclient
     return closeFn, config, clientSet, ns
 }
 
-func startJobControllerAndWaitForCaches(tb testing.TB, restConfig *restclient.Config) (context.Context, context.CancelFunc) {
+func startJobControllerAndWaitForCaches(tb testing.TB, restConfig *restclient.Config, options ...informers.SharedInformerOption) (context.Context, context.CancelFunc) {
     tb.Helper()
-    informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0)
+    informerSet := informers.NewSharedInformerFactoryWithOptions(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0, options...)
     jc, ctx, cancel := createJobControllerWithSharedInformers(tb, restConfig, informerSet)
     informerSet.Start(ctx.Done())
     go jc.Run(ctx, 1)
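
Because the new options parameter is variadic, existing callers of startJobControllerAndWaitForCaches keep working unchanged, and TestDelayedJobUpdateEvent simply passes a single informers.WithTransform option. The standalone sketch below shows the same delay technique outside the test harness; the helper name, the fake clientset, and the delay value are illustrative assumptions rather than part of this commit.

```go
package main

import (
    "time"

    batchv1 "k8s.io/api/batch/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
    "k8s.io/client-go/tools/cache"
)

// newDelayedJobInformers builds a shared informer factory whose Job objects are held back
// before they reach the informer cache, mirroring the transformOpt used in the test above.
func newDelayedJobInformers(delay time.Duration) informers.SharedInformerFactory {
    client := fake.NewSimpleClientset()
    delayJobs := informers.WithTransform(cache.TransformFunc(func(obj interface{}) (interface{}, error) {
        if _, ok := obj.(*batchv1.Job); ok {
            // Only Job objects are delayed; Pod events flow through immediately,
            // so pod updates are observed before the matching job update.
            time.Sleep(delay)
        }
        return obj, nil
    }))
    return informers.NewSharedInformerFactoryWithOptions(client, 0, delayJobs)
}

func main() {
    stopCh := make(chan struct{})
    defer close(stopCh)

    factory := newDelayedJobInformers(200 * time.Millisecond)
    factory.Batch().V1().Jobs().Informer() // register the Job informer before starting
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)
}
```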
