@@ -2097,25 +2097,53 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
2097
2097
Steps : 30 ,
2098
2098
Cap : 5 * time .Minute ,
2099
2099
}
2100
+ cases := map [string ]struct {
2101
+ nPods int32
2102
+ backoffLimitPerIndex * int32
2103
+ }{
2104
+ "regular indexed job without failures; size=10" : {
2105
+ nPods : 10 ,
2106
+ },
2107
+ "job with backoffLimitPerIndex without failures; size=10" : {
2108
+ nPods : 10 ,
2109
+ backoffLimitPerIndex : ptr.To [int32 ](1 ),
2110
+ },
2111
+ "regular indexed job without failures; size=100" : {
2112
+ nPods : 100 ,
2113
+ },
2114
+ "job with backoffLimitPerIndex without failures; size=100" : {
2115
+ nPods : 100 ,
2116
+ backoffLimitPerIndex : ptr.To [int32 ](1 ),
2117
+ },
2118
+ }
2100
2119
mode := batchv1 .IndexedCompletion
2101
- for _ , nPods := range []int32 {1000 , 10_000 } {
2102
- b .Run (fmt .Sprintf ("nPods=%d" , nPods ), func (b * testing.B ) {
2120
+ for name , tc := range cases {
2121
+ b .Run (name , func (b * testing.B ) {
2122
+ enableJobBackoffLimitPerIndex := tc .backoffLimitPerIndex != nil
2123
+ defer featuregatetesting .SetFeatureGateDuringTest (b , feature .DefaultFeatureGate , features .JobBackoffLimitPerIndex , enableJobBackoffLimitPerIndex )()
2103
2124
b .ResetTimer ()
2104
2125
for n := 0 ; n < b .N ; n ++ {
2126
+ b .StartTimer ()
2105
2127
jobObj , err := createJobWithDefaults (ctx , clientSet , ns .Name , & batchv1.Job {
2106
2128
ObjectMeta : metav1.ObjectMeta {
2107
- Name : fmt .Sprintf ("npods-%d-%d" , nPods , n ),
2129
+ Name : fmt .Sprintf ("npods-%d-%d-%v " , tc . nPods , n , enableJobBackoffLimitPerIndex ),
2108
2130
},
2109
2131
Spec : batchv1.JobSpec {
2110
- Parallelism : ptr .To (nPods ),
2111
- Completions : ptr .To (nPods ),
2112
- CompletionMode : & mode ,
2132
+ Parallelism : ptr .To (tc .nPods ),
2133
+ Completions : ptr .To (tc .nPods ),
2134
+ CompletionMode : & mode ,
2135
+ BackoffLimitPerIndex : tc .backoffLimitPerIndex ,
2113
2136
},
2114
2137
})
2115
2138
if err != nil {
2116
2139
b .Fatalf ("Failed to create Job: %v" , err )
2117
2140
}
2118
- remaining := int (nPods )
2141
+ b .Cleanup (func () {
2142
+ if err := cleanUp (ctx , clientSet , jobObj ); err != nil {
2143
+ b .Fatalf ("Failed cleanup: %v" , err )
2144
+ }
2145
+ })
2146
+ remaining := int (tc .nPods )
2119
2147
if err := wait .ExponentialBackoff (backoff , func () (done bool , err error ) {
2120
2148
if err , succ := setJobPodsPhase (ctx , clientSet , jobObj , v1 .PodSucceeded , remaining ); err != nil {
2121
2149
remaining -= succ
@@ -2127,38 +2155,134 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
2127
2155
b .Fatalf ("Could not succeed the remaining %d pods: %v" , remaining , err )
2128
2156
}
2129
2157
validateJobSucceeded (ctx , b , clientSet , jobObj )
2158
+ b .StopTimer ()
2159
+ }
2160
+ })
2161
+ }
2162
+ }
2130
2163
2131
- // Cleanup Pods and Job.
2164
+ // BenchmarkLargeFailureHandling benchmarks the handling of numerous pod failures
2165
+ // of an Indexed Job. We set minimal backoff delay to make the job controller
2166
+ // performance comparable for indexed jobs with global backoffLimit, and those
2167
+ // with backoffLimit per-index, despite different patterns of handling failures.
2168
+ func BenchmarkLargeFailureHandling (b * testing.B ) {
2169
+ b .Cleanup (setDurationDuringTest (& jobcontroller .DefaultJobPodFailureBackOff , fastPodFailureBackoff ))
2170
+ b .Cleanup (setDurationDuringTest (& jobcontroller .MaxJobPodFailureBackOff , fastPodFailureBackoff ))
2171
+ closeFn , restConfig , clientSet , ns := setup (b , "indexed" )
2172
+ restConfig .QPS = 100
2173
+ restConfig .Burst = 100
2174
+ defer closeFn ()
2175
+ ctx , cancel := startJobControllerAndWaitForCaches (b , restConfig )
2176
+ defer cancel ()
2177
+ backoff := wait.Backoff {
2178
+ Duration : time .Second ,
2179
+ Factor : 1.5 ,
2180
+ Steps : 30 ,
2181
+ Cap : 5 * time .Minute ,
2182
+ }
2183
+ cases := map [string ]struct {
2184
+ nPods int32
2185
+ backoffLimitPerIndex * int32
2186
+ customTimeout * time.Duration
2187
+ }{
2188
+ "regular indexed job with failures; size=10" : {
2189
+ nPods : 10 ,
2190
+ },
2191
+ "job with backoffLimitPerIndex with failures; size=10" : {
2192
+ nPods : 10 ,
2193
+ backoffLimitPerIndex : ptr.To [int32 ](1 ),
2194
+ },
2195
+ "regular indexed job with failures; size=100" : {
2196
+ nPods : 100 ,
2197
+ },
2198
+ "job with backoffLimitPerIndex with failures; size=100" : {
2199
+ nPods : 100 ,
2200
+ backoffLimitPerIndex : ptr.To [int32 ](1 ),
2201
+ },
2202
+ }
2203
+ mode := batchv1 .IndexedCompletion
2204
+ for name , tc := range cases {
2205
+ b .Run (name , func (b * testing.B ) {
2206
+ enableJobBackoffLimitPerIndex := tc .backoffLimitPerIndex != nil
2207
+ timeout := ptr .Deref (tc .customTimeout , wait .ForeverTestTimeout )
2208
+ defer featuregatetesting .SetFeatureGateDuringTest (b , feature .DefaultFeatureGate , features .JobBackoffLimitPerIndex , enableJobBackoffLimitPerIndex )()
2209
+ b .ResetTimer ()
2210
+ for n := 0 ; n < b .N ; n ++ {
2132
2211
b .StopTimer ()
2133
- // Clean up pods in pages, because DeleteCollection might timeout.
2134
- // #90743
2135
- for {
2136
- pods , err := clientSet .CoreV1 ().Pods (ns .Name ).List (ctx , metav1.ListOptions {Limit : 1 })
2137
- if err != nil {
2138
- b .Fatalf ("Failed to list Pods for cleanup: %v" , err )
2139
- }
2140
- if len (pods .Items ) == 0 {
2141
- break
2142
- }
2143
- err = clientSet .CoreV1 ().Pods (ns .Name ).DeleteCollection (ctx ,
2144
- metav1.DeleteOptions {},
2145
- metav1.ListOptions {
2146
- Limit : 1000 ,
2147
- })
2148
- if err != nil {
2149
- b .Fatalf ("Failed to cleanup Pods: %v" , err )
2150
- }
2151
- }
2152
- err = clientSet .BatchV1 ().Jobs (jobObj .Namespace ).Delete (ctx , jobObj .Name , metav1.DeleteOptions {})
2212
+ jobObj , err := createJobWithDefaults (ctx , clientSet , ns .Name , & batchv1.Job {
2213
+ ObjectMeta : metav1.ObjectMeta {
2214
+ Name : fmt .Sprintf ("npods-%d-%d-%v" , tc .nPods , n , enableJobBackoffLimitPerIndex ),
2215
+ },
2216
+ Spec : batchv1.JobSpec {
2217
+ Parallelism : ptr .To (tc .nPods ),
2218
+ Completions : ptr .To (tc .nPods ),
2219
+ CompletionMode : & mode ,
2220
+ BackoffLimitPerIndex : tc .backoffLimitPerIndex ,
2221
+ BackoffLimit : ptr .To (tc .nPods ),
2222
+ },
2223
+ })
2153
2224
if err != nil {
2154
- b .Fatalf ("Failed to cleanup Job: %v" , err )
2225
+ b .Fatalf ("Failed to create Job: %v" , err )
2155
2226
}
2227
+ b .Cleanup (func () {
2228
+ if err := cleanUp (ctx , clientSet , jobObj ); err != nil {
2229
+ b .Fatalf ("Failed cleanup: %v" , err )
2230
+ }
2231
+ })
2232
+ validateJobsPodsStatusOnlyWithTimeout (ctx , b , clientSet , jobObj , podsByStatus {
2233
+ Active : int (tc .nPods ),
2234
+ Ready : ptr.To [int32 ](0 ),
2235
+ Terminating : ptr.To [int32 ](0 ),
2236
+ }, timeout )
2237
+
2156
2238
b .StartTimer ()
2239
+ remaining := int (tc .nPods )
2240
+ if err := wait .ExponentialBackoff (backoff , func () (done bool , err error ) {
2241
+ if err , fail := setJobPodsPhase (ctx , clientSet , jobObj , v1 .PodFailed , remaining ); err != nil {
2242
+ remaining -= fail
2243
+ b .Logf ("Transient failure failing pods: %v" , err )
2244
+ return false , nil
2245
+ }
2246
+ return true , nil
2247
+ }); err != nil {
2248
+ b .Fatalf ("Could not succeed the remaining %d pods: %v" , remaining , err )
2249
+ }
2250
+ validateJobsPodsStatusOnlyWithTimeout (ctx , b , clientSet , jobObj , podsByStatus {
2251
+ Active : int (tc .nPods ),
2252
+ Ready : ptr.To [int32 ](0 ),
2253
+ Failed : int (tc .nPods ),
2254
+ Terminating : ptr.To [int32 ](0 ),
2255
+ }, timeout )
2256
+ b .StopTimer ()
2157
2257
}
2158
2258
})
2159
2259
}
2160
2260
}
2161
2261
2262
+ // cleanUp deletes all pods and the job
2263
+ func cleanUp (ctx context.Context , clientSet clientset.Interface , jobObj * batchv1.Job ) error {
2264
+ // Clean up pods in pages, because DeleteCollection might timeout.
2265
+ // #90743
2266
+ for {
2267
+ pods , err := clientSet .CoreV1 ().Pods (jobObj .Namespace ).List (ctx , metav1.ListOptions {Limit : 1 })
2268
+ if err != nil {
2269
+ return err
2270
+ }
2271
+ if len (pods .Items ) == 0 {
2272
+ break
2273
+ }
2274
+ err = clientSet .CoreV1 ().Pods (jobObj .Namespace ).DeleteCollection (ctx ,
2275
+ metav1.DeleteOptions {},
2276
+ metav1.ListOptions {
2277
+ Limit : 1000 ,
2278
+ })
2279
+ if err != nil {
2280
+ return err
2281
+ }
2282
+ }
2283
+ return clientSet .BatchV1 ().Jobs (jobObj .Namespace ).Delete (ctx , jobObj .Name , metav1.DeleteOptions {})
2284
+ }
2285
+
2162
2286
func TestOrphanPodsFinalizersClearedWithGC (t * testing.T ) {
2163
2287
for _ , policy := range []metav1.DeletionPropagation {metav1 .DeletePropagationOrphan , metav1 .DeletePropagationBackground , metav1 .DeletePropagationForeground } {
2164
2288
t .Run (string (policy ), func (t * testing.T ) {
@@ -2617,10 +2741,15 @@ type podsByStatus struct {
2617
2741
Terminating * int32
2618
2742
}
2619
2743
2620
- func validateJobsPodsStatusOnly (ctx context.Context , t * testing.T , clientSet clientset.Interface , jobObj * batchv1.Job , desired podsByStatus ) {
2744
+ func validateJobsPodsStatusOnly (ctx context.Context , t testing.TB , clientSet clientset.Interface , jobObj * batchv1.Job , desired podsByStatus ) {
2745
+ t .Helper ()
2746
+ validateJobsPodsStatusOnlyWithTimeout (ctx , t , clientSet , jobObj , desired , wait .ForeverTestTimeout )
2747
+ }
2748
+
2749
+ func validateJobsPodsStatusOnlyWithTimeout (ctx context.Context , t testing.TB , clientSet clientset.Interface , jobObj * batchv1.Job , desired podsByStatus , timeout time.Duration ) {
2621
2750
t .Helper ()
2622
2751
var actualCounts podsByStatus
2623
- if err := wait .PollUntilContextTimeout (ctx , waitInterval , wait . ForeverTestTimeout , true , func (ctx context.Context ) (bool , error ) {
2752
+ if err := wait .PollUntilContextTimeout (ctx , waitInterval , timeout , true , func (ctx context.Context ) (bool , error ) {
2624
2753
updatedJob , err := clientSet .BatchV1 ().Jobs (jobObj .Namespace ).Get (ctx , jobObj .Name , metav1.GetOptions {})
2625
2754
if err != nil {
2626
2755
t .Fatalf ("Failed to get updated Job: %v" , err )
@@ -2638,7 +2767,8 @@ func validateJobsPodsStatusOnly(ctx context.Context, t *testing.T, clientSet cli
2638
2767
t .Errorf ("Waiting for Job Status: %v\n Pods (-want,+got):\n %s" , err , diff )
2639
2768
}
2640
2769
}
2641
- func validateJobPodsStatus (ctx context.Context , t * testing.T , clientSet clientset.Interface , jobObj * batchv1.Job , desired podsByStatus ) {
2770
+
2771
+ func validateJobPodsStatus (ctx context.Context , t testing.TB , clientSet clientset.Interface , jobObj * batchv1.Job , desired podsByStatus ) {
2642
2772
t .Helper ()
2643
2773
validateJobsPodsStatusOnly (ctx , t , clientSet , jobObj , desired )
2644
2774
var active []* v1.Pod
0 commit comments