Skip to content

Commit ff98d78

Browse files
jinxing authored and Robert Kruszewski committed
[SPARK-23637][YARN] Yarn might allocate more resource if a same executor is killed multiple times.
## What changes were proposed in this pull request? `YarnAllocator` uses `numExecutorsRunning` to track the number of running executors. `numExecutorsRunning` is used to check whether executors are missing and more need to be allocated. In the current code, `numExecutorsRunning` can become negative when the driver asks to kill the same idle executor multiple times. ## How was this patch tested? UT added Author: jinxing <[email protected]> Closes apache#20781 from jinxing64/SPARK-23637.
1 parent 1f01d81 commit ff98d78

File tree

2 files changed

+66
-18
lines changed

2 files changed

+66
-18
lines changed

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ private[yarn] class YarnAllocator(
8181
private val releasedContainers = Collections.newSetFromMap[ContainerId](
8282
new ConcurrentHashMap[ContainerId, java.lang.Boolean])
8383

84-
private val numExecutorsRunning = new AtomicInteger(0)
84+
private val runningExecutors = Collections.newSetFromMap[String](
85+
new ConcurrentHashMap[String, java.lang.Boolean]())
8586

8687
private val numExecutorsStarting = new AtomicInteger(0)
8788

@@ -166,7 +167,7 @@ private[yarn] class YarnAllocator(
166167
clock = newClock
167168
}
168169

169-
def getNumExecutorsRunning: Int = numExecutorsRunning.get()
170+
def getNumExecutorsRunning: Int = runningExecutors.size()
170171

171172
def getNumExecutorsFailed: Int = synchronized {
172173
val endTime = clock.getTimeMillis()
@@ -242,12 +243,11 @@ private[yarn] class YarnAllocator(
242243
* Request that the ResourceManager release the container running the specified executor.
243244
*/
244245
def killExecutor(executorId: String): Unit = synchronized {
245-
if (executorIdToContainer.contains(executorId)) {
246-
val container = executorIdToContainer.get(executorId).get
247-
internalReleaseContainer(container)
248-
numExecutorsRunning.decrementAndGet()
249-
} else {
250-
logWarning(s"Attempted to kill unknown executor $executorId!")
246+
executorIdToContainer.get(executorId) match {
247+
case Some(container) if !releasedContainers.contains(container.getId) =>
248+
internalReleaseContainer(container)
249+
runningExecutors.remove(executorId)
250+
case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
251251
}
252252
}
253253

@@ -274,7 +274,7 @@ private[yarn] class YarnAllocator(
274274
"Launching executor count: %d. Cluster resources: %s.")
275275
.format(
276276
allocatedContainers.size,
277-
numExecutorsRunning.get,
277+
runningExecutors.size,
278278
numExecutorsStarting.get,
279279
allocateResponse.getAvailableResources))
280280

@@ -286,7 +286,7 @@ private[yarn] class YarnAllocator(
286286
logDebug("Completed %d containers".format(completedContainers.size))
287287
processCompletedContainers(completedContainers.asScala)
288288
logDebug("Finished processing %d completed containers. Current running executor count: %d."
289-
.format(completedContainers.size, numExecutorsRunning.get))
289+
.format(completedContainers.size, runningExecutors.size))
290290
}
291291
}
292292

@@ -300,9 +300,9 @@ private[yarn] class YarnAllocator(
300300
val pendingAllocate = getPendingAllocate
301301
val numPendingAllocate = pendingAllocate.size
302302
val missing = targetNumExecutors - numPendingAllocate -
303-
numExecutorsStarting.get - numExecutorsRunning.get
303+
numExecutorsStarting.get - runningExecutors.size
304304
logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
305-
s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
305+
s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
306306
s"executorsStarting: ${numExecutorsStarting.get}")
307307

308308
if (missing > 0) {
@@ -502,7 +502,7 @@ private[yarn] class YarnAllocator(
502502
s"for executor with ID $executorId")
503503

504504
def updateInternalState(): Unit = synchronized {
505-
numExecutorsRunning.incrementAndGet()
505+
runningExecutors.add(executorId)
506506
numExecutorsStarting.decrementAndGet()
507507
executorIdToContainer(executorId) = container
508508
containerIdToExecutorId(container.getId) = executorId
@@ -513,7 +513,7 @@ private[yarn] class YarnAllocator(
513513
allocatedContainerToHostMap.put(containerId, executorHostname)
514514
}
515515

516-
if (numExecutorsRunning.get < targetNumExecutors) {
516+
if (runningExecutors.size() < targetNumExecutors) {
517517
numExecutorsStarting.incrementAndGet()
518518
if (launchContainers) {
519519
launcherPool.execute(new Runnable {
@@ -554,7 +554,7 @@ private[yarn] class YarnAllocator(
554554
} else {
555555
logInfo(("Skip launching executorRunnable as running executors count: %d " +
556556
"reached target executors count: %d.").format(
557-
numExecutorsRunning.get, targetNumExecutors))
557+
runningExecutors.size, targetNumExecutors))
558558
}
559559
}
560560
}
@@ -569,7 +569,11 @@ private[yarn] class YarnAllocator(
569569
val exitReason = if (!alreadyReleased) {
570570
// Decrement the number of executors running. The next iteration of
571571
// the ApplicationMaster's reporting thread will take care of allocating.
572-
numExecutorsRunning.decrementAndGet()
572+
containerIdToExecutorId.get(containerId) match {
573+
case Some(executorId) => runningExecutors.remove(executorId)
574+
case None => logWarning(s"Cannot find executorId for container: ${containerId.toString}")
575+
}
576+
573577
logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
574578
containerId,
575579
onHostStr,

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,11 +251,55 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
251251
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
252252
}
253253
handler.updateResourceRequests()
254-
handler.processCompletedContainers(statuses.toSeq)
254+
handler.processCompletedContainers(statuses)
255255
handler.getNumExecutorsRunning should be (0)
256256
handler.getPendingAllocate.size should be (1)
257257
}
258258

259+
test("kill same executor multiple times") {
260+
val handler = createAllocator(2)
261+
handler.updateResourceRequests()
262+
handler.getNumExecutorsRunning should be (0)
263+
handler.getPendingAllocate.size should be (2)
264+
265+
val container1 = createContainer("host1")
266+
val container2 = createContainer("host2")
267+
handler.handleAllocatedContainers(Array(container1, container2))
268+
handler.getNumExecutorsRunning should be (2)
269+
handler.getPendingAllocate.size should be (0)
270+
271+
val executorToKill = handler.executorIdToContainer.keys.head
272+
handler.killExecutor(executorToKill)
273+
handler.getNumExecutorsRunning should be (1)
274+
handler.killExecutor(executorToKill)
275+
handler.killExecutor(executorToKill)
276+
handler.killExecutor(executorToKill)
277+
handler.getNumExecutorsRunning should be (1)
278+
handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
279+
handler.updateResourceRequests()
280+
handler.getPendingAllocate.size should be (1)
281+
}
282+
283+
test("process same completed container multiple times") {
284+
val handler = createAllocator(2)
285+
handler.updateResourceRequests()
286+
handler.getNumExecutorsRunning should be (0)
287+
handler.getPendingAllocate.size should be (2)
288+
289+
val container1 = createContainer("host1")
290+
val container2 = createContainer("host2")
291+
handler.handleAllocatedContainers(Array(container1, container2))
292+
handler.getNumExecutorsRunning should be (2)
293+
handler.getPendingAllocate.size should be (0)
294+
295+
val statuses = Seq(container1, container1, container2).map { c =>
296+
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
297+
}
298+
handler.processCompletedContainers(statuses)
299+
handler.getNumExecutorsRunning should be (0)
300+
301+
}
302+
259303
test("lost executor removed from backend") {
260304
val handler = createAllocator(4)
261305
handler.updateResourceRequests()
@@ -272,7 +316,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
272316
ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
273317
}
274318
handler.updateResourceRequests()
275-
handler.processCompletedContainers(statuses.toSeq)
319+
handler.processCompletedContainers(statuses)
276320
handler.updateResourceRequests()
277321
handler.getNumExecutorsRunning should be (0)
278322
handler.getPendingAllocate.size should be (2)

0 commit comments

Comments (0)