Skip to content

Commit 8cec2c7

Browse files
committed
Refactor tracking of terminating pods in Job controller
1 parent 78377c4 commit 8cec2c7

File tree

1 file changed

+29
-21
lines changed

1 file changed

+29
-21
lines changed

pkg/controller/job/job_controller.go

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -811,18 +811,16 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
811811
if err != nil {
812812
return err
813813
}
814-
var terminating *int32
815-
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
816-
terminating = ptr.To(controller.CountTerminatingPods(pods))
817-
}
818814
jobCtx := &syncJobCtx{
819815
job: &job,
820816
pods: pods,
821817
activePods: controller.FilterActivePods(logger, pods),
822-
terminating: terminating,
823818
uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
824819
expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
825820
}
821+
if trackTerminatingPods(&job) {
822+
jobCtx.terminating = ptr.To(controller.CountTerminatingPods(pods))
823+
}
826824
active := int32(len(jobCtx.activePods))
827825
newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
828826
jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
@@ -896,7 +894,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
896894
jobCtx.finishedCondition = nil
897895
}
898896
active -= deleted
899-
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
897+
if trackTerminatingPods(jobCtx.job) {
900898
*jobCtx.terminating += deleted
901899
}
902900
manageJobErr = err
@@ -956,11 +954,15 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
956954
}
957955
}
958956

957+
var terminating *int32
958+
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
959+
terminating = jobCtx.terminating
960+
}
959961
needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready)
960-
needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
962+
needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, terminating)
961963
job.Status.Active = active
962964
job.Status.Ready = ready
963-
job.Status.Terminating = jobCtx.terminating
965+
job.Status.Terminating = terminating
964966
err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
965967
if err != nil {
966968
return fmt.Errorf("tracking status: %w", err)
@@ -1504,23 +1506,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
15041506
jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
15051507
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
15061508
active -= removed
1507-
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
1509+
if trackTerminatingPods(job) {
15081510
*jobCtx.terminating += removed
15091511
}
15101512
return active, metrics.JobSyncActionPodsDeleted, err
15111513
}
15121514

1513-
var terminating int32 = 0
1514-
if onlyReplaceFailedPods(jobCtx.job) {
1515-
// For PodFailurePolicy specified but PodReplacementPolicy disabled
1516-
// we still need to count terminating pods for replica counts
1517-
// But we will not allow updates to status.
1518-
if jobCtx.terminating == nil {
1519-
terminating = controller.CountTerminatingPods(jobCtx.pods)
1520-
} else {
1521-
terminating = *jobCtx.terminating
1522-
}
1523-
}
15241515
wantActive := int32(0)
15251516
if job.Spec.Completions == nil {
15261517
// Job does not specify a number of completions. Therefore, number active
@@ -1556,7 +1547,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
15561547
logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
15571548
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
15581549
active -= removed
1559-
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
1550+
if trackTerminatingPods(job) {
15601551
*jobCtx.terminating += removed
15611552
}
15621553
// While it is possible for a Job to require both pod creations and
@@ -1566,6 +1557,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
15661557
return active, metrics.JobSyncActionPodsDeleted, err
15671558
}
15681559

1560+
var terminating int32 = 0
1561+
if onlyReplaceFailedPods(jobCtx.job) {
1562+
// When onlyReplaceFailedPods=true, then also trackTerminatingPods=true,
1563+
// and so we can use the value.
1564+
terminating = *jobCtx.terminating
1565+
}
15691566
if diff := wantActive - terminating - active; diff > 0 {
15701567
var remainingTime time.Duration
15711568
if !hasBackoffLimitPerIndex(job) {
@@ -1951,6 +1948,17 @@ func countReadyPods(pods []*v1.Pod) int32 {
19511948
return cnt
19521949
}
19531950

1951+
// trackTerminatingPods checks if the count of terminating pods is tracked.
1952+
// They are tracked when any the following is true:
1953+
// - JobPodReplacementPolicy is enabled to be returned in the status field,
1954+
// - only failed pods are replaced, because pod failure policy is used
1955+
func trackTerminatingPods(job *batch.Job) bool {
1956+
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
1957+
return true
1958+
}
1959+
return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
1960+
}
1961+
19541962
// This checks if we should apply PodReplacementPolicy.
19551963
// PodReplacementPolicy controls when we recreate pods if they are marked as terminating
19561964
// Failed means that we recreate only once the pod has terminated.

0 commit comments

Comments
 (0)