
Commit adf005d

John Lee authored and Tom Graves committed
[SPARK-21656][CORE] spark dynamic allocation should not idle timeout executors when tasks still to run
## What changes were proposed in this pull request?

Right now Spark lets go of executors when they have been idle for 60s (or a configurable time). I have seen Spark let them go when they were idle but still really needed. I have seen this when the scheduler was waiting for node locality, which can take longer than the default idle timeout. In these jobs the number of executors drops very low (fewer than 10) while there are still around 80,000 tasks to run. We should not allow executors to idle-timeout while they are still needed according to the number of tasks remaining.

## How was this patch tested?

Tested by manually adding executors to the `executorIdsToBeRemoved` list and checking whether those executors were removed when there were many tasks and a high `numExecutorsTarget` value.

Code used:

In `ExecutorAllocationManager.start()`:

```
start_time = clock.getTimeMillis()
```

In `ExecutorAllocationManager.schedule()`:

```
val executorIdsToBeRemoved = ArrayBuffer[String]()
if (now > start_time + 1000 * 60 * 2) {
  logInfo("--- REMOVING 1/2 of the EXECUTORS ---")
  start_time += 1000 * 60 * 100
  var counter = 0
  for (x <- executorIds) {
    counter += 1
    if (counter == 2) {
      counter = 0
      executorIdsToBeRemoved += x
    }
  }
}
```

Author: John Lee <[email protected]>

Closes apache#18874 from yoonlee95/SPARK-21656.
1 parent 0bb8d1f commit adf005d
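For context, the regime described above arises under a configuration like the following. This is an illustrative `SparkConf` sketch, not code from the patch; the values are hypothetical, but the configuration keys are the standard dynamic-allocation settings involved:

```scala
import org.apache.spark.SparkConf

// Illustrative settings only: dynamic allocation with the default 60s idle
// timeout, the regime in which the reported problem appears when locality
// waits outlast the timeout.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // required by dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "5")
  .set("spark.dynamicAllocation.maxExecutors", "1000")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s") // the timeout discussed above
```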

File tree

2 files changed: 89 additions & 35 deletions


core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 4 additions & 1 deletion

```diff
@@ -420,7 +420,10 @@ private[spark] class ExecutorAllocationManager(
     executors.foreach { executorIdToBeRemoved =>
       if (newExecutorTotal - 1 < minNumExecutors) {
         logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
-          s"$newExecutorTotal executor(s) left (limit $minNumExecutors)")
+          s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)")
+      } else if (newExecutorTotal - 1 < numExecutorsTarget) {
+        logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
+          s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)")
       } else if (canBeKilled(executorIdToBeRemoved)) {
         executorIdsToBeRemoved += executorIdToBeRemoved
         newExecutorTotal -= 1
```
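Read as a whole, the patched loop now applies three checks, in order, before an idle executor is let go: the hard minimum, the new target-based guard, and the existing kill check. A minimal standalone sketch of that decision order (hypothetical helper, not code from the patch; it returns true when removal may proceed):

```scala
// Sketch of the per-executor decision order after this patch (an assumed
// simplification; the real loop lives in ExecutorAllocationManager).
def mayRemove(
    newExecutorTotal: Int,   // executors that would remain so far
    minNumExecutors: Int,    // spark.dynamicAllocation.minExecutors
    numExecutorsTarget: Int, // executors still wanted for outstanding tasks
    canBeKilled: Boolean): Boolean = {
  if (newExecutorTotal - 1 < minNumExecutors) false         // hard floor
  else if (newExecutorTotal - 1 < numExecutorsTarget) false // new guard: still needed
  else canBeKilled                                          // existing kill check
}
```

The second branch is the substance of SPARK-21656: even a fully idle executor is kept while the pending-task count says it will be needed.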

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

Lines changed: 85 additions & 34 deletions

```diff
@@ -314,8 +314,47 @@ class ExecutorAllocationManagerSuite
     assert(executorsPendingToRemove(manager).isEmpty)
   }
 
+  test ("Removing with various numExecutorsTarget condition") {
+    sc = createSparkContext(5, 12, 5)
+    val manager = sc.executorAllocationManager.get
+
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 8)))
+
+    // Remove when numExecutorsTarget is the same as the current number of executors
+    assert(addExecutors(manager) === 1)
+    assert(addExecutors(manager) === 2)
+    (1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach {
+      info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) }
+    assert(executorIds(manager).size === 8)
+    assert(numExecutorsTarget(manager) === 8)
+    assert(maxNumExecutorsNeeded(manager) == 8)
+    assert(!removeExecutor(manager, "1")) // won't work since numExecutorsTarget == numExecutors
+
+    // Remove executors when numExecutorsTarget is lower than current number of executors
+    (1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach {
+      info => sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, Success, info, null)) }
+    adjustRequestedExecutors(manager)
+    assert(executorIds(manager).size === 8)
+    assert(numExecutorsTarget(manager) === 5)
+    assert(maxNumExecutorsNeeded(manager) == 5)
+    assert(removeExecutor(manager, "1"))
+    assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
+    onExecutorRemoved(manager, "1")
+    onExecutorRemoved(manager, "2")
+    onExecutorRemoved(manager, "3")
+
+    // numExecutorsTarget is lower than minNumExecutors
+    sc.listenerBus.postToAll(
+      SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), null))
+    assert(executorIds(manager).size === 5)
+    assert(numExecutorsTarget(manager) === 5)
+    assert(maxNumExecutorsNeeded(manager) == 4)
+    assert(!removeExecutor(manager, "4")) // lower limit
+    assert(addExecutors(manager) === 0) // upper limit
+  }
+
   test ("interleaving add and remove") {
-    sc = createSparkContext(5, 10, 5)
+    sc = createSparkContext(5, 12, 5)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
@@ -331,52 +370,59 @@ class ExecutorAllocationManagerSuite
     onExecutorAdded(manager, "7")
     onExecutorAdded(manager, "8")
     assert(executorIds(manager).size === 8)
+    assert(numExecutorsTarget(manager) === 8)
 
-    // Remove until limit
-    assert(removeExecutor(manager, "1"))
-    assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
-    assert(!removeExecutor(manager, "4")) // lower limit reached
-    assert(!removeExecutor(manager, "5"))
-    onExecutorRemoved(manager, "1")
-    onExecutorRemoved(manager, "2")
-    onExecutorRemoved(manager, "3")
-    assert(executorIds(manager).size === 5)
 
-    // Add until limit
-    assert(addExecutors(manager) === 2) // upper limit reached
-    assert(addExecutors(manager) === 0)
-    assert(!removeExecutor(manager, "4")) // still at lower limit
-    assert((manager, Seq("5")) !== Seq("5"))
+    // Remove when numTargetExecutors is equal to the current number of executors
+    assert(!removeExecutor(manager, "1"))
+    assert(removeExecutors(manager, Seq("2", "3")) !== Seq("2", "3"))
+
+    // Remove until limit
     onExecutorAdded(manager, "9")
     onExecutorAdded(manager, "10")
     onExecutorAdded(manager, "11")
     onExecutorAdded(manager, "12")
-    onExecutorAdded(manager, "13")
-    assert(executorIds(manager).size === 10)
+    assert(executorIds(manager).size === 12)
+    assert(numExecutorsTarget(manager) === 8)
 
-    // Remove succeeds again, now that we are no longer at the lower limit
-    assert(removeExecutors(manager, Seq("4", "5", "6")) === Seq("4", "5", "6"))
-    assert(removeExecutor(manager, "7"))
-    assert(executorIds(manager).size === 10)
-    assert(addExecutors(manager) === 0)
+    assert(removeExecutor(manager, "1"))
+    assert(removeExecutors(manager, Seq("2", "3", "4")) === Seq("2", "3", "4"))
+    assert(!removeExecutor(manager, "5")) // lower limit reached
+    assert(!removeExecutor(manager, "6"))
+    onExecutorRemoved(manager, "1")
+    onExecutorRemoved(manager, "2")
+    onExecutorRemoved(manager, "3")
     onExecutorRemoved(manager, "4")
-    onExecutorRemoved(manager, "5")
     assert(executorIds(manager).size === 8)
 
-    // Number of executors pending restarts at 1
-    assert(numExecutorsToAdd(manager) === 1)
-    assert(addExecutors(manager) === 0)
-    assert(executorIds(manager).size === 8)
-    onExecutorRemoved(manager, "6")
-    onExecutorRemoved(manager, "7")
+    // Add until limit
+    assert(!removeExecutor(manager, "7")) // still at lower limit
+    assert((manager, Seq("8")) !== Seq("8"))
+    onExecutorAdded(manager, "13")
     onExecutorAdded(manager, "14")
     onExecutorAdded(manager, "15")
-    assert(executorIds(manager).size === 8)
-    assert(addExecutors(manager) === 0) // still at upper limit
     onExecutorAdded(manager, "16")
+    assert(executorIds(manager).size === 12)
+
+    // Remove succeeds again, now that we are no longer at the lower limit
+    assert(removeExecutors(manager, Seq("5", "6", "7")) === Seq("5", "6", "7"))
+    assert(removeExecutor(manager, "8"))
+    assert(executorIds(manager).size === 12)
+    onExecutorRemoved(manager, "5")
+    onExecutorRemoved(manager, "6")
+    assert(executorIds(manager).size === 10)
+    assert(numExecutorsToAdd(manager) === 4)
+    onExecutorRemoved(manager, "9")
+    onExecutorRemoved(manager, "10")
+    assert(addExecutors(manager) === 4) // at upper limit
     onExecutorAdded(manager, "17")
+    onExecutorAdded(manager, "18")
     assert(executorIds(manager).size === 10)
-    assert(numExecutorsTarget(manager) === 10)
+    assert(addExecutors(manager) === 0) // still at upper limit
+    onExecutorAdded(manager, "19")
+    onExecutorAdded(manager, "20")
+    assert(executorIds(manager).size === 12)
+    assert(numExecutorsTarget(manager) === 12)
   }
 
   test("starting/canceling add timer") {
@@ -915,12 +961,17 @@ class ExecutorAllocationManagerSuite
     onExecutorAdded(manager, "third")
     onExecutorAdded(manager, "fourth")
     onExecutorAdded(manager, "fifth")
-    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
+    onExecutorAdded(manager, "sixth")
+    onExecutorAdded(manager, "seventh")
+    onExecutorAdded(manager, "eighth")
+    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth",
+      "sixth", "seventh", "eighth"))
 
     removeExecutor(manager, "first")
     removeExecutors(manager, Seq("second", "third"))
     assert(executorsPendingToRemove(manager) === Set("first", "second", "third"))
-    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
+    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth",
+      "sixth", "seventh", "eighth"))
 
 
     // Cluster manager lost will make all the live executors lost, so here simulate this behavior
```
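The `maxNumExecutorsNeeded` assertions in the new test follow from how the manager derives demand from outstanding tasks. A hedged sketch of that relation (the ceiling-division formula is my reading of `ExecutorAllocationManager`, not part of this diff; the suite's setup gives one task slot per executor):

```scala
// Assumed demand formula: executors needed to run all outstanding tasks,
// rounded up to a whole executor.
def maxExecutorsNeeded(pendingTasks: Int, runningTasks: Int, tasksPerExecutor: Int): Int =
  (pendingTasks + runningTasks + tasksPerExecutor - 1) / tasksPerExecutor

// Matches the test: 8 tasks outstanding gives demand 8; after three
// SparkListenerTaskEnd events only 5 remain, so demand drops to 5.
assert(maxExecutorsNeeded(pendingTasks = 0, runningTasks = 8, tasksPerExecutor = 1) == 8)
assert(maxExecutorsNeeded(pendingTasks = 0, runningTasks = 5, tasksPerExecutor = 1) == 5)
```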
