Skip to content

Commit cc2946e

Browse files
authored
Merge pull request kubernetes#125515 from mimowo/refactor-terminating-counter
Refactor tracking of terminating pods in Job controller
2 parents 4cd4e35 + 8cec2c7 commit cc2946e

File tree

1 file changed

+29
-21
lines changed

1 file changed

+29
-21
lines changed

pkg/controller/job/job_controller.go

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -811,18 +811,16 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
811811
if err != nil {
812812
return err
813813
}
814-
var terminating *int32
815-
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
816-
terminating = ptr.To(controller.CountTerminatingPods(pods))
817-
}
818814
jobCtx := &syncJobCtx{
819815
job: &job,
820816
pods: pods,
821817
activePods: controller.FilterActivePods(logger, pods),
822-
terminating: terminating,
823818
uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
824819
expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
825820
}
821+
if trackTerminatingPods(&job) {
822+
jobCtx.terminating = ptr.To(controller.CountTerminatingPods(pods))
823+
}
826824
active := int32(len(jobCtx.activePods))
827825
newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
828826
jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
@@ -896,7 +894,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
896894
jobCtx.finishedCondition = nil
897895
}
898896
active -= deleted
899-
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
897+
if trackTerminatingPods(jobCtx.job) {
900898
*jobCtx.terminating += deleted
901899
}
902900
manageJobErr = err
@@ -956,11 +954,15 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
956954
}
957955
}
958956

957+
var terminating *int32
958+
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
959+
terminating = jobCtx.terminating
960+
}
959961
needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready)
960-
needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
962+
needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, terminating)
961963
job.Status.Active = active
962964
job.Status.Ready = ready
963-
job.Status.Terminating = jobCtx.terminating
965+
job.Status.Terminating = terminating
964966
err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
965967
if err != nil {
966968
return fmt.Errorf("tracking status: %w", err)
@@ -1507,23 +1509,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
15071509
jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
15081510
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
15091511
active -= removed
1510-
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
1512+
if trackTerminatingPods(job) {
15111513
*jobCtx.terminating += removed
15121514
}
15131515
return active, metrics.JobSyncActionPodsDeleted, err
15141516
}
15151517

1516-
var terminating int32 = 0
1517-
if onlyReplaceFailedPods(jobCtx.job) {
1518-
// For PodFailurePolicy specified but PodReplacementPolicy disabled
1519-
// we still need to count terminating pods for replica counts
1520-
// But we will not allow updates to status.
1521-
if jobCtx.terminating == nil {
1522-
terminating = controller.CountTerminatingPods(jobCtx.pods)
1523-
} else {
1524-
terminating = *jobCtx.terminating
1525-
}
1526-
}
15271518
wantActive := int32(0)
15281519
if job.Spec.Completions == nil {
15291520
// Job does not specify a number of completions. Therefore, number active
@@ -1559,7 +1550,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
15591550
logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
15601551
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
15611552
active -= removed
1562-
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
1553+
if trackTerminatingPods(job) {
15631554
*jobCtx.terminating += removed
15641555
}
15651556
// While it is possible for a Job to require both pod creations and
@@ -1569,6 +1560,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
15691560
return active, metrics.JobSyncActionPodsDeleted, err
15701561
}
15711562

1563+
var terminating int32 = 0
1564+
if onlyReplaceFailedPods(jobCtx.job) {
1565+
// When onlyReplaceFailedPods=true, then also trackTerminatingPods=true,
1566+
// and so we can use the value.
1567+
terminating = *jobCtx.terminating
1568+
}
15721569
if diff := wantActive - terminating - active; diff > 0 {
15731570
var remainingTime time.Duration
15741571
if !hasBackoffLimitPerIndex(job) {
@@ -1954,6 +1951,17 @@ func countReadyPods(pods []*v1.Pod) int32 {
19541951
return cnt
19551952
}
19561953

1954+
// trackTerminatingPods checks if the count of terminating pods is tracked.
1955+
// They are tracked when any of the following is true:
1956+
// - JobPodReplacementPolicy is enabled, so the count is reported in the status field,
1957+
// - only failed pods are replaced, because pod failure policy is used
1958+
func trackTerminatingPods(job *batch.Job) bool {
1959+
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
1960+
return true
1961+
}
1962+
return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
1963+
}
1964+
19571965
// This checks if we should apply PodReplacementPolicy.
19581966
// PodReplacementPolicy controls when we recreate pods if they are marked as terminating
19591967
// Failed means that we recreate only once the pod has terminated.

0 commit comments

Comments
 (0)