
Commit bc5ccad

robreeves authored and Mridul Muralidharan committed
[SPARK-51660][CORE] Gracefully handle when MDC is not supported
### What changes were proposed in this pull request?
This improves the handling when MDC is not supported. The previous handling only works when one task runs on an executor. After this fix multiple tasks can run on an executor when MDC is not supported.

### Why are the changes needed?
In #35141 it added handling to gracefully handle when MDC is not available. It catches NoSuchFieldError, which is thrown during MDC initialization. This will work for the first task. If a second task runs it will not throw a NoSuchFieldError. Instead it throws NoClassDefFoundError. Then the job will hang indefinitely.

This improves the fix so it works for all tasks instead of just the first one.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes a bug. It also changes a log message to include the exception for easier debugging.

### How was this patch tested?
I tested it manually in spark-shell by manually throwing `NoSuchFieldError`. It logs this message and does not hang.

```
25/03/28 14:11:48 INFO Executor: MDC is not supported.
java.lang.NoSuchFieldError: this is a test
	at org.apache.spark.executor.Executor.liftedTree1$1(Executor.scala:118)
	at org.apache.spark.executor.Executor.mdcIsSupported$lzycompute(Executor.scala:114)
	at org.apache.spark.executor.Executor.mdcIsSupported(Executor.scala:113)
	at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$setMDCForTask(Executor.scala:948)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:584)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #50452 from robreeves/SPARK-51660.

Authored-by: Rob Reeves <roreeves@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
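The change of error type between the first and second task matches general JVM behavior: when a class's static initializer fails with an `Error`, that error is thrown once, and every later use of the class fails with `NoClassDefFoundError` instead. The sketch below reproduces this outside Spark. `BrokenMdc` and `ClassInitDemo` are hypothetical names, not part of Spark or any logging library, and the sketch assumes both are compiled as top-level objects (nested objects in a class-based REPL are initialized differently and may not show the same behavior).

```scala
// Hypothetical stand-in for a logging class whose initialization fails,
// for example because of an incompatible dependency on the classpath.
object BrokenMdc {
  // Evaluated once, when the JVM initializes the compiled module class.
  private val backend: String =
    throw new NoSuchFieldError("simulated incompatible logging backend")

  def put(key: String, value: String): Unit = ()
}

object ClassInitDemo {
  // Returns the simple class name of whatever a call into BrokenMdc throws.
  def attempt(): String =
    try {
      BrokenMdc.put("key", "value")
      "no error"
    } catch {
      // Catching Throwable is for demonstration only.
      case t: Throwable => t.getClass.getSimpleName
    }

  def main(args: Array[String]): Unit = {
    println(attempt()) // first use: the initializer's own error surfaces
    println(attempt()) // later uses: the JVM does not retry initialization
  }
}
```

Under these assumptions the first call should report `NoSuchFieldError` and later calls `NoClassDefFoundError`, which is why a handler that only catches `NoSuchFieldError` covers the first task but not the ones after it.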
1 parent 1fa05b8 commit bc5ccad

1 file changed, +18 -7 lines changed


core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 18 additions & 7 deletions
@@ -933,21 +933,17 @@ private[spark] class Executor(
   }
 
   private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
-    try {
+    if (Executor.mdcIsSupported) {
       mdc.foreach { case (key, value) => MDC.put(key, value) }
       // avoid overriding the takName by the user
       MDC.put(taskNameMDCKey, taskName)
-    } catch {
-      case _: NoSuchFieldError => logInfo("MDC is not supported.")
     }
   }
 
   private def cleanMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
-    try {
+    if (Executor.mdcIsSupported) {
       mdc.foreach { case (key, _) => MDC.remove(key) }
       MDC.remove(taskNameMDCKey)
-    } catch {
-      case _: NoSuchFieldError => logInfo("MDC is not supported.")
     }
   }
 
@@ -1318,7 +1314,7 @@ private[spark] class Executor(
   }
 }
 
-private[spark] object Executor {
+private[spark] object Executor extends Logging {
   // This is reserved for internal use by components that need to read task properties before a
   // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
   // used instead.
@@ -1327,6 +1323,21 @@ private[spark] object Executor {
   // Used to store executorSource, for local mode only
   var executorSourceLocalModeOnly: ExecutorSource = null
 
+  lazy val mdcIsSupported: Boolean = {
+    try {
+      // This tests if any class initialization error is thrown
+      val testKey = System.nanoTime().toString
+      MDC.put(testKey, "testValue")
+      MDC.remove(testKey)
+
+      true
+    } catch {
+      case t: Throwable =>
+        logInfo("MDC is not supported.", t)
+        false
+    }
+  }
+
   /**
    * Whether a `Throwable` thrown from a task is a fatal error. We will use this to decide whether
    * to kill the executor.
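
The probe-once pattern in this diff can be tried in isolation. The following is a minimal sketch, not Spark code: `ProbeDemo`, `touchLoggingBackend`, `backendIsSupported`, and `setContextForTask` are hypothetical names, and the simulated failure stands in for whatever `MDC.put` throws on an unsupported classpath. It illustrates that a Scala `lazy val` whose initializer catches the failure internally completes normally, so the `false` result is cached and the probe body is not run again for later tasks.

```scala
import java.util.concurrent.atomic.AtomicInteger

object ProbeDemo {
  // Counts how many times the probe body runs; for illustration only.
  val probeRuns = new AtomicInteger(0)

  // Hypothetical stand-in for a logging call that fails on this classpath.
  private def touchLoggingBackend(): Unit =
    throw new NoClassDefFoundError("simulated: could not initialize class")

  // The failure is caught inside the initializer, so the lazy val finishes
  // with a value and that value is reused by every later caller.
  lazy val backendIsSupported: Boolean =
    try {
      probeRuns.incrementAndGet()
      touchLoggingBackend()
      true
    } catch {
      case t: Throwable =>
        println(s"Backend is not supported: $t")
        false
    }

  // Same shape as setMDCForTask after the change: guard on the cached
  // flag instead of catching an exception on every call.
  def setContextForTask(taskName: String): Boolean =
    if (backendIsSupported) {
      // A real implementation would write to the logging context here.
      true
    } else {
      false
    }

  def main(args: Array[String]): Unit = {
    val results = (1 to 3).map(i => setContextForTask(s"task $i"))
    println(s"results=$results, probeRuns=${probeRuns.get}")
  }
}
```

Catching `Throwable` is deliberately broad here, as in the diff, because the exact error type depends on whether the class has already failed initialization. If the initializer let the exception escape instead, the `lazy val` would be re-evaluated on the next access rather than cached.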
