1717
1818package org .apache .spark .executor
1919
20+ import java .io .File
21+ import java .net .URL
22+
2023import scala .util .Properties
2124
22- import org .apache .spark .{JobArtifactSet , JobArtifactState , LocalSparkContext , SparkConf , SparkContext , SparkFunSuite }
23- import org .apache .spark .util .Utils
25+ import org .apache .spark .{JobArtifactSet , JobArtifactState , LocalSparkContext , SparkConf , SparkContext , SparkFunSuite , TestUtils }
26+ import org .apache .spark .util .{MutableURLClassLoader , Utils }
27+
2428
2529class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext {
2630
@@ -126,7 +130,7 @@ class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext {
126130 )
127131
128132 JobArtifactSet .withActiveJobArtifactState(artifactSetWithHelloV2.state.get) {
129- sc.parallelize(1 to 1 ).foreach { i =>
133+ sc.parallelize(1 to 1 ).foreach { _ =>
130134 val cls = Utils .classForName(" com.example.Hello$" )
131135 val module = cls.getField(" MODULE$" ).get(null )
132136 val result = cls.getMethod(" test" ).invoke(module).asInstanceOf [Int ]
@@ -136,4 +140,106 @@ class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext {
136140 }
137141 }
138142 }
143+
144+ test(" SPARK-51537 Executor isolation avoids reloading plugin jars" ) {
145+ val tempDir = Utils .createTempDir()
146+
147+ val testCodeBody =
148+ s """
149+ | public static boolean flag = false;
150+ | """ .stripMargin
151+
152+ val compiledTestCode = TestUtils .createCompiledClass(
153+ " TestFoo" ,
154+ tempDir,
155+ " " ,
156+ null ,
157+ Seq .empty,
158+ Seq .empty,
159+ testCodeBody)
160+
161+ // Initialize the static variable flag in TestFoo when loading plugin at the first time.
162+ // If the plugin is reloaded, the TestFoo.flag will be set to false by default.
163+ val executorPluginCodeBody =
164+ s """
165+ |@Override
166+ |public void init(
167+ | org.apache.spark.api.plugin.PluginContext ctx,
168+ | java.util.Map<String, String> extraConf) {
169+ | TestFoo.flag = true;
170+ |}
171+ """ .stripMargin
172+
173+ val thisClassPath =
174+ sys.props(" java.class.path" ).split(File .pathSeparator).map(p => new File (p).toURI.toURL)
175+
176+ val compiledExecutorPlugin = TestUtils .createCompiledClass(
177+ " TestExecutorPlugin" ,
178+ tempDir,
179+ " " ,
180+ null ,
181+ Seq (tempDir.toURI.toURL) ++ thisClassPath,
182+ Seq (" org.apache.spark.api.plugin.ExecutorPlugin" ),
183+ executorPluginCodeBody)
184+
185+ val sparkPluginCodeBody =
186+ """
187+ |@Override
188+ |public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() {
189+ | return new TestExecutorPlugin();
190+ |}
191+ |
192+ |@Override
193+ |public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; }
194+ """ .stripMargin
195+
196+ val compiledSparkPlugin = TestUtils .createCompiledClass(
197+ " TestSparkPlugin" ,
198+ tempDir,
199+ " " ,
200+ null ,
201+ Seq (tempDir.toURI.toURL) ++ thisClassPath,
202+ Seq (" org.apache.spark.api.plugin.SparkPlugin" ),
203+ sparkPluginCodeBody)
204+
205+ val jarUrl = TestUtils .createJar(
206+ Seq (compiledSparkPlugin, compiledExecutorPlugin, compiledTestCode),
207+ new File (tempDir, " testplugin.jar" ))
208+
209+ def getClassLoader : MutableURLClassLoader = {
210+ val loader = new MutableURLClassLoader (new Array [URL ](0 ),
211+ Thread .currentThread.getContextClassLoader)
212+ Thread .currentThread.setContextClassLoader(loader)
213+ loader
214+ }
215+ // SparkContext does not add plugin jars specified by `spark.jars` configuration
216+ // to the classpath, causing ClassNotFoundException when initializing plugins
217+ // in SparkContext. We manually add the jars to the ClassLoader to resolve this.
218+ val loader = getClassLoader
219+ loader.addURL(jarUrl)
220+
221+ sc = new SparkContext (new SparkConf ()
222+ .setAppName(" avoid-reloading-plugins" )
223+ .setMaster(" local-cluster[1, 1, 1024]" )
224+ .set(" spark.jars" , jarUrl.toString)
225+ .set(" spark.plugins" , " TestSparkPlugin" ))
226+
227+ val jobArtifactSet = new JobArtifactSet (
228+ Some (JobArtifactState (uuid = " avoid-reloading-plugins" , replClassDirUri = None )),
229+ jars = Map .empty,
230+ files = Map .empty,
231+ archives = Map .empty
232+ )
233+
234+ JobArtifactSet .withActiveJobArtifactState(jobArtifactSet.state.get) {
235+ sc.parallelize(1 to 1 ).foreach { _ =>
236+ val cls1 = Utils .classForName(" TestFoo" )
237+ val z = cls1.getField(" flag" ).getBoolean(null )
238+ // If the plugin has been reloaded, the TestFoo.flag will be false.
239+ if (! z) {
240+ throw new RuntimeException (" The spark plugin is reloaded" )
241+ }
242+ }
243+ }
244+ }
139245}
0 commit comments