Skip to content

Commit f8a4e34

Browse files
authored
Fix tracking of terminating Pods when nothing else changes (kubernetes#121342)
* cleanup: refactor pod replacement policy integration test into staged assertion * cleanup: remove typo in job_test.go * refactor PodReplacementPolicy test and remove test for defaulting the policy * fix issue with missing update in job controller for terminating status and refactor pod replacement policy integration test * use t.Cleanup instead of defer in PodReplacementPolicy integration tests * revert t.Cleanup to defer for resetting feature flag in PodReplacementPolicy integration tests
1 parent 9aa0475 commit f8a4e34

File tree

2 files changed

+207
-99
lines changed

2 files changed

+207
-99
lines changed

pkg/controller/job/job_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -914,10 +914,10 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
914914
}
915915

916916
needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready)
917+
needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
917918
job.Status.Active = active
918919
job.Status.Ready = ready
919920
job.Status.Terminating = jobCtx.terminating
920-
needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
921921
err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
922922
if err != nil {
923923
return fmt.Errorf("tracking status: %w", err)

test/integration/job/job_test.go

Lines changed: 206 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1690,81 +1690,102 @@ func TestIndexedJob(t *testing.T) {
16901690
}
16911691

16921692
func TestJobPodReplacementPolicy(t *testing.T) {
1693-
const podCount int32 = 2
16941693
indexedCompletion := batchv1.IndexedCompletion
16951694
nonIndexedCompletion := batchv1.NonIndexedCompletion
16961695
var podReplacementPolicy = func(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
16971696
return &obj
16981697
}
1699-
jobSpecIndexedDefault := &batchv1.JobSpec{
1700-
Parallelism: ptr.To(podCount),
1701-
Completions: ptr.To(podCount),
1702-
CompletionMode: &indexedCompletion,
1698+
type jobStatus struct {
1699+
active int
1700+
failed int
1701+
terminating *int32
17031702
}
17041703
cases := map[string]struct {
17051704
podReplacementPolicyEnabled bool
1706-
deletePods bool
1707-
failPods bool
1708-
wantTerminating *int32
1709-
wantFailed int
1710-
wantActive int
17111705
jobSpec *batchv1.JobSpec
1706+
wantStatusAfterDeletion jobStatus
1707+
wantStatusAfterFailure jobStatus
17121708
}{
1713-
"feature flag off, delete pods and verify no terminating status": {
1714-
deletePods: true,
1715-
jobSpec: jobSpecIndexedDefault,
1716-
wantActive: int(podCount),
1717-
wantFailed: int(podCount),
1718-
},
1719-
"feature flag true, delete pods and verify terminating status": {
1720-
podReplacementPolicyEnabled: true,
1721-
deletePods: true,
1722-
jobSpec: jobSpecIndexedDefault,
1723-
wantTerminating: ptr.To(podCount),
1724-
wantFailed: int(podCount),
1709+
"feature flag off, delete & fail pods, recreate terminating pods, and verify job status counters": {
1710+
jobSpec: &batchv1.JobSpec{
1711+
Parallelism: ptr.To[int32](2),
1712+
Completions: ptr.To[int32](2),
1713+
CompletionMode: &indexedCompletion,
1714+
Template: v1.PodTemplateSpec{
1715+
ObjectMeta: metav1.ObjectMeta{
1716+
Finalizers: []string{"fake.example.com/blockDeletion"},
1717+
},
1718+
},
1719+
},
1720+
wantStatusAfterDeletion: jobStatus{
1721+
active: 2,
1722+
failed: 2,
1723+
},
1724+
wantStatusAfterFailure: jobStatus{
1725+
active: 2,
1726+
failed: 2,
1727+
},
17251728
},
1726-
"feature flag true, delete pods, verify terminating status and recreate upon terminating": {
1729+
"feature flag true, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
17271730
podReplacementPolicyEnabled: true,
1728-
deletePods: true,
17291731
jobSpec: &batchv1.JobSpec{
1730-
Parallelism: ptr.To(podCount),
1731-
Completions: ptr.To(podCount),
1732+
Parallelism: ptr.To[int32](2),
1733+
Completions: ptr.To[int32](2),
17321734
CompletionMode: &indexedCompletion,
17331735
PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
1736+
Template: v1.PodTemplateSpec{
1737+
ObjectMeta: metav1.ObjectMeta{
1738+
Finalizers: []string{"fake.example.com/blockDeletion"},
1739+
},
1740+
},
17341741
},
1735-
wantTerminating: ptr.To(podCount),
1736-
wantFailed: int(podCount),
1737-
},
1738-
"feature flag true, delete pods, verify terminating status and recreate once failed": {
1739-
podReplacementPolicyEnabled: true,
1740-
deletePods: true,
1741-
jobSpec: &batchv1.JobSpec{
1742-
Parallelism: ptr.To(podCount),
1743-
Completions: ptr.To(podCount),
1744-
CompletionMode: &nonIndexedCompletion,
1745-
PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
1742+
wantStatusAfterDeletion: jobStatus{
1743+
active: 2,
1744+
failed: 2,
1745+
terminating: ptr.To[int32](2),
1746+
},
1747+
wantStatusAfterFailure: jobStatus{
1748+
active: 2,
1749+
failed: 2,
1750+
terminating: ptr.To[int32](0),
17461751
},
1747-
wantTerminating: ptr.To(podCount),
17481752
},
1749-
"feature flag true with NonIndexedJob, delete pods, verify terminating status and recreate once failed": {
1753+
"feature flag true with NonIndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
17501754
podReplacementPolicyEnabled: true,
1751-
deletePods: true,
17521755
jobSpec: &batchv1.JobSpec{
1753-
Parallelism: ptr.To(podCount),
1754-
Completions: ptr.To(podCount),
1755-
CompletionMode: &nonIndexedCompletion,
1756-
PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
1756+
Parallelism: ptr.To[int32](2),
1757+
Completions: ptr.To[int32](2),
1758+
CompletionMode: &indexedCompletion,
1759+
PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
1760+
Template: v1.PodTemplateSpec{
1761+
ObjectMeta: metav1.ObjectMeta{
1762+
Finalizers: []string{"fake.example.com/blockDeletion"},
1763+
},
1764+
},
1765+
},
1766+
wantStatusAfterDeletion: jobStatus{
1767+
active: 2,
1768+
failed: 2,
1769+
terminating: ptr.To[int32](2),
1770+
},
1771+
wantStatusAfterFailure: jobStatus{
1772+
active: 2,
1773+
failed: 2,
1774+
terminating: ptr.To[int32](0),
17571775
},
1758-
wantTerminating: ptr.To(podCount),
17591776
},
1760-
"feature flag false, podFailurePolicy enabled, delete pods, verify terminating status and recreate once failed": {
1777+
"feature flag false, podFailurePolicy enabled, delete & fail pods, recreate failed pods, and verify job status counters": {
17611778
podReplacementPolicyEnabled: false,
1762-
deletePods: true,
17631779
jobSpec: &batchv1.JobSpec{
1764-
Parallelism: ptr.To(podCount),
1765-
Completions: ptr.To(podCount),
1780+
Parallelism: ptr.To[int32](2),
1781+
Completions: ptr.To[int32](2),
17661782
CompletionMode: &nonIndexedCompletion,
17671783
PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
1784+
Template: v1.PodTemplateSpec{
1785+
ObjectMeta: metav1.ObjectMeta{
1786+
Finalizers: []string{"fake.example.com/blockDeletion"},
1787+
},
1788+
},
17681789
PodFailurePolicy: &batchv1.PodFailurePolicy{
17691790
Rules: []batchv1.PodFailurePolicyRule{
17701791
{
@@ -1777,33 +1798,60 @@ func TestJobPodReplacementPolicy(t *testing.T) {
17771798
},
17781799
},
17791800
},
1780-
wantActive: int(podCount),
1801+
wantStatusAfterDeletion: jobStatus{
1802+
active: 2,
1803+
},
1804+
wantStatusAfterFailure: jobStatus{
1805+
active: 2,
1806+
},
17811807
},
1782-
"feature flag true, recreate failed pods, and verify active and failed counters": {
1808+
"feature flag true, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
17831809
podReplacementPolicyEnabled: true,
1784-
failPods: true,
17851810
jobSpec: &batchv1.JobSpec{
1786-
Parallelism: ptr.To(podCount),
1787-
Completions: ptr.To(podCount),
1811+
Parallelism: ptr.To[int32](2),
1812+
Completions: ptr.To[int32](2),
17881813
CompletionMode: &indexedCompletion,
17891814
PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
1815+
Template: v1.PodTemplateSpec{
1816+
ObjectMeta: metav1.ObjectMeta{
1817+
Finalizers: []string{"fake.example.com/blockDeletion"},
1818+
},
1819+
},
1820+
},
1821+
wantStatusAfterDeletion: jobStatus{
1822+
active: 0,
1823+
failed: 0,
1824+
terminating: ptr.To[int32](2),
1825+
},
1826+
wantStatusAfterFailure: jobStatus{
1827+
active: 2,
1828+
failed: 2,
1829+
terminating: ptr.To[int32](0),
17901830
},
1791-
wantActive: int(podCount),
1792-
wantFailed: int(podCount),
1793-
wantTerminating: ptr.To[int32](0),
17941831
},
1795-
"feature flag true with NonIndexedJob, recreate failed pods, and verify active and failed counters": {
1832+
"feature flag true with NonIndexedJob, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
17961833
podReplacementPolicyEnabled: true,
1797-
failPods: true,
17981834
jobSpec: &batchv1.JobSpec{
1799-
Parallelism: ptr.To(podCount),
1800-
Completions: ptr.To(podCount),
1835+
Parallelism: ptr.To[int32](2),
1836+
Completions: ptr.To[int32](2),
18011837
CompletionMode: &nonIndexedCompletion,
18021838
PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
1839+
Template: v1.PodTemplateSpec{
1840+
ObjectMeta: metav1.ObjectMeta{
1841+
Finalizers: []string{"fake.example.com/blockDeletion"},
1842+
},
1843+
},
1844+
},
1845+
wantStatusAfterDeletion: jobStatus{
1846+
active: 0,
1847+
failed: 0,
1848+
terminating: ptr.To[int32](2),
1849+
},
1850+
wantStatusAfterFailure: jobStatus{
1851+
active: 2,
1852+
failed: 2,
1853+
terminating: ptr.To[int32](0),
18031854
},
1804-
wantActive: int(podCount),
1805-
wantFailed: int(podCount),
1806-
wantTerminating: ptr.To[int32](0),
18071855
},
18081856
}
18091857
for name, tc := range cases {
@@ -1813,9 +1861,9 @@ func TestJobPodReplacementPolicy(t *testing.T) {
18131861
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.jobSpec.PodFailurePolicy != nil)()
18141862

18151863
closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
1816-
defer closeFn()
1864+
t.Cleanup(closeFn)
18171865
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
1818-
defer cancel()
1866+
t.Cleanup(cancel)
18191867
resetMetrics()
18201868

18211869
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -1826,41 +1874,24 @@ func TestJobPodReplacementPolicy(t *testing.T) {
18261874
}
18271875
jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
18281876

1829-
// Wait for pods to start up.
1830-
err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
1831-
job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
1832-
if err != nil {
1833-
return false, err
1834-
}
1835-
if job.Status.Active == podCount {
1836-
return true, nil
1837-
}
1838-
return false, nil
1877+
waitForPodsToBeActive(ctx, t, jobClient, 2, jobObj)
1878+
t.Cleanup(func() { removePodsFinalizer(ctx, t, clientSet, ns.Name) })
1879+
1880+
deletePods(ctx, t, clientSet, ns.Name)
1881+
1882+
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
1883+
Terminating: tc.wantStatusAfterDeletion.terminating,
1884+
Failed: tc.wantStatusAfterDeletion.failed,
1885+
Active: tc.wantStatusAfterDeletion.active,
1886+
Ready: ptr.To[int32](0),
18391887
})
1840-
if err != nil {
1841-
t.Fatalf("Error waiting for Job pods to become active: %v", err)
1842-
}
1843-
if tc.deletePods {
1844-
err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx,
1845-
metav1.DeleteOptions{},
1846-
metav1.ListOptions{
1847-
Limit: 1000,
1848-
})
1849-
if err != nil {
1850-
t.Fatalf("Failed to delete Pods: %v", err)
1851-
}
1852-
}
1853-
if tc.failPods {
1854-
err, _ = setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, int(podCount))
1855-
if err != nil {
1856-
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
1857-
}
1858-
}
1888+
1889+
failTerminatingPods(ctx, t, clientSet, ns.Name)
18591890

18601891
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
1861-
Terminating: tc.wantTerminating,
1862-
Failed: tc.wantFailed,
1863-
Active: tc.wantActive,
1892+
Terminating: tc.wantStatusAfterFailure.terminating,
1893+
Failed: tc.wantStatusAfterFailure.failed,
1894+
Active: tc.wantStatusAfterFailure.active,
18641895
Ready: ptr.To[int32](0),
18651896
})
18661897
})
@@ -3022,3 +3053,80 @@ func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName stri
30223053
})
30233054
return job, err
30243055
}
3056+
3057+
func waitForPodsToBeActive(ctx context.Context, t *testing.T, jobClient typedv1.JobInterface, podCount int32, jobObj *batchv1.Job) {
3058+
t.Helper()
3059+
err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(context.Context) (done bool, err error) {
3060+
job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
3061+
if err != nil {
3062+
return false, err
3063+
}
3064+
return job.Status.Active == podCount, nil
3065+
})
3066+
if err != nil {
3067+
t.Fatalf("Error waiting for Job pods to become active: %v", err)
3068+
}
3069+
}
3070+
3071+
func deletePods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
3072+
t.Helper()
3073+
err := clientSet.CoreV1().Pods(namespace).DeleteCollection(ctx,
3074+
metav1.DeleteOptions{},
3075+
metav1.ListOptions{
3076+
Limit: 1000,
3077+
})
3078+
if err != nil {
3079+
t.Fatalf("Failed to cleanup Pods: %v", err)
3080+
}
3081+
}
3082+
3083+
func removePodsFinalizer(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
3084+
t.Helper()
3085+
pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
3086+
if err != nil {
3087+
t.Fatalf("Failed to list pods: %v", err)
3088+
}
3089+
updatePod(ctx, t, clientSet, pods.Items, func(pod *v1.Pod) {
3090+
for i, finalizer := range pod.Finalizers {
3091+
if finalizer == "fake.example.com/blockDeletion" {
3092+
pod.Finalizers = append(pod.Finalizers[:i], pod.Finalizers[i+1:]...)
3093+
}
3094+
}
3095+
})
3096+
}
3097+
3098+
func updatePod(ctx context.Context, t *testing.T, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) {
3099+
t.Helper()
3100+
for _, val := range pods {
3101+
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
3102+
newPod, err := clientSet.CoreV1().Pods(val.Namespace).Get(ctx, val.Name, metav1.GetOptions{})
3103+
if err != nil {
3104+
return err
3105+
}
3106+
updateFunc(newPod)
3107+
_, err = clientSet.CoreV1().Pods(val.Namespace).Update(ctx, newPod, metav1.UpdateOptions{})
3108+
return err
3109+
}); err != nil {
3110+
t.Fatalf("Failed to update pod %s: %v", val.Name, err)
3111+
}
3112+
}
3113+
}
3114+
3115+
func failTerminatingPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
3116+
t.Helper()
3117+
pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
3118+
if err != nil {
3119+
t.Fatalf("Failed to list pods: %v", err)
3120+
}
3121+
var terminatingPods []v1.Pod
3122+
for _, pod := range pods.Items {
3123+
if pod.DeletionTimestamp != nil {
3124+
pod.Status.Phase = v1.PodFailed
3125+
terminatingPods = append(terminatingPods, pod)
3126+
}
3127+
}
3128+
_, err = updatePodStatuses(ctx, clientSet, terminatingPods)
3129+
if err != nil {
3130+
t.Fatalf("Failed to update pod statuses: %v", err)
3131+
}
3132+
}

0 commit comments

Comments
 (0)