
Commit 3cb8204

Marcelo Vanzin authored and squito committed
[SPARK-22941][CORE] Do not exit JVM when submit fails with in-process launcher.
The current in-process launcher implementation just calls the SparkSubmit object, which, in case of errors, will more often than not exit the JVM. This is not desirable since this launcher is meant to be used inside other applications, and that would kill the application.

The change turns SparkSubmit into a class, and abstracts away some of the functionality used to print error messages and abort the submission process. The default implementation uses the logging system for messages, and throws exceptions for errors. As part of that I also moved some code that doesn't really belong in SparkSubmit to a better location.

The command line invocation of spark-submit now uses a special implementation of the SparkSubmit class that overrides those behaviors to do what is expected from the command line version (print to the terminal, exit the JVM, etc.).

A lot of the changes are to replace calls to methods such as "printErrorAndExit" with the new API.

As part of adding tests for this, I had to fix some small things in the launcher option parser so that things like "--version" can work when used in the launcher library.

There is still code that prints directly to the terminal, like all the Ivy-related code in SparkSubmitUtils, and other areas where some refactoring would help, like the CommandLineUtils class, but I chose to leave those alone to keep this change more focused.

Aside from existing and added unit tests, I ran the command line tools with a bunch of different arguments to make sure messages and errors behave like before.

Author: Marcelo Vanzin <[email protected]>

Closes apache#20925 from vanzin/SPARK-22941.
1 parent 653fe02 commit 3cb8204
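
The split described in the commit message — report through overridable hooks, throw by default, print-and-exit only in the command-line entry point — can be sketched roughly as follows. This is an illustrative Scala sketch only, not the code added by this commit; the names (Submitter, SubmitException, logMessage, doSubmit) are hypothetical stand-ins.

// Illustrative sketch only: "log and throw by default, print and exit on the command line".
class SubmitException(msg: String) extends Exception(msg)

class Submitter {
  // Default behavior, suitable for in-process use: report through a hook and signal
  // failure with an exception instead of killing the JVM.
  protected def logMessage(msg: String): Unit = println(s"INFO: $msg")
  protected def error(msg: String): Unit = throw new SubmitException(msg)

  def doSubmit(args: Array[String]): Unit = {
    if (args.isEmpty) {
      error("Must specify a primary resource (JAR or Python or R file)")
    }
    logMessage(s"Submitting: ${args.mkString(" ")}")
    // ... resolve dependencies, prepare the environment, run the user's main class ...
  }
}

// What the spark-submit command line layers on top: print to the terminal and exit
// the JVM, preserving the old user-facing behavior.
object CommandLineSubmitter extends Submitter {
  override protected def logMessage(msg: String): Unit = Console.err.println(msg)
  override def doSubmit(args: Array[String]): Unit = {
    try super.doSubmit(args) catch {
      case e: SubmitException =>
        Console.err.println(s"Error: ${e.getMessage}")
        sys.exit(1)
    }
  }
}

In this shape, an application embedding the launcher sees a catchable exception on bad input, while the spark-submit script keeps its terminal-facing behavior.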

File tree

14 files changed (+401, -292 lines)


core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala

Lines changed: 28 additions & 2 deletions
@@ -25,9 +25,10 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.internal.Logging
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils {
+private[deploy] object DependencyUtils extends Logging {
 
   def resolveMavenDependencies(
       packagesExclusions: String,
@@ -75,7 +76,7 @@ private[deploy] object DependencyUtils {
   def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
     if (jars != null) {
       for (jar <- jars.split(",")) {
-        SparkSubmit.addJarToClasspath(jar, loader)
+        addJarToClasspath(jar, loader)
       }
     }
   }
@@ -151,6 +152,31 @@ private[deploy] object DependencyUtils {
     }.mkString(",")
   }
 
+  def addJarToClasspath(localJar: String, loader: MutableURLClassLoader): Unit = {
+    val uri = Utils.resolveURI(localJar)
+    uri.getScheme match {
+      case "file" | "local" =>
+        val file = new File(uri.getPath)
+        if (file.exists()) {
+          loader.addURL(file.toURI.toURL)
+        } else {
+          logWarning(s"Local jar $file does not exist, skipping.")
+        }
+      case _ =>
+        logWarning(s"Skip remote jar $uri.")
+    }
+  }
+
+  /**
+   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
+   * no files, into a single comma-separated string.
+   */
+  def mergeFileLists(lists: String*): String = {
+    val merged = lists.filterNot(StringUtils.isBlank)
+      .flatMap(Utils.stringToSeq)
+    if (merged.nonEmpty) merged.mkString(",") else null
+  }
+
   private def splitOnFragment(path: String): (URI, Option[String]) = {
     val uri = Utils.resolveURI(path)
     val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
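
Given the body of mergeFileLists added above (drop blank entries, split the rest on commas, re-join), its behavior should look roughly like the lines below. This is a hypothetical illustration, not output from the patch; DependencyUtils is private[deploy], so real callers live inside that package.

// Hypothetical REPL-style illustration of the helper shown above.
DependencyUtils.mergeFileLists("a.jar,b.jar", null, "", "c.jar")   // "a.jar,b.jar,c.jar"
DependencyUtils.mergeFileLists(null, "")                           // null (nothing to merge)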

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 161 additions & 157 deletions
Large diffs are not rendered by default.
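
The SparkSubmit.scala diff is not rendered here, but the practical effect for users of the in-process launcher is that a failed submission now surfaces through the launcher handle or as an exception, rather than a System.exit that kills the embedding application. A hedged sketch of how an embedding application might drive it — the launcher classes are from org.apache.spark.launcher, while the resource, class name, and master values below are made up:

import org.apache.spark.launcher.{InProcessLauncher, SparkAppHandle}

object EmbeddedSubmitExample {
  def main(args: Array[String]): Unit = {
    // Run a Spark application inside this JVM instead of forking a spark-submit process.
    // Before SPARK-22941, a bad submission could call System.exit and take this JVM down.
    val handle = new InProcessLauncher()
      .setAppResource("/path/to/app.jar")      // hypothetical values
      .setMainClass("com.example.MyApp")
      .setMaster("local[2]")
      .startApplication(new SparkAppHandle.Listener {
        override def stateChanged(h: SparkAppHandle): Unit =
          println(s"state: ${h.getState}")     // e.g. FAILED rather than a dead JVM
        override def infoChanged(h: SparkAppHandle): Unit = ()
      })
    while (!handle.getState.isFinal) Thread.sleep(500)  // crude wait for a final state
  }
}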

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 44 additions & 46 deletions
@@ -29,7 +29,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.io.Source
 import scala.util.Try
 
+import org.apache.spark.{SparkException, SparkUserAppException}
 import org.apache.spark.deploy.SparkSubmitAction._
+import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkSubmitArgumentsParser
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
@@ -40,7 +42,7 @@ import org.apache.spark.util.Utils
  * The env argument is used for testing.
  */
 private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
-  extends SparkSubmitArgumentsParser {
+  extends SparkSubmitArgumentsParser with Logging {
   var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
@@ -85,8 +87,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
-    // scalastyle:off println
-    if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
+    if (verbose) {
+      logInfo(s"Using properties file: $propertiesFile")
+    }
     Option(propertiesFile).foreach { filename =>
       val properties = Utils.getPropertiesFromFile(filename)
       properties.foreach { case (k, v) =>
@@ -95,21 +98,16 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       // Property files may contain sensitive information, so redact before printing
       if (verbose) {
         Utils.redact(properties).foreach { case (k, v) =>
-          SparkSubmit.printStream.println(s"Adding default property: $k=$v")
+          logInfo(s"Adding default property: $k=$v")
         }
       }
     }
-    // scalastyle:on println
     defaultProperties
   }
 
   // Set parameters from command line arguments
-  try {
-    parse(args.asJava)
-  } catch {
-    case e: IllegalArgumentException =>
-      SparkSubmit.printErrorAndExit(e.getMessage())
-  }
+  parse(args.asJava)
+
   // Populate `sparkProperties` map from properties file
   mergeDefaultSparkProperties()
   // Remove keys that don't start with "spark." from `sparkProperties`.
@@ -141,7 +139,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     sparkProperties.foreach { case (k, v) =>
       if (!k.startsWith("spark.")) {
         sparkProperties -= k
-        SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
+        logWarning(s"Ignoring non-spark config property: $k=$v")
       }
     }
   }
@@ -215,10 +213,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
           }
         } catch {
           case _: Exception =>
-            SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
+            error(s"Cannot load main class from JAR $primaryResource")
         }
       case _ =>
-        SparkSubmit.printErrorAndExit(
+        error(
           s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
           "Please specify a class through --class.")
     }
@@ -248,6 +246,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case SUBMIT => validateSubmitArguments()
       case KILL => validateKillArguments()
       case REQUEST_STATUS => validateStatusRequestArguments()
+      case PRINT_VERSION =>
     }
   }
 
@@ -256,62 +255,61 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       printUsageAndExit(-1)
     }
     if (primaryResource == null) {
-      SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python or R file)")
+      error("Must specify a primary resource (JAR or Python or R file)")
     }
     if (mainClass == null && SparkSubmit.isUserJar(primaryResource)) {
-      SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class")
+      error("No main class set in JAR; please specify one with --class")
     }
     if (driverMemory != null
         && Try(JavaUtils.byteStringAsBytes(driverMemory)).getOrElse(-1L) <= 0) {
-      SparkSubmit.printErrorAndExit("Driver Memory must be a positive number")
+      error("Driver memory must be a positive number")
     }
     if (executorMemory != null
        && Try(JavaUtils.byteStringAsBytes(executorMemory)).getOrElse(-1L) <= 0) {
-      SparkSubmit.printErrorAndExit("Executor Memory cores must be a positive number")
+      error("Executor memory must be a positive number")
     }
     if (executorCores != null && Try(executorCores.toInt).getOrElse(-1) <= 0) {
-      SparkSubmit.printErrorAndExit("Executor cores must be a positive number")
+      error("Executor cores must be a positive number")
     }
     if (totalExecutorCores != null && Try(totalExecutorCores.toInt).getOrElse(-1) <= 0) {
-      SparkSubmit.printErrorAndExit("Total executor cores must be a positive number")
+      error("Total executor cores must be a positive number")
    }
    if (numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
-      SparkSubmit.printErrorAndExit("Number of executors must be a positive number")
+      error("Number of executors must be a positive number")
    }
    if (pyFiles != null && !isPython) {
-      SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
+      error("--py-files given but primary resource is not a Python script")
    }
 
    if (master.startsWith("yarn")) {
      val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
      if (!hasHadoopEnv && !Utils.isTesting) {
-        throw new Exception(s"When running with master '$master' " +
+        error(s"When running with master '$master' " +
          "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
      }
    }
 
    if (proxyUser != null && principal != null) {
-      SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
+      error("Only one of --proxy-user or --principal can be provided.")
    }
  }
 
  private def validateKillArguments(): Unit = {
    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      SparkSubmit.printErrorAndExit(
-        "Killing submissions is only supported in standalone or Mesos mode!")
+      error("Killing submissions is only supported in standalone or Mesos mode!")
    }
    if (submissionToKill == null) {
-      SparkSubmit.printErrorAndExit("Please specify a submission to kill.")
+      error("Please specify a submission to kill.")
    }
  }
 
  private def validateStatusRequestArguments(): Unit = {
    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      SparkSubmit.printErrorAndExit(
+      error(
        "Requesting submission statuses is only supported in standalone or Mesos mode!")
    }
    if (submissionToRequestStatusFor == null) {
-      SparkSubmit.printErrorAndExit("Please specify a submission to request status for.")
+      error("Please specify a submission to request status for.")
    }
  }
 
@@ -368,7 +366,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
 
      case DEPLOY_MODE =>
        if (value != "client" && value != "cluster") {
-          SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
+          error("--deploy-mode must be either \"client\" or \"cluster\"")
        }
        deployMode = value
 
@@ -405,14 +403,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
      case KILL_SUBMISSION =>
        submissionToKill = value
        if (action != null) {
-          SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $KILL.")
+          error(s"Action cannot be both $action and $KILL.")
        }
        action = KILL
 
      case STATUS =>
        submissionToRequestStatusFor = value
        if (action != null) {
-          SparkSubmit.printErrorAndExit(s"Action cannot be both $action and $REQUEST_STATUS.")
+          error(s"Action cannot be both $action and $REQUEST_STATUS.")
        }
        action = REQUEST_STATUS
 
@@ -444,7 +442,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        repositories = value
 
      case CONF =>
-        val (confName, confValue) = SparkSubmit.parseSparkConfProperty(value)
+        val (confName, confValue) = SparkSubmitUtils.parseSparkConfProperty(value)
        sparkProperties(confName) = confValue
 
      case PROXY_USER =>
@@ -463,15 +461,15 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        verbose = true
 
      case VERSION =>
-        SparkSubmit.printVersionAndExit()
+        action = SparkSubmitAction.PRINT_VERSION
 
      case USAGE_ERROR =>
        printUsageAndExit(1)
 
      case _ =>
-        throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
+        error(s"Unexpected argument '$opt'.")
    }
-    true
+    action != SparkSubmitAction.PRINT_VERSION
  }
 
  /**
@@ -482,7 +480,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   */
  override protected def handleUnknown(opt: String): Boolean = {
    if (opt.startsWith("-")) {
-      SparkSubmit.printErrorAndExit(s"Unrecognized option '$opt'.")
+      error(s"Unrecognized option '$opt'.")
    }
 
    primaryResource =
@@ -501,20 +499,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
  }
 
  private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
-    // scalastyle:off println
-    val outStream = SparkSubmit.printStream
    if (unknownParam != null) {
-      outStream.println("Unknown/unsupported param " + unknownParam)
+      logInfo("Unknown/unsupported param " + unknownParam)
    }
    val command = sys.env.get("_SPARK_CMD_USAGE").getOrElse(
      """Usage: spark-submit [options] <app jar | python file | R file> [app arguments]
        |Usage: spark-submit --kill [submission ID] --master [spark://...]
        |Usage: spark-submit --status [submission ID] --master [spark://...]
        |Usage: spark-submit run-example [options] example-class [example args]""".stripMargin)
-    outStream.println(command)
+    logInfo(command)
 
    val mem_mb = Utils.DEFAULT_DRIVER_MEM_MB
-    outStream.println(
+    logInfo(
      s"""
      |Options:
      |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
@@ -596,12 +592,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
    )
 
    if (SparkSubmit.isSqlShell(mainClass)) {
-      outStream.println("CLI options:")
-      outStream.println(getSqlShellOptions())
+      logInfo("CLI options:")
+      logInfo(getSqlShellOptions())
    }
-    // scalastyle:on println
 
-    SparkSubmit.exitFn(exitCode)
+    throw new SparkUserAppException(exitCode)
  }
 
  /**
@@ -655,4 +650,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
      System.setErr(currentErr)
    }
  }
+
+  private def error(msg: String): Unit = throw new SparkException(msg)
+
 }
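
Because error() now throws SparkException and printUsageAndExit throws SparkUserAppException, argument problems become catchable when SparkSubmitArguments is built in-process. A rough sketch of what a test inside org.apache.spark.deploy could now express (the class is private[deploy]; this snippet is illustrative, not taken from the patch):

package org.apache.spark.deploy

import org.apache.spark.{SparkException, SparkUserAppException}

object ArgumentsErrorSketch {
  def main(args: Array[String]): Unit = {
    try {
      // No primary resource given: previously printErrorAndExit would exit the JVM here.
      new SparkSubmitArguments(Seq("--master", "local", "--class", "com.example.Main"))
    } catch {
      case e: SparkException =>
        assert(e.getMessage.contains("Must specify a primary resource"))
      case e: SparkUserAppException =>
        // Usage/--help style paths now signal their exit code this way instead of exiting.
        println(s"would have exited with code ${e.exitCode}")
    }
  }
}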

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
+import org.apache.spark.util._
 
 /**
  * Utility object for launching driver programs such that they share fate with the Worker process.
@@ -93,7 +93,7 @@ object DriverWrapper extends Logging {
       val jars = {
         val jarsProp = sys.props.get("spark.jars").orNull
         if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
-          SparkSubmit.mergeFileLists(jarsProp, resolvedMavenCoordinates)
+          DependencyUtils.mergeFileLists(jarsProp, resolvedMavenCoordinates)
         } else {
           jarsProp
         }

core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala

Lines changed: 4 additions & 14 deletions
@@ -33,24 +33,14 @@ private[spark] trait CommandLineUtils {
   private[spark] var printStream: PrintStream = System.err
 
   // scalastyle:off println
-
-  private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str)
+  private[spark] def printMessage(str: String): Unit = printStream.println(str)
+  // scalastyle:on println
 
   private[spark] def printErrorAndExit(str: String): Unit = {
-    printStream.println("Error: " + str)
-    printStream.println("Run with --help for usage help or --verbose for debug output")
+    printMessage("Error: " + str)
+    printMessage("Run with --help for usage help or --verbose for debug output")
     exitFn(1)
   }
 
-  // scalastyle:on println
-
-  private[spark] def parseSparkConfProperty(pair: String): (String, String) = {
-    pair.split("=", 2).toSeq match {
-      case Seq(k, v) => (k, v)
-      case _ => printErrorAndExit(s"Spark config without '=': $pair")
-        throw new SparkException(s"Spark config without '=': $pair")
-    }
-  }
-
   def main(args: Array[String]): Unit
 }
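
CommandLineUtils keeps the print-and-exit contract for the standalone tools, and printStream stays an overridable var, with exitFn(1) visible above as the exit hook. Assuming exitFn is likewise an overridable Int => Unit — an assumption, since its declaration is not shown in this hunk — a test could intercept both hooks instead of terminating the JVM. A hedged sketch, not code from this patch:

package org.apache.spark.util

import java.io.{ByteArrayOutputStream, PrintStream}

// Hypothetical tool built on the trait above; the trait is private[spark], so this
// sketch lives in the org.apache.spark.util package.
object FakeTool extends CommandLineUtils {
  override def main(args: Array[String]): Unit = {
    if (args.isEmpty) printErrorAndExit("no arguments given")
    printMessage(s"running with: ${args.mkString(" ")}")
  }
}

object FakeToolDemo {
  def main(args: Array[String]): Unit = {
    val captured = new ByteArrayOutputStream()
    FakeTool.printStream = new PrintStream(captured)                       // capture "Error: ..." output
    FakeTool.exitFn = code => throw new RuntimeException(s"exit($code)")   // record the code, don't exit
    try FakeTool.main(Array.empty[String]) catch {
      case e: RuntimeException => println(e.getMessage)                    // prints "exit(1)"
    }
    print(captured.toString("UTF-8"))
  }
}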
