
Commit 990339d

Introduce syncJobContext to limit the number of function parameters
1 parent ad72319 commit 990339d
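
The commit applies the parameter object pattern: values that syncJob computes once per reconciliation are collected into a single syncJobContext struct, built field by field as they become available, and the helpers (manageJob, trackJobStatusAndRemoveFinalizers, flushUncountedAndRemoveFinalizers, getNewFinishedPods, getValidPodsWithFilter) receive one pointer and unpack the fields they need. A minimal, runnable sketch of the idea, using hypothetical simplified types rather than code from this commit:

	package main

	import "fmt"

	// Hypothetical stand-ins for the controller's real types.
	type Job struct{ Name string }
	type Pod struct{ Name string }

	// syncContext plays the role of syncJobContext: it bundles the values a
	// single sync pass computes, so helpers take one pointer instead of a
	// growing list of parameters.
	type syncContext struct {
		job       *Job
		pods      []*Pod
		succeeded int32
	}

	// Before this style of refactor, a helper spells out every value:
	//
	//	func report(job *Job, pods []*Pod, succeeded int32)
	//
	// After it, adding a field to syncContext widens no signatures.
	func report(sc *syncContext) {
		fmt.Printf("job=%s pods=%d succeeded=%d\n", sc.job.Name, len(sc.pods), sc.succeeded)
	}

	func main() {
		sc := &syncContext{} // built incrementally, as syncJob builds syncJobContext
		sc.job = &Job{Name: "demo"}
		sc.pods = []*Pod{{Name: "demo-p0"}}
		sc.succeeded = 1
		report(sc)
	}

One trade-off visible in the diff: data dependencies that were explicit in the old signatures now flow through the struct, so each helper documents what it reads by unpacking the fields at the top of the function.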

2 files changed: 80 additions & 28 deletions

pkg/controller/job/job_controller.go

Lines changed: 68 additions & 26 deletions
@@ -131,6 +131,20 @@ type Controller struct {
 	podBackoffStore *backoffStore
 }

+type syncJobContext struct {
+	job                           *batch.Job
+	pods                          []*v1.Pod
+	finishedCondition             *batch.JobCondition
+	activePods                    []*v1.Pod
+	succeeded                     int32
+	prevSucceededIndexes          orderedIntervals
+	succeededIndexes              orderedIntervals
+	newBackoffRecord              backoffRecord
+	expectedRmFinalizers          sets.Set[string]
+	uncounted                     *uncountedTerminatedPods
+	podFailureCountByPolicyAction map[string]int
+}
+
 // NewController creates a new Job controller that keeps the relevant pods
 // in sync with their corresponding Job objects.
 func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
@@ -742,6 +756,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		return nil
 	}

+	syncJobContext := &syncJobContext{}
+	syncJobContext.job = &job
+
 	completionMode := getCompletionMode(&job)
 	action := metrics.JobSyncActionReconciling
747764

@@ -759,7 +776,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
 	}
 	uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
-	expectedRmFinalizers := jm.finalizerExpectations.getExpectedUIDs(key)
+	syncJobContext.uncounted = uncounted
+	syncJobContext.expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)

 	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
 	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
@@ -771,10 +789,12 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		return err
 	}

+	syncJobContext.pods = pods
 	activePods := controller.FilterActivePods(pods)
+	syncJobContext.activePods = activePods
 	active := int32(len(activePods))
-	newSucceededPods, newFailedPods := getNewFinishedPods(&job, pods, uncounted, expectedRmFinalizers)
-	succeeded := job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded))
+	newSucceededPods, newFailedPods := getNewFinishedPods(syncJobContext)
+	syncJobContext.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded))
 	failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(uncounted.failed))
 	var ready *int32
 	if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
@@ -787,28 +807,27 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		job.Status.StartTime = &now
 	}

-	newBackoffRecord := jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
+	syncJobContext.newBackoffRecord = jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)

 	var manageJobErr error
-	var finishedCondition *batch.JobCondition

 	exceedsBackoffLimit := failed > *job.Spec.BackoffLimit

 	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
 		if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
-			finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
+			syncJobContext.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
 		} else if failJobMessage := getFailJobMessage(&job, pods); failJobMessage != nil {
 			// Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed.
-			finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
+			syncJobContext.finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
 		}
 	}
-	if finishedCondition == nil {
+	if syncJobContext.finishedCondition == nil {
 		if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
 			// check if the number of pod restart exceeds backoff (for restart OnFailure only)
 			// OR if the number of failed jobs increased since the last syncJob
-			finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now())
+			syncJobContext.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now())
 		} else if jm.pastActiveDeadline(&job) {
-			finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
+			syncJobContext.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
 		} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
 			syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
 			logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
@@ -819,23 +838,25 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	var prevSucceededIndexes, succeededIndexes orderedIntervals
 	if isIndexedJob(&job) {
 		prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(logger, &job, pods)
-		succeeded = int32(succeededIndexes.total())
+		syncJobContext.prevSucceededIndexes = prevSucceededIndexes
+		syncJobContext.succeededIndexes = succeededIndexes
+		syncJobContext.succeeded = int32(succeededIndexes.total())
 	}
 	suspendCondChanged := false
 	// Remove active pods if Job failed.
-	if finishedCondition != nil {
+	if syncJobContext.finishedCondition != nil {
 		deleted, err := jm.deleteActivePods(ctx, &job, activePods)
 		if deleted != active || !satisfiedExpectations {
 			// Can't declare the Job as finished yet, as there might be remaining
 			// pod finalizers or pods that are not in the informer's cache yet.
-			finishedCondition = nil
+			syncJobContext.finishedCondition = nil
 		}
 		active -= deleted
 		manageJobErr = err
 	} else {
 		manageJobCalled := false
 		if satisfiedExpectations && job.DeletionTimestamp == nil {
-			active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes, newBackoffRecord)
+			active, action, manageJobErr = jm.manageJob(ctx, syncJobContext)
 			manageJobCalled = true
 		}
 		complete := false
@@ -846,16 +867,16 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 			// not expected to fail, but if they do, the failure is ignored. Once any
 			// pod succeeds, the controller waits for remaining pods to finish, and
 			// then the job is complete.
-			complete = succeeded > 0 && active == 0
+			complete = syncJobContext.succeeded > 0 && active == 0
 		} else {
 			// Job specifies a number of completions. This type of job signals
 			// success by having that number of successes. Since we do not
 			// start more pods than there are remaining completions, there should
 			// not be any remaining active pods once this count is reached.
-			complete = succeeded >= *job.Spec.Completions && active == 0
+			complete = syncJobContext.succeeded >= *job.Spec.Completions && active == 0
 		}
 		if complete {
-			finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
+			syncJobContext.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
 		} else if manageJobCalled {
 			// Update the conditions / emit events only if manageJob was called in
 			// this syncJob. Otherwise wait for the right syncJob call to make
@@ -891,7 +912,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !pointer.Int32Equal(ready, job.Status.Ready)
 	job.Status.Active = active
 	job.Status.Ready = ready
-	err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffRecord)
+	err = jm.trackJobStatusAndRemoveFinalizers(ctx, syncJobContext, needsStatusUpdate)
 	if err != nil {
 		return fmt.Errorf("tracking status: %w", err)
 	}
@@ -970,8 +991,15 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
 //
 // It does this up to a limited number of Pods so that the size of .status
 // doesn't grow too much and this sync doesn't starve other Jobs.
-func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.Set[string], finishedCond *batch.JobCondition, needsFlush bool, newBackoffRecord backoffRecord) error {
+func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, syncJobContext *syncJobContext, needsFlush bool) error {
 	logger := klog.FromContext(ctx)
+	job := syncJobContext.job
+	pods := syncJobContext.pods
+	finishedCond := syncJobContext.finishedCondition
+	expectedRmFinalizers := syncJobContext.expectedRmFinalizers
+	succeededIndexes := syncJobContext.succeededIndexes
+	uncounted := syncJobContext.uncounted
+
 	isIndexed := isIndexedJob(job)
 	var podsToRemoveFinalizer []*v1.Pod
 	uncountedStatus := job.Status.UncountedTerminatedPods
@@ -1070,8 +1098,9 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 			finishedCond = newFailedConditionForFailureTarget(finishedCond, jm.clock.Now())
 		}
 	}
+	syncJobContext.podFailureCountByPolicyAction = podFailureCountByPolicyAction
 	var err error
-	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush, newBackoffRecord); err != nil {
+	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, syncJobContext, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
 		return err
 	}
 	jobFinished := !reachedMaxUncountedPods && jm.enactJobFinished(job, finishedCond)
@@ -1101,8 +1130,11 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 //
 // Returns whether there are pending changes in the Job status that need to be
 // flushed in subsequent calls.
-func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool, newBackoffRecord backoffRecord) (*batch.Job, bool, error) {
+func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, syncJobContext *syncJobContext, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) {
 	logger := klog.FromContext(ctx)
+	job := syncJobContext.job
+	newBackoffRecord := syncJobContext.newBackoffRecord
+	podFailureCountByPolicyAction := syncJobContext.podFailureCountByPolicyAction
 	var err error
 	if needsFlush {
 		if job, err = jm.updateStatusHandler(ctx, job); err != nil {
@@ -1337,11 +1369,12 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string {

 // getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted
 // in the job status. The list of failed pods can be affected by the podFailurePolicy.
-func getNewFinishedPods(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.Set[string]) (succeededPods, failedPods []*v1.Pod) {
-	succeededPods = getValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool {
+func getNewFinishedPods(syncJobContext *syncJobContext) (succeededPods, failedPods []*v1.Pod) {
+	job := syncJobContext.job
+	succeededPods = getValidPodsWithFilter(syncJobContext, syncJobContext.uncounted.Succeeded(), func(p *v1.Pod) bool {
 		return p.Status.Phase == v1.PodSucceeded
 	})
-	failedPods = getValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool {
+	failedPods = getValidPodsWithFilter(syncJobContext, syncJobContext.uncounted.Failed(), func(p *v1.Pod) bool {
 		if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
 			if !isPodFailed(p, job) {
 				return false
@@ -1365,8 +1398,14 @@ func jobSuspended(job *batch.Job) bool {
 // pods according to what is specified in the job.Spec.
 // Respects back-off; does not create new pods if the back-off time has not passed
 // Does NOT modify <activePods>.
-func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, newBackoffRecord backoffRecord) (int32, string, error) {
+func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobContext) (int32, string, error) {
 	logger := klog.FromContext(ctx)
+	job := syncJobContext.job
+	activePods := syncJobContext.activePods
+	succeeded := syncJobContext.succeeded
+	succeededIndexes := syncJobContext.succeededIndexes
+	newBackoffRecord := syncJobContext.newBackoffRecord
+
 	active := int32(len(activePods))
 	parallelism := *job.Spec.Parallelism
 	jobKey, err := controller.KeyFunc(job)
@@ -1561,7 +1600,10 @@ func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte)
 // getValidPodsWithFilter returns the valid pods that pass the filter.
 // Pods are valid if they have a finalizer or in uncounted set
 // and, for Indexed Jobs, a valid completion index.
-func getValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Set[string], expectedRmFinalizers sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
+func getValidPodsWithFilter(synJobContext *syncJobContext, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
+	job := synJobContext.job
+	pods := synJobContext.pods
+	expectedRmFinalizers := synJobContext.expectedRmFinalizers
 	var result []*v1.Pod
 	for _, p := range pods {
 		uid := string(p.UID)

pkg/controller/job/job_controller_test.go

Lines changed: 12 additions & 2 deletions
@@ -1079,7 +1079,8 @@ func TestGetNewFinshedPods(t *testing.T) {
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
 			uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
-			succeededPods, failedPods := getNewFinishedPods(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers)
+			syncJobContext := &syncJobContext{job: &tc.job, pods: tc.pods, uncounted: uncounted, expectedRmFinalizers: tc.expectedRmFinalizers}
+			succeededPods, failedPods := getNewFinishedPods(syncJobContext)
 			succeeded := int32(len(succeededPods)) + tc.job.Status.Succeeded + int32(len(uncounted.succeeded))
 			failed := int32(len(failedPods)) + tc.job.Status.Failed + int32(len(uncounted.failed))
 			if succeeded != tc.wantSucceeded {
@@ -1654,7 +1655,16 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
 			if isIndexedJob(job) {
 				succeededIndexes = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
 			}
-			err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush, backoffRecord{})
+			syncJobContext := &syncJobContext{
+				job:                  job,
+				pods:                 tc.pods,
+				succeededIndexes:     succeededIndexes,
+				uncounted:            uncounted,
+				expectedRmFinalizers: tc.expectedRmFinalizers,
+				finishedCondition:    tc.finishedCond,
+				newBackoffRecord:     backoffRecord{},
+			}
+			err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), syncJobContext, tc.needsFlush)
 			if !errors.Is(err, tc.wantErr) {
 				t.Errorf("Got error %v, want %v", err, tc.wantErr)
 			}
