Commit 515d1ce

Merge pull request kubernetes#121393 from mimowo/backoff-limit-per-index-load-test

Benchmark job with backoff limit per index

2 parents 227d1b2 + 168e016

1 file changed: test/integration/job/job_test.go (162 additions, 32 deletions)
@@ -2097,25 +2097,53 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
 		Steps:    30,
 		Cap:      5 * time.Minute,
 	}
+	cases := map[string]struct {
+		nPods                int32
+		backoffLimitPerIndex *int32
+	}{
+		"regular indexed job without failures; size=10": {
+			nPods: 10,
+		},
+		"job with backoffLimitPerIndex without failures; size=10": {
+			nPods:                10,
+			backoffLimitPerIndex: ptr.To[int32](1),
+		},
+		"regular indexed job without failures; size=100": {
+			nPods: 100,
+		},
+		"job with backoffLimitPerIndex without failures; size=100": {
+			nPods:                100,
+			backoffLimitPerIndex: ptr.To[int32](1),
+		},
+	}
 	mode := batchv1.IndexedCompletion
-	for _, nPods := range []int32{1000, 10_000} {
-		b.Run(fmt.Sprintf("nPods=%d", nPods), func(b *testing.B) {
+	for name, tc := range cases {
+		b.Run(name, func(b *testing.B) {
+			enableJobBackoffLimitPerIndex := tc.backoffLimitPerIndex != nil
+			defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, enableJobBackoffLimitPerIndex)()
 			b.ResetTimer()
 			for n := 0; n < b.N; n++ {
+				b.StartTimer()
 				jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
 					ObjectMeta: metav1.ObjectMeta{
-						Name: fmt.Sprintf("npods-%d-%d", nPods, n),
+						Name: fmt.Sprintf("npods-%d-%d-%v", tc.nPods, n, enableJobBackoffLimitPerIndex),
 					},
 					Spec: batchv1.JobSpec{
-						Parallelism:    ptr.To(nPods),
-						Completions:    ptr.To(nPods),
-						CompletionMode: &mode,
+						Parallelism:          ptr.To(tc.nPods),
+						Completions:          ptr.To(tc.nPods),
+						CompletionMode:       &mode,
+						BackoffLimitPerIndex: tc.backoffLimitPerIndex,
 					},
 				})
 				if err != nil {
 					b.Fatalf("Failed to create Job: %v", err)
 				}
-				remaining := int(nPods)
+				b.Cleanup(func() {
+					if err := cleanUp(ctx, clientSet, jobObj); err != nil {
+						b.Fatalf("Failed cleanup: %v", err)
+					}
+				})
+				remaining := int(tc.nPods)
 				if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
 					if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
 						remaining -= succ
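
The table-driven cases above use a nil backoffLimitPerIndex pointer for double duty: the field stays unset on the Job spec, and the JobBackoffLimitPerIndex feature gate is enabled only when a case sets it. A minimal, runnable sketch of that pattern in isolation, using ptr.To and ptr.Deref from k8s.io/utils/ptr (the case names and printout are illustrative only, not part of the commit):

	// Sketch: a nil *int32 case field doubles as the on/off switch for the
	// per-index backoff behavior, so each subtest derives its feature-gate
	// value from the field instead of carrying a separate bool.
	package main

	import (
		"fmt"

		"k8s.io/utils/ptr"
	)

	func main() {
		cases := map[string]struct {
			nPods                int32
			backoffLimitPerIndex *int32
		}{
			"regular indexed job":           {nPods: 10},
			"job with backoffLimitPerIndex": {nPods: 10, backoffLimitPerIndex: ptr.To[int32](1)},
		}
		for name, tc := range cases {
			enabled := tc.backoffLimitPerIndex != nil // gate on iff the field is set
			fmt.Printf("%s: gate=%v, limit=%d\n", name, enabled, ptr.Deref(tc.backoffLimitPerIndex, 0))
		}
	}
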
@@ -2127,38 +2155,134 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
 					b.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
 				}
 				validateJobSucceeded(ctx, b, clientSet, jobObj)
+				b.StopTimer()
+			}
+		})
+	}
+}
 
-				// Cleanup Pods and Job.
+// BenchmarkLargeFailureHandling benchmarks the handling of numerous pod failures
+// of an Indexed Job. We set minimal backoff delay to make the job controller
+// performance comparable for indexed jobs with global backoffLimit, and those
+// with backoffLimit per-index, despite different patterns of handling failures.
+func BenchmarkLargeFailureHandling(b *testing.B) {
+	b.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
+	b.Cleanup(setDurationDuringTest(&jobcontroller.MaxJobPodFailureBackOff, fastPodFailureBackoff))
+	closeFn, restConfig, clientSet, ns := setup(b, "indexed")
+	restConfig.QPS = 100
+	restConfig.Burst = 100
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(b, restConfig)
+	defer cancel()
+	backoff := wait.Backoff{
+		Duration: time.Second,
+		Factor:   1.5,
+		Steps:    30,
+		Cap:      5 * time.Minute,
+	}
+	cases := map[string]struct {
+		nPods                int32
+		backoffLimitPerIndex *int32
+		customTimeout        *time.Duration
+	}{
+		"regular indexed job with failures; size=10": {
+			nPods: 10,
+		},
+		"job with backoffLimitPerIndex with failures; size=10": {
+			nPods:                10,
+			backoffLimitPerIndex: ptr.To[int32](1),
+		},
+		"regular indexed job with failures; size=100": {
+			nPods: 100,
+		},
+		"job with backoffLimitPerIndex with failures; size=100": {
+			nPods:                100,
+			backoffLimitPerIndex: ptr.To[int32](1),
+		},
+	}
+	mode := batchv1.IndexedCompletion
+	for name, tc := range cases {
+		b.Run(name, func(b *testing.B) {
+			enableJobBackoffLimitPerIndex := tc.backoffLimitPerIndex != nil
+			timeout := ptr.Deref(tc.customTimeout, wait.ForeverTestTimeout)
+			defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, enableJobBackoffLimitPerIndex)()
+			b.ResetTimer()
+			for n := 0; n < b.N; n++ {
 				b.StopTimer()
-				// Clean up pods in pages, because DeleteCollection might timeout.
-				// #90743
-				for {
-					pods, err := clientSet.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{Limit: 1})
-					if err != nil {
-						b.Fatalf("Failed to list Pods for cleanup: %v", err)
-					}
-					if len(pods.Items) == 0 {
-						break
-					}
-					err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx,
-						metav1.DeleteOptions{},
-						metav1.ListOptions{
-							Limit: 1000,
-						})
-					if err != nil {
-						b.Fatalf("Failed to cleanup Pods: %v", err)
-					}
-				}
-				err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{})
+				jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: fmt.Sprintf("npods-%d-%d-%v", tc.nPods, n, enableJobBackoffLimitPerIndex),
+					},
+					Spec: batchv1.JobSpec{
+						Parallelism:          ptr.To(tc.nPods),
+						Completions:          ptr.To(tc.nPods),
+						CompletionMode:       &mode,
+						BackoffLimitPerIndex: tc.backoffLimitPerIndex,
+						BackoffLimit:         ptr.To(tc.nPods),
+					},
+				})
 				if err != nil {
-					b.Fatalf("Failed to cleanup Job: %v", err)
+					b.Fatalf("Failed to create Job: %v", err)
 				}
+				b.Cleanup(func() {
+					if err := cleanUp(ctx, clientSet, jobObj); err != nil {
+						b.Fatalf("Failed cleanup: %v", err)
+					}
+				})
+				validateJobsPodsStatusOnlyWithTimeout(ctx, b, clientSet, jobObj, podsByStatus{
+					Active:      int(tc.nPods),
+					Ready:       ptr.To[int32](0),
+					Terminating: ptr.To[int32](0),
+				}, timeout)
+
 				b.StartTimer()
+				remaining := int(tc.nPods)
+				if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
+					if err, fail := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, remaining); err != nil {
+						remaining -= fail
+						b.Logf("Transient failure failing pods: %v", err)
+						return false, nil
+					}
+					return true, nil
+				}); err != nil {
+					b.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
+				}
+				validateJobsPodsStatusOnlyWithTimeout(ctx, b, clientSet, jobObj, podsByStatus{
+					Active:      int(tc.nPods),
+					Ready:       ptr.To[int32](0),
+					Failed:      int(tc.nPods),
+					Terminating: ptr.To[int32](0),
+				}, timeout)
+				b.StopTimer()
 			}
 		})
 	}
 }
 
+// cleanUp deletes all pods and the job
+func cleanUp(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) error {
+	// Clean up pods in pages, because DeleteCollection might timeout.
+	// #90743
+	for {
+		pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{Limit: 1})
+		if err != nil {
+			return err
+		}
+		if len(pods.Items) == 0 {
+			break
+		}
+		err = clientSet.CoreV1().Pods(jobObj.Namespace).DeleteCollection(ctx,
+			metav1.DeleteOptions{},
+			metav1.ListOptions{
+				Limit: 1000,
+			})
+		if err != nil {
+			return err
+		}
+	}
+	return clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{})
+}
+
 func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
 	for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground, metav1.DeletePropagationForeground} {
 		t.Run(string(policy), func(t *testing.T) {
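
Both benchmarks push pods to their terminal phase through wait.ExponentialBackoff (k8s.io/apimachinery/pkg/util/wait): start at 1s, multiply the delay by 1.5 each step, give up after 30 steps, and never wait more than 5 minutes between attempts. Returning (false, nil) from the condition asks for another attempt; returning an error aborts immediately. A minimal sketch of that retry shape, where flakyOp is a hypothetical stand-in for setJobPodsPhase:

	// Sketch: wait.ExponentialBackoff re-runs the condition with growing
	// delays until it reports done=true, returns an error, or exhausts
	// the configured steps.
	package main

	import (
		"errors"
		"fmt"
		"time"

		"k8s.io/apimachinery/pkg/util/wait"
	)

	func main() {
		backoff := wait.Backoff{
			Duration: time.Second, // first delay
			Factor:   1.5,         // grow the delay each step
			Steps:    30,          // give up after 30 attempts
			Cap:      5 * time.Minute,
		}
		attempts := 0
		flakyOp := func() error { // hypothetical stand-in for setJobPodsPhase
			attempts++
			if attempts < 3 {
				return errors.New("transient")
			}
			return nil
		}
		if err := wait.ExponentialBackoff(backoff, func() (bool, error) {
			if err := flakyOp(); err != nil {
				// (false, nil) means "not done yet, retry after the next delay".
				return false, nil
			}
			return true, nil
		}); err != nil {
			fmt.Println("gave up:", err)
			return
		}
		fmt.Println("succeeded after", attempts, "attempts")
	}
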
@@ -2617,10 +2741,15 @@ type podsByStatus struct {
 	Terminating *int32
 }
 
-func validateJobsPodsStatusOnly(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
+func validateJobsPodsStatusOnly(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
+	t.Helper()
+	validateJobsPodsStatusOnlyWithTimeout(ctx, t, clientSet, jobObj, desired, wait.ForeverTestTimeout)
+}
+
+func validateJobsPodsStatusOnlyWithTimeout(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus, timeout time.Duration) {
 	t.Helper()
 	var actualCounts podsByStatus
-	if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
+	if err := wait.PollUntilContextTimeout(ctx, waitInterval, timeout, true, func(ctx context.Context) (bool, error) {
 		updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
 		if err != nil {
 			t.Fatalf("Failed to get updated Job: %v", err)
@@ -2638,7 +2767,8 @@ func validateJobsPodsStatusOnly(ctx context.Context, t *testing.T, clientSet cli
 		t.Errorf("Waiting for Job Status: %v\nPods (-want,+got):\n%s", err, diff)
 	}
 }
-func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
+
+func validateJobPodsStatus(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
 	t.Helper()
 	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired)
 	var active []*v1.Pod

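The final hunks widen the validation helpers from *testing.T to testing.TB, the interface satisfied by both *testing.T and *testing.B, so the new benchmark can reuse them, and they thread a caller-chosen timeout into wait.PollUntilContextTimeout instead of hard-coding wait.ForeverTestTimeout. A minimal sketch of the same shape; the helper name and the 100ms poll interval are assumptions, not values from the file:

	// Sketch: a helper that takes testing.TB is callable from TestXxx
	// (*testing.T) and BenchmarkXxx (*testing.B) alike, and the poll
	// deadline is supplied by the caller.
	package example

	import (
		"context"
		"testing"
		"time"

		"k8s.io/apimachinery/pkg/util/wait"
	)

	func waitForCondition(ctx context.Context, tb testing.TB, timeout time.Duration, ready func(context.Context) (bool, error)) {
		tb.Helper()
		// Poll every 100ms (assumed interval) until ready reports done
		// or the caller-supplied timeout elapses.
		if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, timeout, true, ready); err != nil {
			tb.Fatalf("condition not met within %v: %v", timeout, err)
		}
	}
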