
Commit da8c59b

skonto authored and Marcelo Vanzin committed
[SPARK-12559][SPARK SUBMIT] fix --packages for stand-alone cluster mode
Fixes the --packages flag for the standalone case in cluster mode. The jars resolved via Ivy are added to the driver classpath, along with any other jars passed through `spark.jars`. Jars not resolved by Ivy are downloaded explicitly to a temporary folder on the driver node. Similar code already exists in SparkSubmit, so part of it was refactored for reuse in the DriverWrapper class, which is responsible for launching the driver in standalone cluster mode. Note: in standalone mode `spark.jars` contains the user jar so that it can be fetched later on the executor side.

Tested manually by submitting a driver in cluster mode within a standalone cluster and checking that dependencies were resolved on the driver side.

Author: Stavros Kontopoulos <[email protected]>

Closes apache#18630 from skonto/fix_packages_stand_alone_cluster.
1 parent 7f16c69 commit da8c59b
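For illustration only, a submission that exercises the fixed path might look like the following; the master URL, Maven coordinate, class name, and jar path are placeholders and not part of this commit:

  ./bin/spark-submit \
    --master spark://master-host:7077 \
    --deploy-mode cluster \
    --packages com.example:some-lib:1.0.0 \
    --class com.example.Main \
    hdfs:///path/to/app.jar

With this change, the driver launched through DriverWrapper resolves the --packages coordinates via Ivy on the driver node and adds the resolved jars, plus any `spark.jars` entries not handled by Ivy (downloaded to a temporary folder), to the driver classpath.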

File tree

3 files changed: +137, -37 lines

core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala

Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import java.io.File
import java.nio.file.Files

import scala.collection.mutable.HashMap

import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration

import org.apache.spark.util.MutableURLClassLoader

private[deploy] object DependencyUtils {

  def resolveMavenDependencies(
      packagesExclusions: String,
      packages: String,
      repositories: String,
      ivyRepoPath: String): String = {
    val exclusions: Seq[String] =
      if (!StringUtils.isBlank(packagesExclusions)) {
        packagesExclusions.split(",")
      } else {
        Nil
      }
    // Create the IvySettings, either load from file or build defaults
    val ivySettings = sys.props.get("spark.jars.ivySettings").map { ivySettingsFile =>
      SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(repositories), Option(ivyRepoPath))
    }.getOrElse {
      SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
    }

    SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
  }

  def createTempDir(): File = {
    val targetDir = Files.createTempDirectory("tmp").toFile
    // scalastyle:off runtimeaddshutdownhook
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        FileUtils.deleteQuietly(targetDir)
      }
    })
    // scalastyle:on runtimeaddshutdownhook
    targetDir
  }

  def resolveAndDownloadJars(jars: String, userJar: String): String = {
    val targetDir = DependencyUtils.createTempDir()
    val hadoopConf = new Configuration()
    val sparkProperties = new HashMap[String, String]()
    val securityProperties = List("spark.ssl.fs.trustStore", "spark.ssl.trustStore",
      "spark.ssl.fs.trustStorePassword", "spark.ssl.trustStorePassword",
      "spark.ssl.fs.protocol", "spark.ssl.protocol")

    securityProperties.foreach { pName =>
      sys.props.get(pName).foreach { pValue =>
        sparkProperties.put(pName, pValue)
      }
    }

    Option(jars)
      .map {
        SparkSubmit.resolveGlobPaths(_, hadoopConf)
          .split(",")
          .filterNot(_.contains(userJar.split("/").last))
          .mkString(",")
      }
      .filterNot(_ == "")
      .map(SparkSubmit.downloadFileList(_, targetDir, sparkProperties, hadoopConf))
      .orNull
  }

  def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
    if (jars != null) {
      for (jar <- jars.split(",")) {
        SparkSubmit.addJarToClasspath(jar, loader)
      }
    }
  }
}

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 15 additions & 37 deletions
@@ -20,7 +20,6 @@ package org.apache.spark.deploy
 import java.io._
 import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
-import java.nio.file.Files
 import java.security.{KeyStore, PrivilegedExceptionAction}
 import java.security.cert.X509Certificate
 import java.text.ParseException
@@ -31,7 +30,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.Properties

 import com.google.common.io.ByteStreams
-import org.apache.commons.io.FileUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -300,28 +298,13 @@ object SparkSubmit extends CommandLineUtils {
     }
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
+    val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER

-    if (!isMesosCluster) {
+    if (!isMesosCluster && !isStandAloneCluster) {
       // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
       // too for packages that include Python code
-      val exclusions: Seq[String] =
-        if (!StringUtils.isBlank(args.packagesExclusions)) {
-          args.packagesExclusions.split(",")
-        } else {
-          Nil
-        }
-
-      // Create the IvySettings, either load from file or build defaults
-      val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
-        SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
-          Option(args.ivyRepoPath))
-      }.getOrElse {
-        SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
-      }
-
-      val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
-        ivySettings, exclusions = exclusions)
-
+      val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
+        args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath)

       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
         args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
@@ -338,14 +321,7 @@
     }

     val hadoopConf = new HadoopConfiguration()
-    val targetDir = Files.createTempDirectory("tmp").toFile
-    // scalastyle:off runtimeaddshutdownhook
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run(): Unit = {
-        FileUtils.deleteQuietly(targetDir)
-      }
-    })
-    // scalastyle:on runtimeaddshutdownhook
+    val targetDir = DependencyUtils.createTempDir()

     // Resolve glob path for different resources.
     args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
@@ -473,11 +449,13 @@
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),

-      // Mesos only - propagate attributes for dependency resolution at the driver side
-      OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
-      OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
-      OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
-      OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),
+      // Propagate attributes for dependency resolution at the driver side
+      OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.packages"),
+      OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
+        sysProp = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
+        CLUSTER, sysProp = "spark.jars.excludes"),

       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
@@ -780,7 +758,7 @@
     }
   }

-  private def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
+  private[deploy] def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
     val uri = Utils.resolveURI(localJar)
     uri.getScheme match {
       case "file" | "local" =>
@@ -845,7 +823,7 @@
   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
   * no files, into a single comma-separated string.
   */
-  private def mergeFileLists(lists: String*): String = {
+  private[deploy] def mergeFileLists(lists: String*): String = {
     val merged = lists.filterNot(StringUtils.isBlank)
       .flatMap(_.split(","))
       .mkString(",")
@@ -968,7 +946,7 @@
     }
   }

-  private def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
+  private[deploy] def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
     require(paths != null, "paths cannot be null.")
     paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path =>
       val uri = Utils.resolveURI(path)

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

Lines changed: 23 additions & 0 deletions
@@ -19,7 +19,10 @@ package org.apache.spark.deploy.worker

 import java.io.File

+import org.apache.commons.lang3.StringUtils
+
 import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.{DependencyUtils, SparkSubmit}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

@@ -51,6 +54,7 @@ object DriverWrapper {
         new MutableURLClassLoader(Array(userJarUrl), currentLoader)
       }
       Thread.currentThread.setContextClassLoader(loader)
+      setupDependencies(loader, userJar)

       // Delegate to supplied main class
       val clazz = Utils.classForName(mainClass)
@@ -66,4 +70,23 @@
         System.exit(-1)
     }
   }
+
+  private def setupDependencies(loader: MutableURLClassLoader, userJar: String): Unit = {
+    val Seq(packagesExclusions, packages, repositories, ivyRepoPath) =
+      Seq("spark.jars.excludes", "spark.jars.packages", "spark.jars.repositories", "spark.jars.ivy")
+        .map(sys.props.get(_).orNull)
+
+    val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
+      packages, repositories, ivyRepoPath)
+    val jars = {
+      val jarsProp = sys.props.get("spark.jars").orNull
+      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+        SparkSubmit.mergeFileLists(jarsProp, resolvedMavenCoordinates)
+      } else {
+        jarsProp
+      }
+    }
+    val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar)
+    DependencyUtils.addJarsToClassPath(localJars, loader)
+  }
 }
