
Commit 51066b4

Devaraj K authored and Marcelo Vanzin committed
[SPARK-14228][CORE][YARN] Lost executor of RPC disassociated, and occurs exception: Could not find CoarseGrainedScheduler or it has been stopped
## What changes were proposed in this pull request?

I see two instances where the exception occurs.

**Instance 1:**

```
17/11/10 15:49:32 ERROR util.Utils: Uncaught exception in thread driver-revive-thread
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
    at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
    at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140)
    at org.apache.spark.rpc.netty.NettyRpcEnv.send(NettyRpcEnv.scala:187)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.send(NettyRpcEnv.scala:521)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(CoarseGrainedSchedulerBackend.scala:125)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(CoarseGrainedSchedulerBackend.scala:125)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anon$1$$anonfun$run$1.apply$mcV$sp(CoarseGrainedSchedulerBackend.scala:125)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1344)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anon$1.run(CoarseGrainedSchedulerBackend.scala:124)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```

In CoarseGrainedSchedulerBackend.scala, the driver-revive-thread starts in DriverEndpoint.onStart() and keeps sending ReviveOffers messages periodically until it is shut down in DriverEndpoint.onStop(). There is no coordination between the driver-revive-thread shutdown and the RpcEndpoint unregistration: the RpcEndpoint is unregistered first, and only afterwards is the driver-revive-thread shut down in DriverEndpoint.onStop(). In between, the driver-revive-thread may still try to send a ReviveOffers message, which leads to the exception above. To fix this, this PR moves the shutdown of the driver-revive-thread to CoarseGrainedSchedulerBackend.stop(), which executes before the DriverEndpoint is unregistered.

**Instance 2:**

```
17/11/10 16:31:38 ERROR cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Error requesting driver to remove executor 1 for reason Executor for container container_1508535467865_0226_01_000002 exited because of a YARN event (e.g., pre-emption) and not because of an error in the running job.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
    at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
    at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:135)
    at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:229)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:516)
    at org.apache.spark.rpc.RpcEndpointRef.ask(RpcEndpointRef.scala:63)
    at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$receive$1.applyOrElse(YarnSchedulerBackend.scala:269)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```

Here the YarnDriverEndpoint tries to send remove-executor messages after the YARN scheduler backend service has stopped, which leads to the exception above. To avoid it, we could either:
1) add a condition (checking whether the service has stopped) before sending the remove-executor message, or
2) add a warning log message in the onFailure case when the service is already stopped.

This PR takes option 2), which logs a message in the onFailure case without the exception stack trace, since option 1) would need to be applied to every remove-executor message.

## How was this patch tested?

I verified it manually; I no longer see these exceptions with the PR changes.

Author: Devaraj K <[email protected]>

Closes #19741 from devaraj-kavali/SPARK-14228.
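
For Instance 1, the essence of the fix is an ordering change: the periodic sender must stop before the endpoint it talks to goes away. The following is a minimal, self-contained sketch of that pattern; `ReviveThreadOrderingSketch`, `endpointRegistered`, and the println stand in for Spark's backend, endpoint, and ReviveOffers message and are purely illustrative, not the actual classes touched by this PR.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the backend/endpoint lifecycle; names are illustrative only.
object ReviveThreadOrderingSketch {
  // The periodic sender is owned by the backend (as in this PR), so the backend
  // decides when it stops relative to the endpoint's unregistration.
  private val reviveThread = Executors.newSingleThreadScheduledExecutor()
  private val endpointRegistered = new AtomicBoolean(true)

  def start(): Unit = {
    reviveThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        // Before this PR, the endpoint could already be unregistered at this point,
        // producing "Could not find CoarseGrainedScheduler".
        if (endpointRegistered.get) println("ReviveOffers")
      }
    }, 0L, 1L, TimeUnit.SECONDS)
  }

  def stop(): Unit = {
    // The fix: stop the periodic sender first, then unregister the endpoint.
    reviveThread.shutdownNow()
    endpointRegistered.set(false)
  }
}
```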
1 parent 4286cba commit 51066b4

4 files changed: 22 additions, 40 deletions

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 5 deletions
```diff
@@ -165,11 +165,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     }
 
     if (notifyDriver && driver.nonEmpty) {
-      driver.get.ask[Boolean](
-        RemoveExecutor(executorId, new ExecutorLossReason(reason))
-      ).failed.foreach(e =>
-        logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
-      )(ThreadUtils.sameThread)
+      driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
     }
 
     System.exit(code)
```

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 12 additions & 18 deletions
```diff
@@ -95,6 +95,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
+  private val reviveThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -103,9 +106,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     protected val addressToExecutorId = new HashMap[RpcAddress, String]
 
-    private val reviveThread =
-      ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
-
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
       val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
@@ -154,6 +154,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         executorDataMap.values.foreach { ed =>
           ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
         }
+
+      case RemoveExecutor(executorId, reason) =>
+        // We will remove the executor's state and cannot restore it. However, the connection
+        // between the driver and the executor may be still alive so that the executor won't exit
+        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
+        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
+        removeExecutor(executorId, reason)
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -215,14 +222,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         }
         context.reply(true)
 
-      case RemoveExecutor(executorId, reason) =>
-        // We will remove the executor's state and cannot restore it. However, the connection
-        // between the driver and the executor may be still alive so that the executor won't exit
-        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
-        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
-        removeExecutor(executorId, reason)
-        context.reply(true)
-
       case RemoveWorker(workerId, host, message) =>
         removeWorker(workerId, host, message)
         context.reply(true)
@@ -373,10 +372,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
       shouldDisable
     }
-
-    override def onStop() {
-      reviveThread.shutdownNow()
-    }
   }
 
   var driverEndpoint: RpcEndpointRef = null
@@ -417,6 +412,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   override def stop() {
+    reviveThread.shutdownNow()
     stopExecutors()
     try {
       if (driverEndpoint != null) {
@@ -465,9 +461,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * at once.
    */
   protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
-    // Only log the failure since we don't care about the result.
-    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).failed.foreach(t =>
-      logError(t.getMessage, t))(ThreadUtils.sameThread)
+    driverEndpoint.send(RemoveExecutor(executorId, reason))
   }
 
   protected def removeWorker(workerId: String, host: String, message: String): Unit = {
```
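
A rough way to read the hunks above: RemoveExecutor moves from receiveAndReply to receive because its callers now use fire-and-forget send instead of ask[Boolean], so there is no reply and therefore no Future left to fail during shutdown. The toy sketch below models that one-way vs. request/reply split with hypothetical names (ToyEndpoint, ToyMsg, and the message case classes); it is not Spark's RpcEndpoint API.

```scala
// Toy model of the one-way vs. request/reply split; hypothetical names, for illustration only.
sealed trait ToyMsg
final case class RemoveExecutorMsg(id: String, reason: String) extends ToyMsg
final case class RemoveWorkerMsg(id: String) extends ToyMsg

class ToyEndpoint {
  // One-way path (used with a fire-and-forget send): there is no reply, so a sender
  // that is already shutting down has no failed Future to surface as
  // "Could not find CoarseGrainedScheduler".
  def receive: PartialFunction[ToyMsg, Unit] = {
    case RemoveExecutorMsg(id, reason) =>
      println(s"removing executor $id: $reason")
  }

  // Request/reply path (used with ask): the handler must produce a reply the caller waits on.
  def receiveAndReply(reply: Boolean => Unit): PartialFunction[ToyMsg, Unit] = {
    case RemoveWorkerMsg(id) =>
      println(s"removing worker $id")
      reply(true)
  }
}
```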

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -282,7 +282,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     // No more deletion attempts of the executors.
     // This is graceful termination and should not be detected as a failure.
     verify(podOperations, times(1)).delete(resolvedPod)
-    verify(driverEndpointRef, times(1)).ask[Boolean](
+    verify(driverEndpointRef, times(1)).send(
       RemoveExecutor("1", ExecutorExited(
         0,
         exitCausedByApp = false,
@@ -318,7 +318,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     requestExecutorRunnable.getValue.run()
     allocatorRunnable.getAllValues.asScala.last.run()
     verify(podOperations, never()).delete(firstResolvedPod)
-    verify(driverEndpointRef).ask[Boolean](
+    verify(driverEndpointRef).send(
       RemoveExecutor("1", ExecutorExited(
         1,
         exitCausedByApp = true,
@@ -356,7 +356,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     val recreatedResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
     allocatorRunnable.getValue.run()
     verify(podOperations).delete(firstResolvedPod)
-    verify(driverEndpointRef).ask[Boolean](
+    verify(driverEndpointRef).send(
       RemoveExecutor("1", SlaveLost("Executor lost for unknown reasons.")))
   }
 
```

resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala

Lines changed: 6 additions & 14 deletions
```diff
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
 import java.util.concurrent.atomic.{AtomicBoolean}
 
 import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
@@ -245,14 +246,7 @@ private[spark] abstract class YarnSchedulerBackend(
         Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
       }
 
-      removeExecutorMessage
-        .flatMap { message =>
-          driverEndpoint.ask[Boolean](message)
-        }(ThreadUtils.sameThread)
-        .onFailure {
-          case NonFatal(e) => logError(
-            s"Error requesting driver to remove executor $executorId after disconnection.", e)
-        }(ThreadUtils.sameThread)
+      removeExecutorMessage.foreach { message => driverEndpoint.send(message) }
     }
 
     override def receive: PartialFunction[Any, Unit] = {
@@ -265,12 +259,10 @@ private[spark] abstract class YarnSchedulerBackend(
         addWebUIFilter(filterName, filterParams, proxyBase)
 
       case r @ RemoveExecutor(executorId, reason) =>
-        logWarning(reason.toString)
-        driverEndpoint.ask[Boolean](r).onFailure {
-          case e =>
-            logError("Error requesting driver to remove executor" +
-              s" $executorId for reason $reason", e)
-        }(ThreadUtils.sameThread)
+        if (!stopped.get) {
+          logWarning(s"Requesting driver to remove executor $executorId for reason $reason")
+          driverEndpoint.send(r)
+        }
     }
 
 
```
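
For the YARN-side change above, the essential pattern is a stopped check before notifying the driver. A minimal sketch under assumed names follows; `StoppableRemover` and `notifyDriver` are hypothetical, and only the `stopped` flag and the guard mirror the real change.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical wrapper; only the `stopped` flag and the guard mirror the change above.
final class StoppableRemover(notifyDriver: String => Unit) {
  private val stopped = new AtomicBoolean(false)

  def stop(): Unit = stopped.set(true)

  def removeExecutor(executorId: String, reason: String): Unit = {
    // After the backend has stopped, the driver endpoint may already be unregistered,
    // so skip the notification instead of failing with
    // "Could not find CoarseGrainedScheduler".
    if (!stopped.get) {
      println(s"Requesting driver to remove executor $executorId for reason $reason")
      notifyDriver(executorId)
    }
  }
}
```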
