Commit d8257b9

skonto authored and Marcelo Vanzin committed
[SPARK-21403][MESOS] fix --packages for mesos
## What changes were proposed in this pull request?

Fixes the --packages flag for Mesos in cluster mode. Standalone and YARN will probably be handled in a separate commit, since those cases differ and need further investigation.

## How was this patch tested?

Tested with a community 1.9 DC/OS cluster: packages were successfully resolved in cluster mode within a container.

andrewor14 susanxhuynh ArtRand srowen please review.

Author: Stavros Kontopoulos <[email protected]>

Closes apache#18587 from skonto/fix_packages_mesos_cluster.
1 parent af80e01
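For context, the code path this commit fixes is reached by a submission like the following (an illustrative sketch: the dispatcher URL, package coordinate, main class, and jar URL are made up, not taken from the commit):

    spark-submit \
      --master mesos://dispatcher-host:7077 \
      --deploy-mode cluster \
      --packages com.example:mylib:1.0 \
      --class com.example.Main \
      http://repo.example.com/app.jar

Before this change, --packages coordinates were always resolved on the submitting machine, which does not help a driver that starts later inside a Mesos container. After it, SparkSubmit skips local resolution for Mesos cluster mode and instead forwards the values as spark.jars.* properties (see the last hunk below), so the driver resolves them itself.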

File tree

1 file changed (+50, -39 lines)

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 50 additions & 39 deletions
@@ -273,6 +273,25 @@ object SparkSubmit extends CommandLineUtils {
       }
     }
 
+    // Fail fast, the following modes are not supported or applicable
+    (clusterManager, deployMode) match {
+      case (STANDALONE, CLUSTER) if args.isPython =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
+          "applications on standalone clusters.")
+      case (STANDALONE, CLUSTER) if args.isR =>
+        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
+          "applications on standalone clusters.")
+      case (LOCAL, CLUSTER) =>
+        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
+      case (_, CLUSTER) if isShell(args.primaryResource) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
+      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
+      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
+        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
+      case _ =>
+    }
+
     // Update args.deployMode if it is null. It will be passed down as a Spark property later.
     (args.deployMode, deployMode) match {
       case (null, CLIENT) => args.deployMode = "client"
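Because these checks now run before dependency resolution, an unsupported combination aborts before any Ivy work starts. A hypothetical session (the message text is taken verbatim from the match block above; the "Error:" prefix assumes printErrorAndExit's usual formatting):

    $ spark-shell --master spark://host:7077 --deploy-mode cluster
    Error: Cluster deploy mode is not applicable to Spark shells.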
@@ -282,36 +301,40 @@ object SparkSubmit extends CommandLineUtils {
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
 
-    // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
-    // too for packages that include Python code
-    val exclusions: Seq[String] =
+    if (!isMesosCluster) {
+      // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
+      // too for packages that include Python code
+      val exclusions: Seq[String] =
       if (!StringUtils.isBlank(args.packagesExclusions)) {
         args.packagesExclusions.split(",")
       } else {
         Nil
       }
 
-    // Create the IvySettings, either load from file or build defaults
-    val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
-      SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
-        Option(args.ivyRepoPath))
-    }.getOrElse {
-      SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
-    }
+      // Create the IvySettings, either load from file or build defaults
+      val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
+        SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
+          Option(args.ivyRepoPath))
+      }.getOrElse {
+        SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
+      }
 
-    val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
-      ivySettings, exclusions = exclusions)
-    if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-      args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
-      if (args.isPython) {
-        args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+      val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
+        ivySettings, exclusions = exclusions)
+
+
+      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
+        if (args.isPython) {
+          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
+        }
       }
-    }
 
-    // install any R packages that may have been passed through --jars or --packages.
-    // Spark Packages may contain R source code inside the jar.
-    if (args.isR && !StringUtils.isBlank(args.jars)) {
-      RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+      // install any R packages that may have been passed through --jars or --packages.
+      // Spark Packages may contain R source code inside the jar.
+      if (args.isR && !StringUtils.isBlank(args.jars)) {
+        RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
+      }
     }
 
     val hadoopConf = new HadoopConfiguration()
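With the isMesosCluster guard above, this resolution logic no longer runs on the submitting machine for Mesos cluster mode; it runs again in the driver's own spark-submit, where args.packages is populated from the propagated spark.jars.packages property. Stripped of argument plumbing, the resolution step looks roughly like this (a sketch reusing only the calls visible in the hunk above; the coordinate is illustrative):

    // Build default Ivy settings (no extra --repositories, default Ivy path),
    // then resolve the coordinates and their transitive dependencies.
    val ivySettings = SparkSubmitUtils.buildIvySettings(None, None)
    // Returns a comma-separated list of local jar paths, or a blank string
    // when there is nothing to resolve.
    val resolvedJars = SparkSubmitUtils.resolveMavenCoordinates(
      "com.example:mylib:1.0", ivySettings, exclusions = Nil)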
@@ -343,24 +366,6 @@ object SparkSubmit extends CommandLineUtils {
       }.orNull
     }
 
-    // The following modes are not supported or applicable
-    (clusterManager, deployMode) match {
-      case (STANDALONE, CLUSTER) if args.isPython =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for python " +
-          "applications on standalone clusters.")
-      case (STANDALONE, CLUSTER) if args.isR =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
-          "applications on standalone clusters.")
-      case (LOCAL, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
-      case (_, CLUSTER) if isShell(args.primaryResource) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
-      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark SQL shell.")
-      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark Thrift server.")
-      case _ =>
-    }
 
     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython && deployMode == CLIENT) {
@@ -468,6 +473,12 @@ object SparkSubmit extends CommandLineUtils {
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),
 
+      // Mesos only - propagate attributes for dependency resolution at the driver side
+      OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
+      OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),
+
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
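These four OptionAssigner entries are what carry the --packages flags across the submission boundary: a value is copied into a Spark property only when the current cluster manager and deploy mode match the entry's masks. A self-contained sketch of that mechanism, with illustrative bitmask values rather than Spark's real constants:

    import scala.collection.mutable

    // Approximation of SparkSubmit's OptionAssigner; the real class lives in
    // SparkSubmit.scala and may differ in detail.
    case class OptionAssigner(
        value: String,
        clusterManager: Int,  // bitmask over cluster managers
        deployMode: Int,      // bitmask over deploy modes
        sysProp: String = null)

    val MESOS = 4; val CLUSTER = 2       // illustrative bit values
    val (clusterManager, deployMode) = (MESOS, CLUSTER)

    val options = Seq(
      OptionAssigner("com.example:mylib:1.0", MESOS, CLUSTER,
        sysProp = "spark.jars.packages"))

    // Copy every matching, non-null value into the driver's properties; for
    // Mesos cluster mode this turns --packages into spark.jars.packages.
    val sysProps = mutable.Map[String, String]()
    for (opt <- options) {
      if (opt.value != null &&
          (deployMode & opt.deployMode) != 0 &&
          (clusterManager & opt.clusterManager) != 0 &&
          opt.sysProp != null) {
        sysProps(opt.sysProp) = opt.value
      }
    }
    // sysProps: Map("spark.jars.packages" -> "com.example:mylib:1.0")

Since the driver-side spark-submit reads spark.jars.packages back into args.packages, the resolution shown earlier then happens inside the container, which is exactly the work the if (!isMesosCluster) guard defers.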
