Commit 15a72f3

yaooqinn authored and jiangxb1987 committed
[SPARK-29287][CORE] Add LaunchedExecutor message to tell driver which executor is ready for making offers
### What changes were proposed in this pull request?

Add a `LaunchedExecutor` message and send it to the driver once the executor is fully constructed, so that the driver can then assign the executor's totalCores to freeCores and start making offers to it.

### Why are the changes needed?

Executors send RegisterExecutor messages to the driver in onStart. The driver puts the executor data into the ready-to-serve map if it can, then sends RegisteredExecutor back to the executor. From that point the driver can already make offers to this executor, but the executor is not fully constructed yet: only when it receives RegisteredExecutor does it start constructing itself, initializing the block manager, possibly registering with the local shuffle server (with retries), and finally starting to heartbeat to the driver. Tasks allocated during this window may fail if the executor fails to start or cannot heartbeat to the driver in time. Worse, when dynamic allocation and blacklisting are enabled and the number of running executors has dropped to the minimum executor setting, executors that receive tasks before they are fully constructed can hit errors that block or tear down the whole application.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Closes apache#25964 from yaooqinn/SPARK-29287.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Xingbo Jiang <[email protected]>
1 parent 39b502a commit 15a72f3
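
To make the executor-side handshake concrete, here is a minimal, self-contained Scala sketch of the pattern this commit introduces: the backend only sends `LaunchedExecutor` to the driver after the `Executor` instance has been constructed successfully, so a half-initialized executor never looks schedulable. Everything except the `LaunchedExecutor` message name (`FakeDriverRef`, `FakeExecutor`, `onRegisteredExecutor`) is a hypothetical stand-in, not code from this commit.

```scala
import scala.util.control.NonFatal

// Illustrative sketch only; all names except LaunchedExecutor are hypothetical stand-ins.
object ExecutorSideSketch {

  // Mirrors the message added to CoarseGrainedClusterMessages in this commit.
  case class LaunchedExecutor(executorId: String)

  // Stand-in for the driver's RPC endpoint reference.
  class FakeDriverRef {
    def send(msg: Any): Unit = println(s"driver received: $msg")
  }

  // Stand-in for the heavyweight Executor constructor, which in Spark sets up the
  // block manager, shuffle-service registration, heartbeats, etc., and can fail or be slow.
  class FakeExecutor(val executorId: String)

  def onRegisteredExecutor(executorId: String, driver: FakeDriverRef): Unit = {
    try {
      // 1. Construct the executor first; this is the step that may fail.
      val executor = new FakeExecutor(executorId)
      // 2. Only after construction succeeds, tell the driver it may start making offers.
      driver.send(LaunchedExecutor(executor.executorId))
    } catch {
      case NonFatal(e) =>
        // The real backend calls exitExecutor(1, ...); the sketch just reports the failure.
        println(s"Unable to create executor due to ${e.getMessage}")
    }
  }

  def main(args: Array[String]): Unit = {
    onRegisteredExecutor("exec-1", new FakeDriverRef)
  }
}
```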

File tree

4 files changed: +12 -3 lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 0 deletions
@@ -127,6 +127,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       logInfo("Successfully registered with driver")
       try {
         executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+        driver.get.send(LaunchedExecutor(executorId))
       } catch {
         case NonFatal(e) =>
           exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 2 additions & 0 deletions
@@ -69,6 +69,8 @@ private[spark] object CoarseGrainedClusterMessages {
       resources: Map[String, ResourceInformation])
     extends CoarseGrainedClusterMessage
 
+  case class LaunchedExecutor(executorId: String) extends CoarseGrainedClusterMessage
+
   case class StatusUpdate(
       executorId: String,
       taskId: Long,

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 7 additions & 2 deletions
@@ -194,6 +194,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // automatically, so try to tell the executor to stop itself. See SPARK-13519.
         executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
         removeExecutor(executorId, reason)
+
+      case LaunchedExecutor(executorId) =>
+        executorDataMap.get(executorId).foreach { data =>
+          data.freeCores = data.totalCores
+        }
+        makeOffers(executorId)
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -230,7 +236,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
               taskResourceNumParts.getOrElse(v.name, 1)))
           }
           val data = new ExecutorData(executorRef, executorAddress, hostname,
-            cores, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
+            0, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
             resourcesInfo)
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
@@ -249,7 +255,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           context.reply(true)
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
-          makeOffers()
         }
 
       case StopDriver =>
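
As a complement to the hunks above, the following self-contained sketch (hypothetical names, not the Spark source) isolates the driver-side timing: an executor is now registered with zero free cores, and only the `LaunchedExecutor` message flips its free cores to the full count, at which point targeted offers can succeed.

```scala
import scala.collection.mutable

// Illustrative sketch only; ExecutorSlot and DriverSideSketch are hypothetical names.
object DriverSideSketch {

  case class LaunchedExecutor(executorId: String)

  // Minimal stand-in for Spark's ExecutorData: fixed total cores, mutable free cores.
  class ExecutorSlot(val totalCores: Int, var freeCores: Int)

  private val executorDataMap = mutable.Map.empty[String, ExecutorSlot]

  // Registration now records the executor with 0 free cores, so no offers go out yet.
  def registerExecutor(executorId: String, cores: Int): Unit = {
    executorDataMap(executorId) = new ExecutorSlot(cores, freeCores = 0)
  }

  // Only when the executor reports that it is fully constructed do we expose its cores
  // and make an offer targeted at that executor.
  def receive(msg: Any): Unit = msg match {
    case LaunchedExecutor(executorId) =>
      executorDataMap.get(executorId).foreach { data =>
        data.freeCores = data.totalCores
      }
      makeOffers(executorId)
    case _ =>
  }

  private def makeOffers(executorId: String): Unit = {
    val free = executorDataMap.get(executorId).map(_.freeCores).getOrElse(0)
    println(s"offering $free free cores on $executorId")
  }

  def main(args: Array[String]): Unit = {
    registerExecutor("exec-1", cores = 4) // registered, but freeCores == 0: nothing to offer
    receive(LaunchedExecutor("exec-1"))   // prints "offering 4 free cores on exec-1"
  }
}
```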

core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -34,7 +34,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster._
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RegisterExecutorFailed}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor, RegisterExecutorFailed}
 
 /**
  * End-to-end tests for dynamic allocation in standalone mode.
@@ -634,6 +634,7 @@ class StandaloneDynamicAllocationSuite
         Map.empty)
       val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
       backend.driverEndpoint.askSync[Boolean](message)
+      backend.driverEndpoint.send(LaunchedExecutor(id))
     }
   }
