Skip to content

Commit 1813c4a

Browse files
jerryshao authored and Marcelo Vanzin committed
[SPARK-21714][CORE][YARN] Avoiding re-uploading remote resources in yarn client mode
## What changes were proposed in this pull request? With SPARK-10643, Spark supports downloading remote resources in client deploy mode. But the implementation overrides the variables that represent the added resources (like `args.jars`, `args.pyFiles`) with local paths, and the YARN client then uses these local paths to re-upload the resources to the distributed cache. This unnecessarily breaks the semantics of placing resources on a shared FS, so this change fixes it. ## How was this patch tested? This was manually verified with jars and pyFiles in both local and remote storage, in both client and cluster mode. Author: jerryshao <[email protected]> Closes apache#18962 from jerryshao/SPARK-21714.
1 parent 1f24cee commit 1813c4a

File tree

5 files changed

+114
-49
lines changed

5 files changed

+114
-49
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -212,14 +212,20 @@ object SparkSubmit extends CommandLineUtils {
212212

213213
/**
214214
* Prepare the environment for submitting an application.
215-
* This returns a 4-tuple:
216-
* (1) the arguments for the child process,
217-
* (2) a list of classpath entries for the child,
218-
* (3) a map of system properties, and
219-
* (4) the main class for the child
215+
*
216+
* @param args the parsed SparkSubmitArguments used for environment preparation.
217+
* @param conf the Hadoop Configuration, this argument will only be set in unit test.
218+
* @return a 4-tuple:
219+
* (1) the arguments for the child process,
220+
* (2) a list of classpath entries for the child,
221+
* (3) a map of system properties, and
222+
* (4) the main class for the child
223+
*
220224
* Exposed for testing.
221225
*/
222-
private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
226+
private[deploy] def prepareSubmitEnvironment(
227+
args: SparkSubmitArguments,
228+
conf: Option[HadoopConfiguration] = None)
223229
: (Seq[String], Seq[String], Map[String, String], String) = {
224230
// Return values
225231
val childArgs = new ArrayBuffer[String]()
@@ -322,7 +328,7 @@ object SparkSubmit extends CommandLineUtils {
322328
}
323329
}
324330

325-
val hadoopConf = new HadoopConfiguration()
331+
val hadoopConf = conf.getOrElse(new HadoopConfiguration())
326332
val targetDir = DependencyUtils.createTempDir()
327333

328334
// Resolve glob path for different resources.
@@ -332,19 +338,21 @@ object SparkSubmit extends CommandLineUtils {
332338
args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
333339

334340
// In client mode, download remote files.
341+
var localPrimaryResource: String = null
342+
var localJars: String = null
343+
var localPyFiles: String = null
335344
if (deployMode == CLIENT) {
336-
args.primaryResource = Option(args.primaryResource).map {
345+
localPrimaryResource = Option(args.primaryResource).map {
337346
downloadFile(_, targetDir, args.sparkProperties, hadoopConf)
338347
}.orNull
339-
args.jars = Option(args.jars).map {
348+
localJars = Option(args.jars).map {
340349
downloadFileList(_, targetDir, args.sparkProperties, hadoopConf)
341350
}.orNull
342-
args.pyFiles = Option(args.pyFiles).map {
351+
localPyFiles = Option(args.pyFiles).map {
343352
downloadFileList(_, targetDir, args.sparkProperties, hadoopConf)
344353
}.orNull
345354
}
346355

347-
348356
// If we're running a python app, set the main class to our specific python runner
349357
if (args.isPython && deployMode == CLIENT) {
350358
if (args.primaryResource == PYSPARK_SHELL) {
@@ -353,7 +361,7 @@ object SparkSubmit extends CommandLineUtils {
353361
// If a python file is provided, add it to the child arguments and list of files to deploy.
354362
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
355363
args.mainClass = "org.apache.spark.deploy.PythonRunner"
356-
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
364+
args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
357365
if (clusterManager != YARN) {
358366
// The YARN backend distributes the primary file differently, so don't merge it.
359367
args.files = mergeFileLists(args.files, args.primaryResource)
@@ -363,8 +371,8 @@ object SparkSubmit extends CommandLineUtils {
363371
// The YARN backend handles python files differently, so don't merge the lists.
364372
args.files = mergeFileLists(args.files, args.pyFiles)
365373
}
366-
if (args.pyFiles != null) {
367-
sysProps("spark.submit.pyFiles") = args.pyFiles
374+
if (localPyFiles != null) {
375+
sysProps("spark.submit.pyFiles") = localPyFiles
368376
}
369377
}
370378

@@ -418,7 +426,7 @@ object SparkSubmit extends CommandLineUtils {
418426
// If an R file is provided, add it to the child arguments and list of files to deploy.
419427
// Usage: RRunner <main R file> [app arguments]
420428
args.mainClass = "org.apache.spark.deploy.RRunner"
421-
args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs
429+
args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
422430
args.files = mergeFileLists(args.files, args.primaryResource)
423431
}
424432
}
@@ -463,6 +471,7 @@ object SparkSubmit extends CommandLineUtils {
463471
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
464472
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
465473
sysProp = "spark.executor.instances"),
474+
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"),
466475
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
467476
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
468477
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
@@ -486,15 +495,28 @@ object SparkSubmit extends CommandLineUtils {
486495
sysProp = "spark.driver.cores"),
487496
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
488497
sysProp = "spark.driver.supervise"),
489-
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
498+
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
499+
500+
// An internal option used only for spark-shell to add user jars to repl's classloader,
501+
// previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
502+
// remote jars, so adding a new option to only specify local jars for spark-shell internally.
503+
OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars")
490504
)
491505

492506
// In client mode, launch the application main class directly
493507
// In addition, add the main application jar and any added jars (if any) to the classpath
494-
// Also add the main application jar and any added jars to classpath in case YARN client
495-
// requires these jars.
496-
if (deployMode == CLIENT || isYarnCluster) {
508+
if (deployMode == CLIENT) {
497509
childMainClass = args.mainClass
510+
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
511+
childClasspath += localPrimaryResource
512+
}
513+
if (localJars != null) { childClasspath ++= localJars.split(",") }
514+
}
515+
// Add the main application jar and any added jars to classpath in case YARN client
516+
// requires these jars.
517+
// This assumes both primaryResource and user jars are local jars, otherwise it will not be
518+
// added to the classpath of YARN client.
519+
if (isYarnCluster) {
498520
if (isUserJar(args.primaryResource)) {
499521
childClasspath += args.primaryResource
500522
}
@@ -551,10 +573,6 @@ object SparkSubmit extends CommandLineUtils {
551573
if (args.isPython) {
552574
sysProps.put("spark.yarn.isPython", "true")
553575
}
554-
555-
if (args.pyFiles != null) {
556-
sysProps("spark.submit.pyFiles") = args.pyFiles
557-
}
558576
}
559577

560578
// assure a keytab is available from any place in a JVM

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ package object config {
8787
.intConf
8888
.createOptional
8989

90-
private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
90+
private[spark] val PY_FILES = ConfigBuilder("spark.yarn.dist.pyFiles")
9191
.internal()
9292
.stringConf
9393
.toSequence

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2601,18 +2601,23 @@ private[spark] object Utils extends Logging {
26012601
}
26022602

26032603
/**
2604-
* In YARN mode this method returns a union of the jar files pointed by "spark.jars" and the
2605-
* "spark.yarn.dist.jars" properties, while in other modes it returns the jar files pointed by
2606-
* only the "spark.jars" property.
2604+
* Return the jar files pointed by the "spark.jars" property. Spark internally will distribute
2605+
* these jars through file server. In the YARN mode, it will return an empty list, since YARN
2606+
* has its own mechanism to distribute jars.
26072607
*/
2608-
def getUserJars(conf: SparkConf, isShell: Boolean = false): Seq[String] = {
2608+
def getUserJars(conf: SparkConf): Seq[String] = {
26092609
val sparkJars = conf.getOption("spark.jars")
2610-
if (conf.get("spark.master") == "yarn" && isShell) {
2611-
val yarnJars = conf.getOption("spark.yarn.dist.jars")
2612-
unionFileLists(sparkJars, yarnJars).toSeq
2613-
} else {
2614-
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
2615-
}
2610+
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
2611+
}
2612+
2613+
/**
2614+
* Return the local jar files which will be added to REPL's classpath. These jar files are
2615+
* specified by --jars (spark.jars) or --packages, remote jars will be downloaded to local by
2616+
* SparkSubmit at first.
2617+
*/
2618+
def getLocalUserJarsForShell(conf: SparkConf): Seq[String] = {
2619+
val localJars = conf.getOption("spark.repl.local.jars")
2620+
localJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
26162621
}
26172622

26182623
private[spark] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import scala.io.Source
2929
import com.google.common.io.ByteStreams
3030
import org.apache.commons.io.FileUtils
3131
import org.apache.hadoop.conf.Configuration
32-
import org.apache.hadoop.fs.Path
32+
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
3333
import org.scalatest.{BeforeAndAfterEach, Matchers}
3434
import org.scalatest.concurrent.Timeouts
3535
import org.scalatest.time.SpanSugar._
@@ -762,7 +762,7 @@ class SparkSubmitSuite
762762
(Set(jar1.toURI.toString, jar2.toURI.toString))
763763
sysProps("spark.yarn.dist.files").split(",").toSet should be
764764
(Set(file1.toURI.toString, file2.toURI.toString))
765-
sysProps("spark.submit.pyFiles").split(",").toSet should be
765+
sysProps("spark.yarn.dist.pyFiles").split(",").toSet should be
766766
(Set(pyFile1.getAbsolutePath, pyFile2.getAbsolutePath))
767767
sysProps("spark.yarn.dist.archives").split(",").toSet should be
768768
(Set(archive1.toURI.toString, archive2.toURI.toString))
@@ -802,10 +802,7 @@ class SparkSubmitSuite
802802
test("downloadFile - file doesn't exist") {
803803
val hadoopConf = new Configuration()
804804
val tmpDir = Utils.createTempDir()
805-
// Set s3a implementation to local file system for testing.
806-
hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
807-
// Disable file system impl cache to make sure the test file system is picked up.
808-
hadoopConf.set("fs.s3a.impl.disable.cache", "true")
805+
updateConfWithFakeS3Fs(hadoopConf)
809806
intercept[FileNotFoundException] {
810807
SparkSubmit.downloadFile("s3a:/no/such/file", tmpDir, mutable.Map.empty, hadoopConf)
811808
}
@@ -826,10 +823,7 @@ class SparkSubmitSuite
826823
FileUtils.write(jarFile, content)
827824
val hadoopConf = new Configuration()
828825
val tmpDir = Files.createTempDirectory("tmp").toFile
829-
// Set s3a implementation to local file system for testing.
830-
hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
831-
// Disable file system impl cache to make sure the test file system is picked up.
832-
hadoopConf.set("fs.s3a.impl.disable.cache", "true")
826+
updateConfWithFakeS3Fs(hadoopConf)
833827
val sourcePath = s"s3a://${jarFile.getAbsolutePath}"
834828
val outputPath =
835829
SparkSubmit.downloadFile(sourcePath, tmpDir, mutable.Map.empty, hadoopConf)
@@ -844,10 +838,7 @@ class SparkSubmitSuite
844838
FileUtils.write(jarFile, content)
845839
val hadoopConf = new Configuration()
846840
val tmpDir = Files.createTempDirectory("tmp").toFile
847-
// Set s3a implementation to local file system for testing.
848-
hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
849-
// Disable file system impl cache to make sure the test file system is picked up.
850-
hadoopConf.set("fs.s3a.impl.disable.cache", "true")
841+
updateConfWithFakeS3Fs(hadoopConf)
851842
val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}")
852843
val outputPaths = SparkSubmit.downloadFileList(
853844
sourcePaths.mkString(","), tmpDir, mutable.Map.empty, hadoopConf).split(",")
@@ -859,6 +850,43 @@ class SparkSubmitSuite
859850
}
860851
}
861852

853+
test("Avoid re-upload remote resources in yarn client mode") {
854+
val hadoopConf = new Configuration()
855+
updateConfWithFakeS3Fs(hadoopConf)
856+
857+
val tmpDir = Utils.createTempDir()
858+
val file = File.createTempFile("tmpFile", "", tmpDir)
859+
val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)
860+
val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
861+
val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
862+
val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"
863+
864+
val args = Seq(
865+
"--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
866+
"--name", "testApp",
867+
"--master", "yarn",
868+
"--deploy-mode", "client",
869+
"--jars", tmpJarPath,
870+
"--files", s"s3a://${file.getAbsolutePath}",
871+
"--py-files", s"s3a://${pyFile.getAbsolutePath}",
872+
s"s3a://$mainResource"
873+
)
874+
875+
val appArgs = new SparkSubmitArguments(args)
876+
val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3
877+
878+
// All the resources should still be remote paths, so that YARN client will not upload again.
879+
sysProps("spark.yarn.dist.jars") should be (tmpJarPath)
880+
sysProps("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}")
881+
sysProps("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}")
882+
883+
// Local repl jars should be a local path.
884+
sysProps("spark.repl.local.jars") should (startWith("file:"))
885+
886+
// local py files should not be a URI format.
887+
sysProps("spark.submit.pyFiles") should (startWith("/"))
888+
}
889+
862890
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
863891
private def runSparkSubmit(args: Seq[String]): Unit = {
864892
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -898,6 +926,11 @@ class SparkSubmitSuite
898926
Utils.deleteRecursively(tmpDir)
899927
}
900928
}
929+
930+
private def updateConfWithFakeS3Fs(conf: Configuration): Unit = {
931+
conf.set("fs.s3a.impl", classOf[TestFileSystem].getCanonicalName)
932+
conf.set("fs.s3a.impl.disable.cache", "true")
933+
}
901934
}
902935

903936
object JarCreationTest extends Logging {
@@ -967,4 +1000,13 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
9671000
// Ignore the scheme for testing.
9681001
super.copyToLocalFile(new Path(src.toUri.getPath), dst)
9691002
}
1003+
1004+
override def globStatus(pathPattern: Path): Array[FileStatus] = {
1005+
val newPath = new Path(pathPattern.toUri.getPath)
1006+
super.globStatus(newPath).map { status =>
1007+
val path = s"s3a://${status.getPath.toUri.getPath}"
1008+
status.setPath(new Path(path))
1009+
status
1010+
}
1011+
}
9701012
}

repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ object Main extends Logging {
5757
// Visible for testing
5858
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
5959
interp = _interp
60-
val jars = Utils.getUserJars(conf, isShell = true)
60+
val jars = Utils.getLocalUserJarsForShell(conf)
6161
// Remove file:///, file:// or file:/ scheme if exists for each jar
6262
.map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
6363
.mkString(File.pathSeparator)

0 commit comments

Comments
 (0)