Skip to content

Commit 8eb9e34

Browse files
wbo4958HyukjinKwon
authored andcommitted
[SPARK-51537][CONNECT][CORE][3.5] construct the session-specific classloader based on the default session classloader on executor
### What changes were proposed in this pull request? This PR is to construct the session-specific classloader based on the default session classloader which has already added the global jars (e.g., added by `--jars` ) into the classpath on the executor side in the connect mode. ### Why are the changes needed? In Spark Connect mode, when connecting to a non-local (e.g., standalone) cluster, the executor creates an isolated session state that includes a session-specific classloader for each task. However, a notable issue arises: this session-specific classloader does not include the global JARs specified by the --jars option in the classpath. This oversight can lead to deserialization exceptions. For example: ``` console Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The newly added test can pass. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#50475 from wbo4958/classloader-3.5. Authored-by: Bobby Wang <wbo4958@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent 50634db commit 8eb9e34

File tree

2 files changed

+57
-13
lines changed

2 files changed

+57
-13
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ private[spark] class Executor(
175175
val currentJars = new HashMap[String, Long]
176176
val currentArchives = new HashMap[String, Long]
177177
val urlClassLoader =
178-
createClassLoader(currentJars, isStubbingEnabledForState(jobArtifactState.uuid))
178+
createClassLoader(currentJars, isStubbingEnabledForState(jobArtifactState.uuid),
179+
isDefaultState(jobArtifactState.uuid))
179180
val replClassLoader = addReplClassLoaderIfNeeded(
180181
urlClassLoader, jobArtifactState.replClassDirUri, jobArtifactState.uuid)
181182
new IsolatedSessionState(
@@ -1029,7 +1030,8 @@ private[spark] class Executor(
10291030
*/
10301031
private def createClassLoader(
10311032
currentJars: HashMap[String, Long],
1032-
useStub: Boolean): MutableURLClassLoader = {
1033+
useStub: Boolean,
1034+
isDefaultSession: Boolean): MutableURLClassLoader = {
10331035
// Bootstrap the list of jars with the user class path.
10341036
val now = System.currentTimeMillis()
10351037
userClassPath.foreach { url =>
@@ -1041,43 +1043,57 @@ private[spark] class Executor(
10411043
val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
10421044
new File(uri.split("/").last).toURI.toURL
10431045
}
1044-
createClassLoader(urls, useStub)
1046+
createClassLoader(urls, useStub, isDefaultSession)
10451047
}
10461048

1047-
private def createClassLoader(urls: Array[URL], useStub: Boolean): MutableURLClassLoader = {
1049+
private def createClassLoader(urls: Array[URL],
1050+
useStub: Boolean,
1051+
isDefaultSession: Boolean): MutableURLClassLoader = {
10481052
logInfo(
10491053
s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " +
10501054
urls.mkString("'", ",", "'")
10511055
)
10521056

10531057
if (useStub) {
1054-
createClassLoaderWithStub(urls, conf.get(CONNECT_SCALA_UDF_STUB_PREFIXES))
1058+
createClassLoaderWithStub(urls, conf.get(CONNECT_SCALA_UDF_STUB_PREFIXES), isDefaultSession)
10551059
} else {
1056-
createClassLoader(urls)
1060+
createClassLoader(urls, isDefaultSession)
10571061
}
10581062
}
10591063

1060-
private def createClassLoader(urls: Array[URL]): MutableURLClassLoader = {
1064+
private def createClassLoader(urls: Array[URL],
1065+
isDefaultSession: Boolean): MutableURLClassLoader = {
1066+
// SPARK-51537: The isolated session must *inherit* the classloader from the default session,
1067+
// which has already included the global JARs specified via --jars. For Spark plugins, we
1068+
// cannot simply add the plugin JARs to the classpath of the isolated session, as this may
1069+
// cause the plugin to be reloaded, leading to potential conflicts or unexpected behavior.
1070+
val loader = if (isDefaultSession) systemLoader else defaultSessionState.replClassLoader
10611071
if (userClassPathFirst) {
1062-
new ChildFirstURLClassLoader(urls, systemLoader)
1072+
new ChildFirstURLClassLoader(urls, loader)
10631073
} else {
1064-
new MutableURLClassLoader(urls, systemLoader)
1074+
new MutableURLClassLoader(urls, loader)
10651075
}
10661076
}
10671077

10681078
private def createClassLoaderWithStub(
10691079
urls: Array[URL],
1070-
binaryName: Seq[String]): MutableURLClassLoader = {
1080+
binaryName: Seq[String],
1081+
isDefaultSession: Boolean): MutableURLClassLoader = {
1082+
// SPARK-51537: The isolated session must *inherit* the classloader from the default session,
1083+
// which has already included the global JARs specified via --jars. For Spark plugins, we
1084+
// cannot simply add the plugin JARs to the classpath of the isolated session, as this may
1085+
// cause the plugin to be reloaded, leading to potential conflicts or unexpected behavior.
1086+
val loader = if (isDefaultSession) systemLoader else defaultSessionState.replClassLoader
10711087
if (userClassPathFirst) {
10721088
// user -> (sys -> stub)
10731089
val stubClassLoader =
1074-
StubClassLoader(systemLoader, binaryName)
1090+
StubClassLoader(loader, binaryName)
10751091
new ChildFirstURLClassLoader(urls, stubClassLoader)
10761092
} else {
10771093
// sys -> user -> stub
10781094
val stubClassLoader =
10791095
StubClassLoader(null, binaryName)
1080-
new ChildFirstURLClassLoader(urls, stubClassLoader, systemLoader)
1096+
new ChildFirstURLClassLoader(urls, stubClassLoader, loader)
10811097
}
10821098
}
10831099

@@ -1176,7 +1192,8 @@ private[spark] class Executor(
11761192
}
11771193
if (renewClassLoader) {
11781194
// Recreate the class loader to ensure all classes are updated.
1179-
state.urlClassLoader = createClassLoader(state.urlClassLoader.getURLs, useStub = true)
1195+
state.urlClassLoader = createClassLoader(state.urlClassLoader.getURLs,
1196+
useStub = true, isDefaultState(state.sessionUUID))
11801197
state.replClassLoader =
11811198
addReplClassLoaderIfNeeded(state.urlClassLoader, state.replClassDirUri, state.sessionUUID)
11821199
}

core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,4 +109,31 @@ class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext {
109109
}
110110
}
111111
}
112+
113+
test("SPARK-51537 Executor isolation session classloader inherits from " +
114+
"default session classloader") {
115+
sc = new SparkContext(new SparkConf()
116+
.setAppName("test")
117+
.setMaster("local")
118+
.set("spark.jars", jar2))
119+
120+
// TestHelloV2's test method returns '2'
121+
val artifactSetWithHelloV2 = new JobArtifactSet(
122+
Some(JobArtifactState(uuid = "hello2", replClassDirUri = None)),
123+
jars = Map.empty,
124+
files = Map.empty,
125+
archives = Map.empty
126+
)
127+
128+
JobArtifactSet.withActiveJobArtifactState(artifactSetWithHelloV2.state.get) {
129+
sc.parallelize(1 to 1).foreach { i =>
130+
val cls = Utils.classForName("com.example.Hello$")
131+
val module = cls.getField("MODULE$").get(null)
132+
val result = cls.getMethod("test").invoke(module).asInstanceOf[Int]
133+
if (result != 2) {
134+
throw new RuntimeException("Unexpected result: " + result)
135+
}
136+
}
137+
}
138+
}
112139
}

0 commit comments

Comments
 (0)