Skip to content

Commit 551f3c7

Browse files
committed
merge the integration tests into a single one
1 parent c7d0ed5 commit 551f3c7

File tree

2 files changed

+122
-134
lines changed

2 files changed

+122
-134
lines changed

pkg/controller/job/job_controller.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,9 @@ type Controller struct {
124124
// recreation in case of pod failures.
125125
podBackoffStore *backoffStore
126126

127-
// finishedJobStore contains the job ids for which the job status is finished
127+
// finishedJobExpectations contains the job ids for which the job status is finished
128128
// but the corresponding event is not yet received.
129-
finishedJobStore sync.Map
129+
finishedJobExpectations sync.Map
130130
}
131131

132132
type syncJobCtx struct {
@@ -180,15 +180,15 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
180180
KubeClient: kubeClient,
181181
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
182182
},
183-
expectations: controller.NewControllerExpectations(),
184-
finalizerExpectations: newUIDTrackingExpectations(),
185-
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[string](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[string]{Name: "job", Clock: clock}),
186-
orphanQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[orphanPodKey](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[orphanPodKey]{Name: "job_orphan_pod", Clock: clock}),
187-
broadcaster: eventBroadcaster,
188-
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
189-
clock: clock,
190-
podBackoffStore: newBackoffStore(),
191-
finishedJobStore: sync.Map{},
183+
expectations: controller.NewControllerExpectations(),
184+
finalizerExpectations: newUIDTrackingExpectations(),
185+
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[string](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[string]{Name: "job", Clock: clock}),
186+
orphanQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[orphanPodKey](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[orphanPodKey]{Name: "job_orphan_pod", Clock: clock}),
187+
broadcaster: eventBroadcaster,
188+
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
189+
clock: clock,
190+
podBackoffStore: newBackoffStore(),
191+
finishedJobExpectations: sync.Map{},
192192
}
193193

194194
if _, err := jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -541,7 +541,7 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
541541
return
542542
}
543543
}
544-
jm.finishedJobStore.Delete(jobObj.UID)
544+
jm.finishedJobExpectations.Delete(jobObj.UID)
545545
jm.enqueueLabelSelector(jobObj)
546546
}
547547

@@ -847,10 +847,10 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
847847
// re-syncing here as the record has to be removed for finished/deleted jobs
848848
return fmt.Errorf("error removing backoff record %w", err)
849849
}
850-
jm.finishedJobStore.Delete(job.UID)
850+
jm.finishedJobExpectations.Delete(job.UID)
851851
return nil
852852
}
853-
if _, ok := jm.finishedJobStore.Load(job.UID); ok {
853+
if _, ok := jm.finishedJobExpectations.Load(job.UID); ok {
854854
logger.V(2).Info("Skip syncing the job as its marked completed but the completed update event is not yet received", "uid", job.UID, "key", key)
855855
return nil
856856
}
@@ -1315,7 +1315,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
13151315
}
13161316
if jobFinished {
13171317
jm.recordJobFinished(jobCtx.job, jobCtx.finishedCondition)
1318-
jm.finishedJobStore.Store(jobCtx.job.UID, struct{}{})
1318+
jm.finishedJobExpectations.Store(jobCtx.job.UID, struct{}{})
13191319
}
13201320
recordJobPodFinished(logger, jobCtx.job, oldCounters)
13211321
}

test/integration/job/job_test.go

Lines changed: 107 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -4050,11 +4050,11 @@ func TestNodeSelectorUpdate(t *testing.T) {
40504050

40514051
}
40524052

4053-
// TestDelayedJobSucceededUpdateEvent tests that a Job only creates one Pod even when
4054-
// the job success events are delayed. This test verfies the finishedJobStore is working
4053+
// TestDelayedJobUpdateEvent tests that a Job only creates one Pod even when
4054+
// the job events are delayed. This test verifies the finishedJobExpectations is working
40554055
// correctly and preventing the job controller from creating a new pod if the job success
4056-
// even is delayed.
4057-
func TestDelayedJobSucceededUpdateEvent(t *testing.T) {
4056+
// or fail event is delayed.
4057+
func TestDelayedJobUpdateEvent(t *testing.T) {
40584058
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
40594059
t.Cleanup(setDurationDuringTest(&jobcontroller.SyncJobBatchPeriod, fastSyncJobBatchPeriod))
40604060
closeFn, restConfig, clientSet, ns := setup(t, "simple")
@@ -4070,135 +4070,123 @@ func TestDelayedJobSucceededUpdateEvent(t *testing.T) {
40704070
}
40714071
return obj, nil
40724072
}))
4073-
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig, transformOpt)
4074-
t.Cleanup(func() {
4075-
cancel()
4076-
})
4077-
resetMetrics()
40784073

4079-
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
4080-
if err != nil {
4081-
t.Fatalf("Failed to create Job: %v", err)
4082-
}
4083-
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
4084-
Active: 1,
4085-
Ready: ptr.To[int32](0),
4086-
Terminating: ptr.To[int32](0),
4087-
})
4088-
4089-
if _, err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
4090-
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
4091-
}
4092-
validateJobComplete(ctx, t, clientSet, jobObj)
4093-
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
4094-
Failed: 0,
4095-
Succeeded: 1,
4096-
Ready: ptr.To[int32](0),
4097-
Terminating: ptr.To[int32](0),
4098-
})
4099-
validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{
4100-
Labels: []string{"NonIndexed", "succeeded", "CompletionsReached"},
4101-
Value: 1,
4102-
})
4103-
validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
4104-
Labels: []string{"NonIndexed", "succeeded"},
4105-
Value: 1,
4106-
})
4107-
jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
4108-
if err != nil {
4109-
t.Fatalf("Error %v getting the list of pods for job %q", err, klog.KObj(jobObj))
4110-
}
4111-
if len(jobPods) != 1 {
4112-
t.Errorf("Found %d Pods for the job %q, want 1", len(jobPods), klog.KObj(jobObj))
4074+
type jobStatus struct {
4075+
succeeded int
4076+
failed int
4077+
status batchv1.JobConditionType
41134078
}
4114-
}
4115-
4116-
// TestDelayedJobFailedUpdateEvent tests that a Job only creates one Pod even when
4117-
// the job failed events are delayed. This test verfies the finishedJobStore is working
4118-
// correctly and preventing from job controller creating a new pod if the job failed
4119-
// event is delayed.
4120-
func TestDelayedJobFailedUpdateEvent(t *testing.T) {
4121-
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
4122-
t.Cleanup(setDurationDuringTest(&jobcontroller.SyncJobBatchPeriod, fastSyncJobBatchPeriod))
4123-
closeFn, restConfig, clientSet, ns := setup(t, "pod-failure-policy")
4124-
t.Cleanup(closeFn)
4125-
// the transform is used to introduce a delay for the job events. Since all the object have to go through
4126-
// transform func first before being added to the informer cache, this would serve as an indirect way to
4127-
// introduce watch event delay.
4128-
transformOpt := informers.WithTransform(cache.TransformFunc(func(obj interface{}) (interface{}, error) {
4129-
_, ok := obj.(*batchv1.Job)
4130-
if ok {
4131-
// This will make sure pod events are processed before the job events occur.
4132-
time.Sleep(2 * fastSyncJobBatchPeriod)
4133-
}
4134-
return obj, nil
4135-
}))
4136-
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig, transformOpt)
4137-
t.Cleanup(func() {
4138-
cancel()
4139-
})
4140-
resetMetrics()
41414079

4142-
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
4143-
Spec: batchv1.JobSpec{
4144-
Template: v1.PodTemplateSpec{
4145-
Spec: v1.PodSpec{
4146-
Containers: []v1.Container{
4147-
{
4148-
Name: "main-container",
4149-
Image: "foo",
4150-
ImagePullPolicy: v1.PullIfNotPresent,
4151-
TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
4080+
cases := map[string]struct {
4081+
podReplacementPolicyEnabled bool
4082+
job *batchv1.Job
4083+
podUpdate func(*v1.Pod) bool
4084+
wantStatus jobStatus
4085+
}{
4086+
"job succeeded event delayed": {
4087+
job: &batchv1.Job{},
4088+
podUpdate: func(p *v1.Pod) bool {
4089+
p.Status.Phase = v1.PodSucceeded
4090+
p.Status.ContainerStatuses = []v1.ContainerStatus{
4091+
{
4092+
State: v1.ContainerState{
4093+
Terminated: &v1.ContainerStateTerminated{
4094+
FinishedAt: metav1.Now(),
4095+
},
41524096
},
41534097
},
4154-
},
4098+
}
4099+
return true
4100+
},
4101+
wantStatus: jobStatus{
4102+
succeeded: 1,
4103+
failed: 0,
4104+
status: batchv1.JobComplete,
41554105
},
4156-
BackoffLimit: ptr.To[int32](0),
41574106
},
4158-
})
4159-
if err != nil {
4160-
t.Fatalf("Failed to create Job: %v", err)
4161-
}
4162-
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
4163-
Active: 1,
4164-
Ready: ptr.To[int32](0),
4165-
Terminating: ptr.To[int32](0),
4166-
})
4167-
4168-
op := func(p *v1.Pod) bool {
4169-
p.Status = v1.PodStatus{
4170-
Phase: v1.PodFailed,
4171-
ContainerStatuses: []v1.ContainerStatus{
4172-
{
4173-
Name: "main-container",
4174-
State: v1.ContainerState{
4175-
Terminated: &v1.ContainerStateTerminated{
4176-
ExitCode: 5,
4107+
"job failed event delayed": {
4108+
job: &batchv1.Job{
4109+
Spec: batchv1.JobSpec{
4110+
Template: v1.PodTemplateSpec{
4111+
Spec: v1.PodSpec{
4112+
Containers: []v1.Container{
4113+
{
4114+
Name: "main-container",
4115+
Image: "foo",
4116+
ImagePullPolicy: v1.PullIfNotPresent,
4117+
TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
4118+
},
4119+
},
41774120
},
41784121
},
4122+
BackoffLimit: ptr.To[int32](0),
41794123
},
41804124
},
4181-
}
4182-
return true
4183-
}
4184-
if _, err := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
4185-
t.Fatalf("Error %q while updating pod status for Job: %v", err, jobObj.Name)
4125+
podUpdate: func(p *v1.Pod) bool {
4126+
p.Status = v1.PodStatus{
4127+
Phase: v1.PodFailed,
4128+
ContainerStatuses: []v1.ContainerStatus{
4129+
{
4130+
Name: "main-container",
4131+
State: v1.ContainerState{
4132+
Terminated: &v1.ContainerStateTerminated{
4133+
ExitCode: 5,
4134+
},
4135+
},
4136+
},
4137+
},
4138+
}
4139+
return true
4140+
},
4141+
wantStatus: jobStatus{
4142+
succeeded: 0,
4143+
failed: 1,
4144+
status: batchv1.JobFailed,
4145+
},
4146+
},
41864147
}
4187-
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
4188-
Active: 0,
4189-
Failed: 1,
4190-
Ready: ptr.To[int32](0),
4191-
Terminating: ptr.To[int32](0),
4192-
})
41934148

4194-
validateJobFailed(ctx, t, clientSet, jobObj)
4195-
jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
4196-
if err != nil {
4197-
t.Fatalf("Error %v getting the list of pods for job %q", err, klog.KObj(jobObj))
4198-
}
4199-
if len(jobPods) != 1 {
4200-
t.Errorf("Found %d Pods for the job %q, want 1", len(jobPods), klog.KObj(jobObj))
4149+
for name, tc := range cases {
4150+
tc := tc
4151+
t.Run(name, func(t *testing.T) {
4152+
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig, transformOpt)
4153+
t.Cleanup(cancel)
4154+
resetMetrics()
4155+
4156+
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, tc.job)
4157+
if err != nil {
4158+
t.Fatalf("Failed to create Job: %v", err)
4159+
}
4160+
4161+
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
4162+
Active: 1,
4163+
Ready: ptr.To[int32](0),
4164+
Terminating: ptr.To[int32](0),
4165+
})
4166+
4167+
if _, err := updateJobPodsStatus(ctx, clientSet, jobObj, tc.podUpdate, 1); err != nil {
4168+
t.Fatalf("Error %q while updating pod status for Job: %v", err, jobObj.Name)
4169+
}
4170+
4171+
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
4172+
Failed: tc.wantStatus.failed,
4173+
Succeeded: tc.wantStatus.succeeded,
4174+
Ready: ptr.To[int32](0),
4175+
Terminating: ptr.To[int32](0),
4176+
})
4177+
4178+
validateJobCondition(ctx, t, clientSet, jobObj, tc.wantStatus.status)
4179+
4180+
jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
4181+
if err != nil {
4182+
t.Fatalf("Error %v getting the list of pods for job %q", err, klog.KObj(jobObj))
4183+
}
4184+
if len(jobPods) != 1 {
4185+
t.Errorf("Found %d Pods for the job %q, want 1", len(jobPods), klog.KObj(jobObj))
4186+
}
4187+
})
42014188
}
4189+
42024190
}
42034191

42044192
type podsByStatus struct {

0 commit comments

Comments
 (0)