
Commit 8319432

jerryshao authored and cloud-fan committed
[SPARK-21917][CORE][YARN] Supporting adding http(s) resources in yarn mode
## What changes were proposed in this pull request?

In current Spark, submitting an application on YARN with a remote http(s) resource, e.g. `./bin/spark-shell --jars http://central.maven.org/maven2/com/github/swagger-akka-http/swagger-akka-http_2.11/0.10.1/swagger-akka-http_2.11-0.10.1.jar --master yarn-client -v`, fails with:

```
java.io.IOException: No FileSystem for scheme: http
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
	at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:354)
	at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:478)
	at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:600)
	at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:599)
	at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
	at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:599)
	at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:598)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:598)
	at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:848)
	at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:173)
```

This happens because the YARN `Client` assumes all resources live on a Hadoop-compatible filesystem. To fix this, this PR downloads remote http(s) resources to the local disk and adds the downloaded local copies to the distributed cache. The downside is that such resources are downloaded and then uploaded again, but this applies only to remote http(s) resources and the overhead is not large. The advantage is that the solution is simple and the code changes are confined to `SparkSubmit`.

## How was this patch tested?

Unit test added; also verified on a local cluster.

Author: jerryshao <[email protected]>

Closes apache#19130 from jerryshao/SPARK-21917.
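To make the decision logic concrete before the full diff below: a resource is localized when its scheme is either listed in `spark.yarn.dist.forceDownloadSchemes` or has no registered Hadoop `FileSystem` implementation. The following is a minimal, standalone sketch of that check; it restates the `shouldDownload` helper added to `SparkSubmit`, with the config value and Hadoop configuration passed in explicitly rather than captured from enclosing scope.

```scala
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Sketch only: mirrors the decision made inside SparkSubmit's downloadResource.
// A resource is downloaded to the local disk when its scheme is either listed
// in spark.yarn.dist.forceDownloadSchemes, or has no Hadoop FileSystem
// implementation registered (e.g. "http" on a stock Hadoop build).
def shouldDownload(
    scheme: String,
    forceDownloadSchemes: Seq[String],
    hadoopConf: Configuration): Boolean = {
  forceDownloadSchemes.contains(scheme) ||
    Try(FileSystem.getFileSystemClass(scheme, hadoopConf)).isFailure
}
```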
1 parent 581200a commit 8319432

File tree

6 files changed (+143, -4 lines)


core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala

Lines changed: 7 additions & 2 deletions
```diff
@@ -94,7 +94,7 @@ private[deploy] object DependencyUtils {
       hadoopConf: Configuration,
       secMgr: SecurityManager): String = {
     require(fileList != null, "fileList cannot be null.")
-    fileList.split(",")
+    Utils.stringToSeq(fileList)
       .map(downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr))
       .mkString(",")
   }
@@ -121,6 +121,11 @@
 
     uri.getScheme match {
       case "file" | "local" => path
+      case "http" | "https" | "ftp" if Utils.isTesting =>
+        // This is only used for SparkSubmitSuite unit test. Instead of downloading file remotely,
+        // return a dummy local path instead.
+        val file = new File(uri.getPath)
+        new File(targetDir, file.getName).toURI.toString
       case _ =>
         val fname = new Path(uri).getName()
         val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, secMgr,
@@ -131,7 +136,7 @@
 
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
-    paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path =>
+    Utils.stringToSeq(paths).flatMap { path =>
       val uri = Utils.resolveURI(path)
       uri.getScheme match {
         case "local" | "http" | "https" | "ftp" => Array(path)
```

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 49 additions & 2 deletions
```diff
@@ -25,11 +25,11 @@ import java.text.ParseException
 
 import scala.annotation.tailrec
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
-import scala.util.Properties
+import scala.util.{Properties, Try}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.ivy.Ivy
@@ -48,6 +48,7 @@ import org.apache.spark._
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util._
 
@@ -367,6 +368,52 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
 
+    // When running in YARN, for some remote resources with scheme:
+    //   1. Hadoop FileSystem doesn't support them.
+    //   2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
+    // We will download them to local disk prior to add to YARN's distributed cache.
+    // For yarn client mode, since we already download them with above code, so we only need to
+    // figure out the local path and replace the remote one.
+    if (clusterManager == YARN) {
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
+
+      def shouldDownload(scheme: String): Boolean = {
+        forceDownloadSchemes.contains(scheme) ||
+          Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
+      }
+
+      def downloadResource(resource: String): String = {
+        val uri = Utils.resolveURI(resource)
+        uri.getScheme match {
+          case "local" | "file" => resource
+          case e if shouldDownload(e) =>
+            val file = new File(targetDir, new Path(uri).getName)
+            if (file.exists()) {
+              file.toURI.toString
+            } else {
+              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+            }
+          case _ => uri.toString
+        }
+      }
+
+      args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
+      args.files = Option(args.files).map { files =>
+        Utils.stringToSeq(files).map(downloadResource).mkString(",")
+      }.orNull
+      args.pyFiles = Option(args.pyFiles).map { pyFiles =>
+        Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",")
+      }.orNull
+      args.jars = Option(args.jars).map { jars =>
+        Utils.stringToSeq(jars).map(downloadResource).mkString(",")
+      }.orNull
+      args.archives = Option(args.archives).map { archives =>
+        Utils.stringToSeq(archives).map(downloadResource).mkString(",")
+      }.orNull
+    }
+
     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython && deployMode == CLIENT) {
       if (args.primaryResource == PYSPARK_SHELL) {
```

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -400,4 +400,14 @@ package object config {
     .doc("Memory to request as a multiple of the size that used to unroll the block.")
     .doubleConf
     .createWithDefault(1.5)
+
+  private[spark] val FORCE_DOWNLOAD_SCHEMES =
+    ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
+      .doc("Comma-separated list of schemes for which files will be downloaded to the " +
+        "local disk prior to being added to YARN's distributed cache. For use in cases " +
+        "where the YARN service does not support schemes that are supported by Spark, like http, " +
+        "https and ftp.")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
 }
```
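Since the new entry is a plain string-sequence config, it can also be set programmatically rather than on the `spark-submit` command line. A small illustrative sketch (the choice of `http,https` here is just an example value, not something mandated by the commit):

```scala
import org.apache.spark.SparkConf

// Illustrative only: force http and https resources to be downloaded to the
// local disk before being added to YARN's distributed cache, even if an
// http-capable Hadoop FileSystem happens to be available.
val conf = new SparkConf()
  .set("spark.yarn.dist.forceDownloadSchemes", "http,https")
```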

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -2684,6 +2684,9 @@ private[spark] object Utils extends Logging {
     redact(redactionPattern, kvs.toArray)
   }
 
+  def stringToSeq(str: String): Seq[String] = {
+    str.split(",").map(_.trim()).filter(_.nonEmpty)
+  }
 }
 
 private[util] object CallerContext extends Logging {
```
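The new helper simply splits on commas, trims each entry, and drops empties. An illustrative restatement with expected results (runnable in a plain Scala REPL, independent of Spark internals):

```scala
// Same behavior as the Utils.stringToSeq helper added above.
def stringToSeq(str: String): Seq[String] =
  str.split(",").map(_.trim()).filter(_.nonEmpty)

stringToSeq("a.jar, b.jar,,c.jar")  // Seq("a.jar", "b.jar", "c.jar")
stringToSeq("")                     // Seq()
```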

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 65 additions & 0 deletions
```diff
@@ -897,6 +897,71 @@ class SparkSubmitSuite
     sysProps("spark.submit.pyFiles") should (startWith("/"))
   }
 
+  test("download remote resource if it is not supported by yarn service") {
+    testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = false)
+  }
+
+  test("avoid downloading remote resource if it is supported by yarn service") {
+    testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = true)
+  }
+
+  test("force download from blacklisted schemes") {
+    testRemoteResources(isHttpSchemeBlacklisted = true, supportMockHttpFs = true)
+  }
+
+  private def testRemoteResources(isHttpSchemeBlacklisted: Boolean,
+      supportMockHttpFs: Boolean): Unit = {
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
+    if (supportMockHttpFs) {
+      hadoopConf.set("fs.http.impl", classOf[TestFileSystem].getCanonicalName)
+      hadoopConf.set("fs.http.impl.disable.cache", "true")
+    }
+
+    val tmpDir = Utils.createTempDir()
+    val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
+    val tmpS3Jar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+    val tmpS3JarPath = s"s3a://${new File(tmpS3Jar.toURI).getAbsolutePath}"
+    val tmpHttpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+    val tmpHttpJarPath = s"http://${new File(tmpHttpJar.toURI).getAbsolutePath}"
+
+    val args = Seq(
+      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+      "--name", "testApp",
+      "--master", "yarn",
+      "--deploy-mode", "client",
+      "--jars", s"$tmpS3JarPath,$tmpHttpJarPath",
+      s"s3a://$mainResource"
+    ) ++ (
+      if (isHttpSchemeBlacklisted) {
+        Seq("--conf", "spark.yarn.dist.forceDownloadSchemes=http,https")
+      } else {
+        Nil
+      }
+    )
+
+    val appArgs = new SparkSubmitArguments(args)
+    val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3
+
+    val jars = sysProps("spark.yarn.dist.jars").split(",").toSet
+
+    // The URI of remote S3 resource should still be remote.
+    assert(jars.contains(tmpS3JarPath))
+
+    if (supportMockHttpFs) {
+      // If Http FS is supported by yarn service, the URI of remote http resource should
+      // still be remote.
+      assert(jars.contains(tmpHttpJarPath))
+    } else {
+      // If Http FS is not supported by yarn service, or http scheme is configured to be force
+      // downloading, the URI of remote http resource should be changed to a local one.
+      val jarName = new File(tmpHttpJar.toURI).getName
+      val localHttpJar = jars.filter(_.contains(jarName))
+      localHttpJar.size should be(1)
+      localHttpJar.head should startWith("file:")
+    }
+  }
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   private def runSparkSubmit(args: Seq[String]): Unit = {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
```

docs/running-on-yarn.md

Lines changed: 9 additions & 0 deletions
```diff
@@ -211,6 +211,15 @@ To use a custom metrics.properties for the application master and executors, upd
     Comma-separated list of jars to be placed in the working directory of each executor.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.dist.forceDownloadSchemes</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Comma-separated list of schemes for which files will be downloaded to the local disk prior to
+    being added to YARN's distributed cache. For use in cases where the YARN service does not
+    support schemes that are supported by Spark, like http, https and ftp.
+  </td>
+</tr>
 <tr>
   <td><code>spark.executor.instances</code></td>
   <td><code>2</code></td>
```
