Commit fe5afa9

Merge pull request kubernetes#130333 from kmala/job

handle job complete update delayed event

2 parents 8b460b4 + d4fd412

File tree

3 files changed: +177 −24


pkg/controller/job/job_controller.go

Lines changed: 25 additions & 13 deletions
@@ -60,8 +60,8 @@ import (
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")

 var (
-	// syncJobBatchPeriod is the batch period for controller sync invocations for a Job.
-	syncJobBatchPeriod = time.Second
+	// SyncJobBatchPeriod is the batch period for controller sync invocations for a Job. Exported for tests.
+	SyncJobBatchPeriod = time.Second
 	// DefaultJobApiBackOff is the default API backoff period. Exported for tests.
 	DefaultJobApiBackOff = time.Second
 	// MaxJobApiBackOff is the max API backoff period. Exported for tests.

@@ -123,6 +123,10 @@ type Controller struct {
 	// Store with information to compute the exponential backoff delay for pod
 	// recreation in case of pod failures.
 	podBackoffStore *backoffStore
+
+	// finishedJobExpectations contains the UIDs of Jobs whose status is finished
+	// but for which the corresponding update event has not yet been received.
+	finishedJobExpectations sync.Map
 }

 type syncJobCtx struct {

@@ -176,14 +180,15 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
 			KubeClient: kubeClient,
 			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
 		},
-		expectations:          controller.NewControllerExpectations(),
-		finalizerExpectations: newUIDTrackingExpectations(),
-		queue:                 workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[string](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[string]{Name: "job", Clock: clock}),
-		orphanQueue:           workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[orphanPodKey](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[orphanPodKey]{Name: "job_orphan_pod", Clock: clock}),
-		broadcaster:           eventBroadcaster,
-		recorder:              eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
-		clock:                 clock,
-		podBackoffStore:       newBackoffStore(),
+		expectations:            controller.NewControllerExpectations(),
+		finalizerExpectations:   newUIDTrackingExpectations(),
+		queue:                   workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[string](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[string]{Name: "job", Clock: clock}),
+		orphanQueue:             workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[orphanPodKey](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[orphanPodKey]{Name: "job_orphan_pod", Clock: clock}),
+		broadcaster:             eventBroadcaster,
+		recorder:                eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
+		clock:                   clock,
+		podBackoffStore:         newBackoffStore(),
+		finishedJobExpectations: sync.Map{},
 	}

 	if _, err := jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{

@@ -536,6 +541,7 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
 			return
 		}
 	}
+	jm.finishedJobExpectations.Delete(jobObj.UID)
 	jm.enqueueLabelSelector(jobObj)
 }

@@ -568,16 +574,16 @@ func (jm *Controller) enqueueSyncJobImmediately(logger klog.Logger, obj interfac
 // - Job status update
 // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
 func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{}) {
-	jm.enqueueSyncJobInternal(logger, obj, syncJobBatchPeriod)
+	jm.enqueueSyncJobInternal(logger, obj, SyncJobBatchPeriod)
 }

 // enqueueSyncJobWithDelay tells the controller to invoke syncJob with a
 // custom delay, but not smaller than the batching delay.
 // It is used when pod recreations are delayed due to pod failures.
 // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
 func (jm *Controller) enqueueSyncJobWithDelay(logger klog.Logger, obj interface{}, delay time.Duration) {
-	if delay < syncJobBatchPeriod {
-		delay = syncJobBatchPeriod
+	if delay < SyncJobBatchPeriod {
+		delay = SyncJobBatchPeriod
 	}
 	jm.enqueueSyncJobInternal(logger, obj, delay)
 }

@@ -841,6 +847,11 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 			// re-syncing here as the record has to be removed for finished/deleted jobs
 			return fmt.Errorf("error removing backoff record %w", err)
 		}
+		jm.finishedJobExpectations.Delete(job.UID)
+		return nil
+	}
+	if _, ok := jm.finishedJobExpectations.Load(job.UID); ok {
+		logger.V(2).Info("Skip syncing the job as it is marked finished but the corresponding update event is not yet received", "uid", job.UID, "key", key)
 		return nil
 	}

@@ -1304,6 +1315,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 	}
 	if jobFinished {
 		jm.recordJobFinished(jobCtx.job, jobCtx.finishedCondition)
+		jm.finishedJobExpectations.Store(jobCtx.job.UID, struct{}{})
 	}
 	recordJobPodFinished(logger, jobCtx.job, oldCounters)
 }
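
Taken together, the job_controller.go changes implement a small "finished, but the update event has not yet been observed" guard: trackJobStatusAndRemoveFinalizers stores the Job's UID when it writes a terminal condition, syncJob skips work while the entry exists, and the entry is cleared once the finished Job is observed in the cache or the Job is deleted. The standalone sketch below distills that lifecycle; the finishedGuard type and its method names are invented for illustration and are not part of the commit.

```go
package main

import (
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/types"
)

// finishedGuard mirrors the finishedJobExpectations pattern from the diff:
// a concurrency-safe set of Job UIDs that were marked finished in the API
// but whose updated object has not yet arrived via the informer.
type finishedGuard struct {
	finished sync.Map // types.UID -> struct{}
}

// markFinished records that the controller just wrote a terminal condition
// for this Job (the Store call in trackJobStatusAndRemoveFinalizers).
func (g *finishedGuard) markFinished(uid types.UID) {
	g.finished.Store(uid, struct{}{})
}

// shouldSkipSync reports whether a sync should be a no-op because the cached
// Job is stale (the Load check added to syncJob).
func (g *finishedGuard) shouldSkipSync(uid types.UID) bool {
	_, ok := g.finished.Load(uid)
	return ok
}

// observeFinishedOrDeleted clears the entry once the delayed update event
// arrives or the Job is deleted (the Delete calls in syncJob and deleteJob).
func (g *finishedGuard) observeFinishedOrDeleted(uid types.UID) {
	g.finished.Delete(uid)
}

func main() {
	g := &finishedGuard{}
	uid := types.UID("job-uid-1")

	g.markFinished(uid)
	fmt.Println("skip sync while event is delayed:", g.shouldSkipSync(uid)) // true

	g.observeFinishedOrDeleted(uid)
	fmt.Println("sync normally after event arrives:", g.shouldSkipSync(uid)) // false
}
```

Without this guard, a Pod event arriving between the status write and the delayed Job watch event would trigger a sync against a stale cached Job that still looks unfinished, which could create a replacement Pod.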

pkg/controller/job/job_controller_test.go

Lines changed: 9 additions & 9 deletions
@@ -6248,7 +6248,7 @@ func TestGetPodsForJob(t *testing.T) {
 }

 func TestAddPod(t *testing.T) {
-	t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+	t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
 	_, ctx := ktesting.NewTestContext(t)
 	logger := klog.FromContext(ctx)

@@ -6294,7 +6294,7 @@ func TestAddPod(t *testing.T) {
 }

 func TestAddPodOrphan(t *testing.T) {
-	t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+	t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
 	logger, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	fakeClock := clocktesting.NewFakeClock(time.Now())

@@ -6323,7 +6323,7 @@ func TestAddPodOrphan(t *testing.T) {
 }

 func TestUpdatePod(t *testing.T) {
-	t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+	t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
 	_, ctx := ktesting.NewTestContext(t)
 	logger := klog.FromContext(ctx)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})

@@ -6372,7 +6372,7 @@ func TestUpdatePod(t *testing.T) {
 }

 func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
-	t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+	t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
 	logger, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	fakeClock := clocktesting.NewFakeClock(time.Now())

@@ -6400,7 +6400,7 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
 }

 func TestUpdatePodChangeControllerRef(t *testing.T) {
-	t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+	t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
 	_, ctx := ktesting.NewTestContext(t)
 	logger := klog.FromContext(ctx)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})

@@ -6428,7 +6428,7 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
 }

 func TestUpdatePodRelease(t *testing.T) {
-	t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+	t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
 	_, ctx := ktesting.NewTestContext(t)
 	logger := klog.FromContext(ctx)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})

@@ -6456,7 +6456,7 @@ func TestUpdatePodRelease(t *testing.T) {
 }

 func TestDeletePod(t *testing.T) {
-	t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
+	t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, fastSyncJobBatchPeriod))
 	logger, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	fakeClock := clocktesting.NewFakeClock(time.Now())

@@ -6501,7 +6501,7 @@ func TestDeletePod(t *testing.T) {

 func TestDeletePodOrphan(t *testing.T) {
 	// Disable batching of pod updates to show it does not get requeued at all
-	t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, 0))
+	t.Cleanup(setDurationDuringTest(&SyncJobBatchPeriod, 0))
 	logger, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	jm, informer := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)

@@ -7023,7 +7023,7 @@ func TestJobBackoff(t *testing.T) {
 		"failure with pod updates batching": {
 			requeues:    0,
 			phase:       v1.PodFailed,
-			wantBackoff: syncJobBatchPeriod,
+			wantBackoff: SyncJobBatchPeriod,
 		},
 	}
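
The rename from syncJobBatchPeriod to SyncJobBatchPeriod follows the convention already used in this file (DefaultJobApiBackOff, MaxJobApiBackOff) of exporting tunables so that tests outside the package can shorten them. The diff shows setDurationDuringTest only at its call sites, not its body; a plausible minimal implementation of such a helper — an assumption, since it is not part of this commit — looks like:

```go
package job

import "time"

// setDurationDuringTest overrides a package-level duration for the length of
// a test and returns a cleanup func that restores the original value.
// Hypothetical sketch; the real helper's body is not shown in this diff.
func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() {
	origVal := *val
	*val = newVal
	return func() {
		*val = origVal
	}
}
```

Returning the restore func lets callers hand it straight to t.Cleanup, as every call site above does.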

test/integration/job/job_test.go

Lines changed: 143 additions & 2 deletions
@@ -45,6 +45,7 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
 	restclient "k8s.io/client-go/rest"
+	cache "k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/retry"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"

@@ -65,6 +66,7 @@ import (

 const waitInterval = time.Second
 const fastPodFailureBackoff = 100 * time.Millisecond
+const fastSyncJobBatchPeriod = 100 * time.Millisecond

 // Time duration used to account for controller latency in tests in which it is
 // expected the Job controller does not make a change. In such cases we wait a

@@ -4067,6 +4069,145 @@ func TestNodeSelectorUpdate(t *testing.T) {

 }

+// TestDelayedJobUpdateEvent tests that a Job only creates one Pod even when
+// the job events are delayed. This test verifies that finishedJobExpectations
+// works correctly and prevents the job controller from creating a new pod if
+// the job success or fail event is delayed.
+func TestDelayedJobUpdateEvent(t *testing.T) {
+	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
+	t.Cleanup(setDurationDuringTest(&jobcontroller.SyncJobBatchPeriod, fastSyncJobBatchPeriod))
+	closeFn, restConfig, clientSet, ns := setup(t, "simple")
+	t.Cleanup(closeFn)
+	// The transform is used to introduce a delay for the job events. Since all the objects have to go
+	// through the transform func first before being added to the informer cache, this serves as an
+	// indirect way to introduce a watch event delay.
+	transformOpt := informers.WithTransform(cache.TransformFunc(func(obj interface{}) (interface{}, error) {
+		_, ok := obj.(*batchv1.Job)
+		if ok {
+			// This will make sure pod events are processed before the job events occur.
+			time.Sleep(2 * fastSyncJobBatchPeriod)
+		}
+		return obj, nil
+	}))
+
+	type jobStatus struct {
+		succeeded int
+		failed    int
+		status    batchv1.JobConditionType
+	}
+
+	cases := map[string]struct {
+		podReplacementPolicyEnabled bool
+		job                         *batchv1.Job
+		podUpdate                   func(*v1.Pod) bool
+		wantStatus                  jobStatus
+	}{
+		"job succeeded event delayed": {
+			job: &batchv1.Job{},
+			podUpdate: func(p *v1.Pod) bool {
+				p.Status.Phase = v1.PodSucceeded
+				p.Status.ContainerStatuses = []v1.ContainerStatus{
+					{
+						State: v1.ContainerState{
+							Terminated: &v1.ContainerStateTerminated{
+								FinishedAt: metav1.Now(),
+							},
+						},
+					},
+				}
+				return true
+			},
+			wantStatus: jobStatus{
+				succeeded: 1,
+				failed:    0,
+				status:    batchv1.JobComplete,
+			},
+		},
+		"job failed event delayed": {
+			job: &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Template: v1.PodTemplateSpec{
+						Spec: v1.PodSpec{
+							Containers: []v1.Container{
+								{
+									Name:                     "main-container",
+									Image:                    "foo",
+									ImagePullPolicy:          v1.PullIfNotPresent,
+									TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
+								},
+							},
+						},
+					},
+					BackoffLimit: ptr.To[int32](0),
+				},
+			},
+			podUpdate: func(p *v1.Pod) bool {
+				p.Status = v1.PodStatus{
+					Phase: v1.PodFailed,
+					ContainerStatuses: []v1.ContainerStatus{
+						{
+							Name: "main-container",
+							State: v1.ContainerState{
+								Terminated: &v1.ContainerStateTerminated{
+									ExitCode: 5,
+								},
+							},
+						},
+					},
+				}
+				return true
+			},
+			wantStatus: jobStatus{
+				succeeded: 0,
+				failed:    1,
+				status:    batchv1.JobFailed,
+			},
+		},
+	}
+
+	for name, tc := range cases {
+		tc := tc
+		t.Run(name, func(t *testing.T) {
+			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig, transformOpt)
+			t.Cleanup(cancel)
+			resetMetrics()
+
+			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, tc.job)
+			if err != nil {
+				t.Fatalf("Failed to create Job: %v", err)
+			}
+
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+				Active:      1,
+				Ready:       ptr.To[int32](0),
+				Terminating: ptr.To[int32](0),
+			})
+
+			if _, err := updateJobPodsStatus(ctx, clientSet, jobObj, tc.podUpdate, 1); err != nil {
+				t.Fatalf("Error %q while updating pod status for Job: %v", err, jobObj.Name)
+			}
+
+			validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
+				Failed:      tc.wantStatus.failed,
+				Succeeded:   tc.wantStatus.succeeded,
+				Ready:       ptr.To[int32](0),
+				Terminating: ptr.To[int32](0),
+			})
+
+			validateJobCondition(ctx, t, clientSet, jobObj, tc.wantStatus.status)
+
+			jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
+			if err != nil {
+				t.Fatalf("Error %v getting the list of pods for job %q", err, klog.KObj(jobObj))
+			}
+			if len(jobPods) != 1 {
+				t.Errorf("Found %d Pods for the job %q, want 1", len(jobPods), klog.KObj(jobObj))
+			}
+		})
+	}
+
+}
+
 type podsByStatus struct {
 	Active      int
 	Ready       *int32

@@ -4488,9 +4629,9 @@ func setup(t testing.TB, nsBaseName string) (framework.TearDownFunc, *restclient
 	return closeFn, config, clientSet, ns
 }

-func startJobControllerAndWaitForCaches(tb testing.TB, restConfig *restclient.Config) (context.Context, context.CancelFunc) {
+func startJobControllerAndWaitForCaches(tb testing.TB, restConfig *restclient.Config, options ...informers.SharedInformerOption) (context.Context, context.CancelFunc) {
 	tb.Helper()
-	informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0)
+	informerSet := informers.NewSharedInformerFactoryWithOptions(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0, options...)
 	jc, ctx, cancel := createJobControllerWithSharedInformers(tb, restConfig, informerSet)
 	informerSet.Start(ctx.Done())
 	go jc.Run(ctx, 1)
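
Making startJobControllerAndWaitForCaches variadic lets individual tests inject informer behavior without touching the shared setup. The sketch below shows the same WithTransform trick in isolation: because every object passes through the transform before it reaches the informer cache, sleeping only for Job objects delays Job watch events relative to Pod events. The helper name and the delay parameter are illustrative, not from the commit.

```go
package jobtest

import (
	"time"

	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// newDelayedJobInformerFactory returns a factory whose Job events reach the
// informer cache only after an artificial delay, mirroring the integration
// test's technique for simulating a delayed Job update event.
func newDelayedJobInformerFactory(client kubernetes.Interface, delay time.Duration) informers.SharedInformerFactory {
	transform := cache.TransformFunc(func(obj interface{}) (interface{}, error) {
		if _, ok := obj.(*batchv1.Job); ok {
			// Hold Job objects back so Pod events are processed first.
			time.Sleep(delay)
		}
		return obj, nil
	})
	return informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithTransform(transform))
}
```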
