Skip to content

Commit 547c005

Browse files
committed
handle job complete update delayed event
1 parent 6be1530 commit 547c005

File tree

3 files changed

+142
-1
lines changed

3 files changed

+142
-1
lines changed

pkg/controller/job/job_controller.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ type Controller struct {
123123
// Store with information to compute the exponential backoff delay for pod
124124
// recreation in case of pod failures.
125125
podBackoffStore *backoffStore
126+
127+
// completedJobStore contains the job ids for which the job status is updated to completed
128+
// but the corresponding event is not yet received.
129+
completedJobStore *jobUIDCache
126130
}
127131

128132
type syncJobCtx struct {
@@ -184,6 +188,9 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
184188
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
185189
clock: clock,
186190
podBackoffStore: newBackoffStore(),
191+
completedJobStore: &jobUIDCache{
192+
set: sets.New[types.UID](),
193+
},
187194
}
188195

189196
if _, err := jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -536,6 +543,7 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
536543
return
537544
}
538545
}
546+
jm.completedJobStore.remove(jobObj.UID)
539547
jm.enqueueLabelSelector(jobObj)
540548
}
541549

@@ -820,7 +828,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
820828
}
821829
return err
822830
}
823-
824831
// Skip syncing of the job if it is managed by another controller.
825832
// We cannot rely solely on skipping of queueing such jobs for synchronization,
826833
// because it is possible a synchronization task is queued for a job, without
@@ -841,6 +848,11 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
841848
// re-syncing here as the record has to be removed for finished/deleted jobs
842849
return fmt.Errorf("error removing backoff record %w", err)
843850
}
851+
jm.completedJobStore.remove(job.UID)
852+
return nil
853+
}
854+
if jm.completedJobStore.exists(job.UID) {
855+
logger.V(2).Info("Skip syncing the job as its marked completed but the completed update event is not yet received", "uid", job.UID, "key", key)
844856
return nil
845857
}
846858

@@ -1304,6 +1316,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
13041316
}
13051317
if jobFinished {
13061318
jm.recordJobFinished(jobCtx.job, jobCtx.finishedCondition)
1319+
jm.completedJobStore.add(jobCtx.job.UID)
13071320
}
13081321
recordJobPodFinished(logger, jobCtx.job, oldCounters)
13091322
}

pkg/controller/job/job_controller_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ import (
5656
_ "k8s.io/kubernetes/pkg/apis/core/install"
5757
"k8s.io/kubernetes/pkg/controller"
5858
"k8s.io/kubernetes/pkg/controller/job/metrics"
59+
"k8s.io/kubernetes/pkg/controller/job/util"
5960
"k8s.io/kubernetes/pkg/controller/testutil"
6061
"k8s.io/kubernetes/pkg/features"
6162
"k8s.io/utils/clock"
@@ -2879,6 +2880,110 @@ func TestSingleJobFailedCondition(t *testing.T) {
28792880

28802881
}
28812882

2883+
func TestJobControllerMissingJobSucceedEvent(t *testing.T) {
2884+
t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
2885+
logger, ctx := ktesting.NewTestContext(t)
2886+
job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
2887+
job1.Name = "job1"
2888+
clientSet := fake.NewSimpleClientset(job1)
2889+
fakeClock := clocktesting.NewFakeClock(time.Now())
2890+
jm, informer := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock)
2891+
jm.podControl = &controller.RealPodControl{
2892+
KubeClient: clientSet,
2893+
Recorder: testutil.NewFakeRecorder(),
2894+
}
2895+
jm.podStoreSynced = alwaysReady
2896+
jm.jobStoreSynced = alwaysReady
2897+
2898+
err := informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
2899+
if err != nil {
2900+
t.Fatalf("Unexpected error when adding job to indexer %v", err)
2901+
}
2902+
// 1st reconcile should create a new pod
2903+
err = jm.syncJob(ctx, testutil.GetKey(job1, t))
2904+
if err != nil {
2905+
t.Fatalf("Unexpected error when syncing jobs %v", err)
2906+
}
2907+
2908+
podIndexer := informer.Core().V1().Pods().Informer().GetIndexer()
2909+
podList, err := clientSet.Tracker().List(
2910+
schema.GroupVersionResource{Version: "v1", Resource: "pods"},
2911+
schema.GroupVersionKind{Version: "v1", Kind: "Pod"},
2912+
"default")
2913+
if err != nil {
2914+
t.Fatalf("Unexpected error when fetching pods %v", err)
2915+
}
2916+
// manually adding the just-created pod from fake clientset memory to informer cache because informer is not started.
2917+
// we are updating the pod status to succeeded which should update the job status to succeeded and remove the finalizer of the pod.
2918+
justCreatedPod := podList.(*v1.PodList).Items[0]
2919+
fmt.Printf("pod is %v\n", podList.(*v1.PodList).Items[0])
2920+
justCreatedPod.Status.Phase = v1.PodSucceeded
2921+
err = podIndexer.Add(&justCreatedPod)
2922+
if err != nil {
2923+
t.Fatalf("Unexpected error when adding pod to indexer %v", err)
2924+
}
2925+
jm.addPod(logger, &justCreatedPod)
2926+
err = jm.syncJob(ctx, testutil.GetKey(job1, t))
2927+
if err != nil {
2928+
t.Fatalf("Unexpected error when syncing jobs %v", err)
2929+
}
2930+
2931+
jobList, err := clientSet.Tracker().List(
2932+
schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"},
2933+
schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"},
2934+
"default")
2935+
if err != nil {
2936+
t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
2937+
}
2938+
updatedJob := jobList.(*batch.JobList).Items[0]
2939+
if !util.IsJobSucceeded(&updatedJob) {
2940+
t.Fatalf("job status is not succeeded: %v", updatedJob)
2941+
}
2942+
2943+
// add the updated pod from the fake clientset memory to informer cache because informer is not started.
2944+
podList, err = clientSet.Tracker().List(
2945+
schema.GroupVersionResource{Version: "v1", Resource: "pods"},
2946+
schema.GroupVersionKind{Version: "v1", Kind: "Pod"},
2947+
"default")
2948+
if err != nil {
2949+
t.Fatalf("Unexpected error when fetching pods %v", err)
2950+
}
2951+
fmt.Printf("pod is %v\n", podList.(*v1.PodList).Items[0])
2952+
updatedPod := podList.(*v1.PodList).Items[0]
2953+
updatedPod.Status.Phase = v1.PodSucceeded
2954+
err = podIndexer.Add(&updatedPod)
2955+
if err != nil {
2956+
t.Fatalf("Unexpected error when adding pod to indexer %v", err)
2957+
}
2958+
2959+
// removing the just created pod from fake clientset memory inorder for the sync job to succeed if creating a new pod because of bug
2960+
// but the pod will remain inside informer cache
2961+
err = clientSet.Tracker().Delete(
2962+
schema.GroupVersionResource{Version: "v1", Resource: "pods"},
2963+
"default", "")
2964+
if err != nil {
2965+
t.Fatalf("Unexpected error when deleting pod to indexer %v", err)
2966+
}
2967+
2968+
err = jm.syncJob(ctx, testutil.GetKey(job1, t))
2969+
if err != nil {
2970+
t.Fatalf("Unexpected error when syncing jobs %v", err)
2971+
}
2972+
time.Sleep(time.Second)
2973+
2974+
podList, err = clientSet.Tracker().List(
2975+
schema.GroupVersionResource{Version: "v1", Resource: "pods"},
2976+
schema.GroupVersionKind{Version: "v1", Kind: "Pod"},
2977+
"default")
2978+
if err != nil {
2979+
t.Fatalf("Unexpected error when syncing jobs %v", err)
2980+
}
2981+
// no pod should be created
2982+
if len(podList.(*v1.PodList).Items) != 0 {
2983+
t.Errorf("expect no pods to be created but %v pods are created", len(podList.(*v1.PodList).Items))
2984+
}
2985+
}
2986+
28822987
func TestSyncJobComplete(t *testing.T) {
28832988
_, ctx := ktesting.NewTestContext(t)
28842989
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})

pkg/controller/job/tracking_utils.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,26 @@ func isFinishedPodWithTrackingFinalizer(pod *v1.Pod) bool {
151151
}
152152
return (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) && hasJobTrackingFinalizer(pod)
153153
}
154+
155+
type jobUIDCache struct {
156+
sync.RWMutex
157+
set sets.Set[types.UID]
158+
}
159+
160+
func (j *jobUIDCache) add(uid types.UID) {
161+
j.Lock()
162+
defer j.Unlock()
163+
j.set.Insert(uid)
164+
}
165+
166+
func (j *jobUIDCache) remove(uid types.UID) {
167+
j.Lock()
168+
defer j.Unlock()
169+
j.set.Delete(uid)
170+
}
171+
172+
func (j *jobUIDCache) exists(uid types.UID) bool {
173+
j.RLock()
174+
defer j.RUnlock()
175+
return j.set.Has(uid)
176+
}

0 commit comments

Comments
 (0)