Commit e16e8c7

Devaraj K authored and zsxwing committed

[SPARK-21146][CORE] Master/Worker should handle and shutdown when any thread gets UncaughtException
## What changes were proposed in this pull request?

Adding the default UncaughtExceptionHandler to the Worker.

## How was this patch tested?

I verified it manually: when any of the worker threads gets an uncaught exception, the default UncaughtExceptionHandler handles it.

Author: Devaraj K <[email protected]>

Closes apache#18357 from devaraj-kavali/SPARK-21146.
1 parent 24367f2 commit e16e8c7

6 files changed: +17 −10 lines
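For context before the per-file diffs: the patch relies on the JVM's process-wide default handler, installed via Thread.setDefaultUncaughtExceptionHandler, which is invoked whenever any thread dies from an exception nothing caught. A minimal, self-contained sketch of that mechanism (illustrative only, not code from this commit; the object and thread names are invented):

```scala
// Sketch: a JVM-wide default handler observing a background-thread failure.
object UncaughtDemo {
  def main(args: Array[String]): Unit = {
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        // A daemon like Master/Worker would log here and decide whether to exit.
        System.err.println(s"Uncaught exception in thread ${t.getName}: $e")
      }
    })

    val worker = new Thread(new Runnable {
      override def run(): Unit = throw new IllegalStateException("boom")
    }, "demo-worker")
    worker.start()
    worker.join() // the handler fires instead of the default stack-trace dump
  }
}
```

Without such a handler, a thread killed by an unexpected exception only produces a stack trace on stderr and the daemon has no chance to react; this commit installs SparkUncaughtExceptionHandler in both daemons' main methods.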

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 1 deletion

@@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, Serializer}
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
 
 private[deploy] class Master(
     override val rpcEnv: RpcEnv,
@@ -1045,6 +1045,8 @@ private[deploy] object Master extends Logging {
   val ENDPOINT_NAME = "Master"
 
   def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
+      exitOnUncaughtException = false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 3 additions & 1 deletion

@@ -38,7 +38,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
 
 private[deploy] class Worker(
     override val rpcEnv: RpcEnv,
@@ -737,6 +737,8 @@ private[deploy] object Worker extends Logging {
   val ENDPOINT_NAME = "Worker"
 
   def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
+      exitOnUncaughtException = false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)
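Both daemons pass exitOnUncaughtException = false: per the handler logic changed below, an uncaught OutOfMemoryError still terminates the process, but any other uncaught throwable is only logged, so one failing thread does not take down a long-running Master or Worker. A sketch of that decision in isolation (the numeric exit codes are illustrative stand-ins for the SparkExitCode constants):

```scala
// Sketch of the decision the handler makes when a thread dies.
object HandlerPolicySketch {
  def onUncaught(e: Throwable, exitOnUncaughtException: Boolean): Unit = e match {
    case _: OutOfMemoryError =>
      System.exit(52)            // stand-in for SparkExitCode.OOM: always fatal
    case _ if exitOnUncaughtException =>
      System.exit(50)            // stand-in for SparkExitCode.UNCAUGHT_EXCEPTION: fail fast
    case _ =>
      ()                         // daemons (flag = false): log and keep running
  }
}
```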

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion

@@ -56,7 +56,7 @@ private[spark] class Executor(
     env: SparkEnv,
     userClassPath: Seq[URL] = Nil,
     isLocal: Boolean = false,
-    uncaughtExceptionHandler: UncaughtExceptionHandler = SparkUncaughtExceptionHandler)
+    uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler)
   extends Logging {
 
   logInfo(s"Starting executor ID $executorId on host $executorHostname")

core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala

Lines changed: 6 additions & 5 deletions

@@ -20,11 +20,12 @@ package org.apache.spark.util
 import org.apache.spark.internal.Logging
 
 /**
- * The default uncaught exception handler for Executors terminates the whole process, to avoid
- * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
- * to fail fast when things go wrong.
+ * The default uncaught exception handler for Spark daemons. It terminates the whole process for
+ * any Errors, and also terminates the process for Exceptions when the exitOnException flag is true.
+ *
+ * @param exitOnUncaughtException Whether to exit the process on UncaughtException.
  */
-private[spark] object SparkUncaughtExceptionHandler
+private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true)
   extends Thread.UncaughtExceptionHandler with Logging {
 
   override def uncaughtException(thread: Thread, exception: Throwable) {
@@ -40,7 +41,7 @@ private[spark] object SparkUncaughtExceptionHandler
       if (!ShutdownHookManager.inShutdown()) {
         if (exception.isInstanceOf[OutOfMemoryError]) {
           System.exit(SparkExitCode.OOM)
-        } else {
+        } else if (exitOnUncaughtException) {
           System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
         }
       }
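Assembled from the two hunks above, the handler after this change reads roughly as follows. This is a reconstruction for readability, not verbatim source: the log message, the defensive try/catch around the exit path, and the single-argument overload (used by Utils below) are inferred from the surrounding Spark code and may differ in detail.

```scala
package org.apache.spark.util

import org.apache.spark.internal.Logging

private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true)
  extends Thread.UncaughtExceptionHandler with Logging {

  override def uncaughtException(thread: Thread, exception: Throwable) {
    try {
      logError(s"Uncaught exception in thread $thread", exception)

      // Don't call System.exit() while the JVM is already shutting down;
      // shutdown hooks may themselves fail and re-enter this handler.
      if (!ShutdownHookManager.inShutdown()) {
        if (exception.isInstanceOf[OutOfMemoryError]) {
          System.exit(SparkExitCode.OOM)
        } else if (exitOnUncaughtException) {
          System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
        }
      }
    } catch {
      // Last resort: halt() skips shutdown hooks if even the path above fails.
      case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
      case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION)
    }
  }

  def uncaughtException(exception: Throwable) {
    uncaughtException(Thread.currentThread(), exception)
  }
}
```

With the default of true, executors keep their fail-fast behavior, while the daemons opt out explicitly in their main methods.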

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 3 additions & 1 deletion

@@ -76,6 +76,8 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()
 
+  private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
+
   /**
    * Define a default value for driver memory here since this value is referenced across the code
    * base and nearly all files already use Utils.scala
@@ -1274,7 +1276,7 @@ private[spark] object Utils extends Logging {
       block
     } catch {
       case e: ControlThrowable => throw e
-      case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t)
+      case t: Throwable => sparkUncaughtExceptionHandler.uncaughtException(t)
     }
   }
resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private[mesos] object MesosClusterDispatcher
9797
with CommandLineUtils {
9898

9999
override def main(args: Array[String]) {
100-
Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
100+
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler)
101101
Utils.initDaemon(log)
102102
val conf = new SparkConf
103103
val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
