18
18
package org .apache .spark .deploy
19
19
20
20
import java .io ._
21
- import java .lang .reflect .{InvocationTargetException , UndeclaredThrowableException }
21
+ import java .lang .reflect .{InvocationTargetException , Modifier , UndeclaredThrowableException }
22
22
import java .net .URL
23
23
import java .security .PrivilegedExceptionAction
24
24
import java .text .ParseException
25
25
26
26
import scala .annotation .tailrec
27
- import scala .collection .mutable .ArrayBuffer
27
+ import scala .collection .mutable .{ ArrayBuffer , HashMap , Map }
28
28
import scala .util .{Properties , Try }
29
29
30
30
import org .apache .commons .lang3 .StringUtils
@@ -99,7 +99,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
99
99
private [deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf [RestSubmissionClientApp ].getName()
100
100
private [deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf [ClientApp ].getName()
101
101
private [deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
102
- " org.apache.spark.deploy.k8s.submit.Client "
102
+ " org.apache.spark.deploy.k8s.submit.KubernetesClientApplication "
103
103
104
104
// scalastyle:off println
105
105
private [spark] def printVersionAndExit (): Unit = {
@@ -310,10 +310,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
310
310
311
311
// Fail fast, the following modes are not supported or applicable
312
312
(clusterManager, deployMode) match {
313
- case (KUBERNETES , CLIENT ) =>
314
- printErrorAndExit(" Client mode is currently not supported for Kubernetes." )
315
- case (KUBERNETES , CLUSTER ) if args.isR =>
316
- printErrorAndExit(" Kubernetes does not currently support R applications." )
317
313
case (STANDALONE , CLUSTER ) if args.isPython =>
318
314
printErrorAndExit(" Cluster deploy mode is currently not supported for python " +
319
315
" applications on standalone clusters." )
@@ -324,6 +320,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
324
320
printErrorAndExit(" Python applications are currently not supported for Kubernetes." )
325
321
case (KUBERNETES , _) if args.isR =>
326
322
printErrorAndExit(" R applications are currently not supported for Kubernetes." )
323
+ case (KUBERNETES , CLIENT ) =>
324
+ printErrorAndExit(" Client mode is currently not supported for Kubernetes." )
327
325
case (LOCAL , CLUSTER ) =>
328
326
printErrorAndExit(" Cluster deploy mode is not compatible with master \" local\" " )
329
327
case (_, CLUSTER ) if isShell(args.primaryResource) =>
@@ -343,8 +341,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
343
341
}
344
342
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
345
343
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
346
- val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
347
344
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
345
+ val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
348
346
349
347
if (! isMesosCluster && ! isStandAloneCluster) {
350
348
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
@@ -579,9 +577,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
579
577
OptionAssigner (args.principal, YARN , ALL_DEPLOY_MODES , confKey = " spark.yarn.principal" ),
580
578
OptionAssigner (args.keytab, YARN , ALL_DEPLOY_MODES , confKey = " spark.yarn.keytab" ),
581
579
582
- OptionAssigner (args.kubernetesNamespace, KUBERNETES , ALL_DEPLOY_MODES ,
583
- confKey = " spark.kubernetes.namespace" ),
584
-
585
580
// Other options
586
581
OptionAssigner (args.executorCores, STANDALONE | YARN | KUBERNETES , ALL_DEPLOY_MODES ,
587
582
confKey = " spark.executor.cores" ),
@@ -649,9 +644,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
649
644
650
645
// Add the application jar automatically so the user doesn't have to call sc.addJar
651
646
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
652
- // In Kubernetes cluster mode, the jar will be uploaded by the client separately.
653
647
// For python and R files, the primary resource is already distributed as a regular file
654
- if (! isYarnCluster && ! isKubernetesCluster && ! args.isPython && ! args.isR) {
648
+ if (! isYarnCluster && ! args.isPython && ! args.isR) {
655
649
var jars = sparkConf.getOption(" spark.jars" ).map(x => x.split(" ," ).toSeq).getOrElse(Seq .empty)
656
650
if (isUserJar(args.primaryResource)) {
657
651
jars = jars ++ Seq (args.primaryResource)
@@ -733,21 +727,14 @@ object SparkSubmit extends CommandLineUtils with Logging {
733
727
734
728
if (isKubernetesCluster) {
735
729
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
736
- if (args.isPython) {
737
- childArgs ++= Array (" --primary-py-file" , args.primaryResource)
738
- childArgs ++= Array (" --main-class" , " org.apache.spark.deploy.PythonRunner" )
739
- if (args.pyFiles != null ) {
740
- childArgs ++= Array (" --other-py-files" , args.pyFiles)
741
- }
742
- } else {
743
- if (args.primaryResource != SparkLauncher .NO_RESOURCE ) {
744
- childArgs ++= Array (" --primary-java-resource" , args.primaryResource)
745
- }
746
- childArgs ++= Array (" --main-class" , args.mainClass)
730
+ if (args.primaryResource != SparkLauncher .NO_RESOURCE ) {
731
+ childArgs ++= Array (" --primary-java-resource" , args.primaryResource)
747
732
}
748
- args.childArgs.foreach { arg =>
749
- childArgs += " --arg"
750
- childArgs += arg
733
+ childArgs ++= Array (" --main-class" , args.mainClass)
734
+ if (args.childArgs != null ) {
735
+ args.childArgs.foreach { arg =>
736
+ childArgs += (" --arg" , arg)
737
+ }
751
738
}
752
739
}
753
740
0 commit comments