@@ -38,8 +38,9 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorExited, LiveListenerBus, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.ThreadUtils
 
-private[spark] class KubernetesClusterSchedulerBackendSuite
+class KubernetesClusterSchedulerBackendSuite
     extends SparkFunSuite with BeforeAndAfter {
 
   private val APP_ID = "test-spark-app"
@@ -121,6 +122,9 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
   @Mock
   private var executorPodsWatch: Watch = _
 
+  @Mock
+  private var successFuture: Future[Boolean] = _
+
   private var sparkConf: SparkConf = _
   private var executorPodsWatcherArgument: ArgumentCaptor[Watcher[Pod]] = _
   private var allocatorRunnable: ArgumentCaptor[Runnable] = _
@@ -169,9 +173,15 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     when(rpcEnv.setupEndpoint(
       mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
       .thenReturn(driverEndpointRef)
+
+    // Used by the CoarseGrainedSchedulerBackend when making RPC calls.
     when(driverEndpointRef.ask[Boolean]
       (any(classOf[Any]))
-      (any())).thenReturn(mock[Future[Boolean]])
+      (any())).thenReturn(successFuture)
+    when(successFuture.failed).thenReturn(Future[Throwable] {
+      // emulate behavior of the Future.failed method.
+      throw new NoSuchElementException()
+    }(ThreadUtils.sameThread))
   }
 
   test("Basic lifecycle expectations when starting and stopping the scheduler.") {
@@ -239,13 +249,14 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     verify(podOperations).create(SECOND_EXECUTOR_POD)
   }
 
-  test("Deleting executors and then running an allocator pass after finding the loss reason" +
-    " should only delete the pod once.") {
+  test("Scaled down executors should be cleaned up") {
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
       .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
     val scheduler = newSchedulerBackend()
     scheduler.start()
+
+    // The scheduler backend spins up one executor pod.
     requestExecutorRunnable.getValue.run()
     when(podOperations.create(any(classOf[Pod])))
       .thenAnswer(AdditionalAnswers.returnsFirstArg())
@@ -258,6 +269,8 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
     driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
       .apply(registerFirstExecutorMessage)
+
+    // Request that there are 0 executors and trigger deletion from driver.
     scheduler.doRequestTotalExecutors(0)
     requestExecutorRunnable.getAllValues.asScala.last.run()
     scheduler.doKillExecutors(Seq("1"))
@@ -268,6 +281,9 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     val exitedPod = exitPod(FIRST_EXECUTOR_POD, 0)
     executorPodsWatcherArgument.getValue.eventReceived(Action.DELETED, exitedPod)
     allocatorRunnable.getValue.run()
+
+    // No more deletion attempts of the executors.
+    // This is graceful termination and should not be detected as a failure.
     verify(podOperations, times(1)).delete(FIRST_EXECUTOR_POD)
     verify(driverEndpointRef, times(1)).ask[Boolean](
       RemoveExecutor("1", ExecutorExited(
@@ -277,10 +293,11 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
         s"explicit termination request.")))
   }
 
-  test("Executors that disconnect from application errors are noted as exits caused by app.") {
+  test("Executors that fail should not be deleted.") {
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
       .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+
     val scheduler = newSchedulerBackend()
     scheduler.start()
     expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
@@ -298,6 +315,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     executorPodsWatcherArgument.getValue.eventReceived(
       Action.ERROR, exitPod(FIRST_EXECUTOR_POD, 1))
 
+    // A replacement executor should be created but the error pod should persist.
     expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
     scheduler.doRequestTotalExecutors(1)
     requestExecutorRunnable.getValue.run()
@@ -311,11 +329,11 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
   }
 
-  test("Executors should only try to get the loss reason a number of times before giving up and" +
-    " removing the executor.") {
+  test("Executors disconnected due to unknown reasons are deleted and replaced.") {
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
       .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+
     val scheduler = newSchedulerBackend()
     scheduler.start()
     expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
@@ -329,11 +347,13 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
     driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
       .apply(registerFirstExecutorMessage)
+
    driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
     1 to KubernetesClusterSchedulerBackend.MAX_EXECUTOR_LOST_REASON_CHECKS foreach { _ =>
       allocatorRunnable.getValue.run()
       verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
     }
+
     expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
     allocatorRunnable.getValue.run()
     verify(podOperations).delete(FIRST_EXECUTOR_POD)
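For context on the stubbed successFuture.failed above: Scala's Future.failed projection fails with a NoSuchElementException when the underlying future completed successfully, so failure handlers registered on the RPC reply never fire for a successful ask. Below is a minimal standalone sketch of that contract, not part of the diff, assuming a hand-rolled same-thread ExecutionContext in place of Spark's internal ThreadUtils.sameThread.

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FutureFailedSketch {
  // Assumption: a same-thread executor standing in for Spark's ThreadUtils.sameThread.
  val sameThread: ExecutionContext = new ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()
    override def reportFailure(cause: Throwable): Unit = throw cause
  }

  def main(args: Array[String]): Unit = {
    // The real contract: the failed projection of a successful Future
    // fails with NoSuchElementException.
    val realProjection = Future.successful(true).failed
    assert(Await.ready(realProjection, 1.second).value.get.isFailure)

    // What the suite stubs in for successFuture.failed: a Future built on the
    // same-thread executor whose body throws that same exception.
    val stubbedProjection = Future[Throwable] { throw new NoSuchElementException() }(sameThread)
    assert(Await.ready(stubbedProjection, 1.second).value.get.isFailure)
  }
}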