Skip to content

Commit 3f435fd

Browse files
rynorrisRobert Kruszewski
authored andcommitted
Fix dynamic allocation with external shuffle service (apache-spark-on-k8s#445)
1 parent d2a8723 commit 3f435fd

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
2525

2626
import com.codahale.metrics.{Gauge, MetricRegistry}
2727

28-
import org.apache.spark.internal.Logging
28+
import org.apache.spark.internal.{config, Logging}
2929
import org.apache.spark.internal.config._
3030
import org.apache.spark.metrics.source.Source
3131
import org.apache.spark.scheduler._
@@ -118,6 +118,8 @@ private[spark] class ExecutorAllocationManager(
118118
private val inactiveShuffleExecutorIdleTimeoutS = conf.getTimeAsSeconds(
119119
"spark.dynamicAllocation.inactiveShuffleExecutorIdleTimeout", s"${Integer.MAX_VALUE}s")
120120

121+
private val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
122+
121123
// During testing, the methods to actually kill and add executors are mocked out
122124
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
123125

@@ -599,7 +601,7 @@ private[spark] class ExecutorAllocationManager(
599601
*/
600602
private def onExecutorIdle(executorId: String): Unit = synchronized {
601603
if (executorIds.contains(executorId)) {
602-
val hasActiveShuffleBlocks =
604+
val hasActiveShuffleBlocks = !externalShuffleServiceEnabled &&
603605
mapOutputTracker.hasOutputsOnExecutor(executorId, activeOnly = true)
604606
if (!removeTimes.contains(executorId)
605607
&& !executorsPendingToRemove.contains(executorId)
@@ -608,7 +610,8 @@ private[spark] class ExecutorAllocationManager(
608610
// blocks we are concerned with are reported to the driver. Note that this
609611
// does not include broadcast blocks.
610612
val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId)
611-
val hasAnyShuffleBlocks = mapOutputTracker.hasOutputsOnExecutor(executorId)
613+
val hasAnyShuffleBlocks =
614+
!externalShuffleServiceEnabled && mapOutputTracker.hasOutputsOnExecutor(executorId)
612615
val now = clock.getTimeMillis()
613616

614617
// Use the maximum of all the timeouts that apply.

0 commit comments

Comments
 (0)