Skip to content

Commit 61a7414

Browse files
committed
Refactored tests and included modifications to pass all tests regardless of environment
1 parent e3f14e1 commit 61a7414

File tree

24 files changed

+112
-22
lines changed

24 files changed

+112
-22
lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
473473

474474
// Validate spark.executor.extraJavaOptions
475475
getOption(executorOptsKey).foreach { javaOpts =>
476-
if (javaOpts.contains("-Dspark")) {
476+
if (javaOpts.contains("-Dspark") && !javaOpts.contains("-Dspark.hadoop.")) {
477477
val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
478478
"Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
479479
throw new Exception(msg)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/HadoopConfBootstrap.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ import java.io.File
2121
import scala.collection.JavaConverters._
2222

2323
import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}
24+
import org.apache.hadoop.security.UserGroupInformation
2425

2526
import org.apache.spark.deploy.kubernetes.constants._
2627
import org.apache.spark.internal.Logging
2728

28-
2929
/**
3030
* This is separated out from the HadoopConf steps API because this component can be reused to
3131
* set up the Hadoop Configuration for executors as well.
@@ -74,6 +74,10 @@ private[spark] class HadoopConfBootstrapImpl(
7474
.withName(ENV_HADOOP_CONF_DIR)
7575
.withValue(HADOOP_CONF_DIR_PATH)
7676
.endEnv()
77+
.addNewEnv()
78+
.withName(ENV_SPARK_USER)
79+
.withValue(UserGroupInformation.getCurrentUser.getShortUserName)
80+
.endEnv()
7781
.build()
7882
originalPodWithMainContainer.copy(
7983
pod = hadoopSupportedPod,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KerberosTokenConfBootstrap.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ private[spark] trait KerberosTokenBootstrapConf {
3636

3737
private[spark] class KerberosTokenConfBootstrapImpl(
3838
secretName: String,
39-
secretLabel: String) extends KerberosTokenBootstrapConf with Logging{
39+
secretLabel: String,
40+
userName: String) extends KerberosTokenBootstrapConf with Logging{
41+
4042

4143
override def bootstrapMainContainerAndVolumes(
4244
originalPodWithMainContainer: PodWithMainContainer)
@@ -62,6 +64,10 @@ private[spark] class KerberosTokenConfBootstrapImpl(
6264
.withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
6365
.withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretLabel")
6466
.endEnv()
67+
.addNewEnv()
68+
.withName(ENV_SPARK_USER)
69+
.withValue(userName)
70+
.endEnv()
6571
.build()
6672
originalPodWithMainContainer.copy(
6773
pod = dtMountedPod,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ package object constants {
6060
private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
6161
private[spark] val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
6262
private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
63+
private[spark] val ENV_EXECUTOR_JAVA_OPTS = "SPARK_EXECUTOR_JAVA_OPTS"
6364
private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
6465
private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
6566
private[spark] val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
@@ -74,6 +75,7 @@ package object constants {
7475
private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
7576
private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
7677
private[spark] val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
78+
private[spark] val ENV_SPARK_USER = "SPARK_USER"
7779

7880
// Bootstrapping dependencies with the init-container
7981
private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
@@ -113,6 +115,9 @@ package object constants {
113115
"spark.kubernetes.kerberos.labelname"
114116
private[spark] val KERBEROS_SECRET_LABEL_PREFIX =
115117
"hadoop-tokens"
118+
private[spark] val SPARK_HADOOP_PREFIX = "spark.hadoop."
119+
private[spark] val HADOOP_SECURITY_AUTHENTICATION =
120+
SPARK_HADOOP_PREFIX + "hadoop.security.authentication"
116121

117122
// Miscellaneous
118123
private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ private[spark] class Client(
7878

7979
private val driverJavaOptions = submissionSparkConf.get(
8080
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
81+
private val isKerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
82+
private val maybeSimpleAuthentication =
83+
if (isKerberosEnabled) s" -D$HADOOP_SECURITY_AUTHENTICATION=simple" else ""
8184

8285
/**
8386
* Run command that initalizes a DriverSpec that will be updated after each
@@ -98,7 +101,7 @@ private[spark] class Client(
98101
.getAll
99102
.map {
100103
case (confKey, confValue) => s"-D$confKey=$confValue"
101-
}.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
104+
}.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("") + maybeSimpleAuthentication
102105
val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
103106
.addNewEnv()
104107
.withName(ENV_DRIVER_JAVA_OPTS)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStep.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ private[spark] class HadoopKerberosKeytabResolverStep(
112112
.build()
113113
val bootstrapKerberos = new KerberosTokenConfBootstrapImpl(
114114
HADOOP_KERBEROS_SECRET_NAME,
115-
initialTokenLabelName)
115+
initialTokenLabelName,
116+
jobUserUGI.getShortUserName)
116117
val withKerberosEnvPod = bootstrapKerberos.bootstrapMainContainerAndVolumes(
117118
PodWithMainContainer(
118119
hadoopConfigSpec.driverPod,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStep.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
1818

19+
import org.apache.hadoop.security.UserGroupInformation
20+
1921
import org.apache.spark.SparkConf
2022
import org.apache.spark.deploy.kubernetes.{KerberosTokenConfBootstrapImpl, PodWithMainContainer}
2123
import org.apache.spark.deploy.kubernetes.constants._
@@ -34,7 +36,8 @@ private[spark] class HadoopKerberosSecretResolverStep(
3436
override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
3537
val bootstrapKerberos = new KerberosTokenConfBootstrapImpl(
3638
tokenSecretName,
37-
tokenLabelName)
39+
tokenLabelName,
40+
UserGroupInformation.getCurrentUser.getShortUserName)
3841
val withKerberosEnvPod = bootstrapKerberos.bootstrapMainContainerAndVolumes(
3942
PodWithMainContainer(
4043
hadoopConfigSpec.driverPod,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.io.File
2121
import org.apache.spark.SparkConf
2222
import org.apache.spark.deploy.kubernetes.{HadoopConfBootstrapImpl, OptionRequirements}
2323
import org.apache.spark.deploy.kubernetes.config._
24+
import org.apache.spark.internal.Logging
2425

2526

2627
/**
@@ -30,15 +31,16 @@ private[spark] class HadoopStepsOrchestrator(
3031
namespace: String,
3132
hadoopConfigMapName: String,
3233
submissionSparkConf: SparkConf,
33-
hadoopConfDir: String) {
34-
private val isKerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
35-
private val maybePrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL)
36-
private val maybeKeytab = submissionSparkConf.get(KUBERNETES_KERBEROS_KEYTAB)
37-
.map(k => new File(k))
38-
private val maybeExistingSecret = submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
39-
private val maybeExistingSecretLabel =
40-
submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_LABEL)
41-
private val hadoopConfigurationFiles = getHadoopConfFiles(hadoopConfDir)
34+
hadoopConfDir: String) extends Logging{
35+
private val isKerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
36+
private val maybePrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL)
37+
private val maybeKeytab = submissionSparkConf.get(KUBERNETES_KERBEROS_KEYTAB)
38+
.map(k => new File(k))
39+
private val maybeExistingSecret = submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
40+
private val maybeExistingSecretLabel =
41+
submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_LABEL)
42+
private val hadoopConfigurationFiles = getHadoopConfFiles(hadoopConfDir)
43+
logInfo(s"Hadoop Conf directory: $hadoopConfDir")
4244

4345
require(maybeKeytab.forall( _ => isKerberosEnabled ),
4446
"You must enable Kerberos support if you are specifying a Kerberos Keytab")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.spark.deploy.kubernetes.config._
2525
import org.apache.spark.deploy.kubernetes.constants._
2626
import org.apache.spark.internal.Logging
2727
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
28+
import org.apache.spark.util.Utils
2829

2930
private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
3031
import org.apache.spark.SparkContext
@@ -91,7 +92,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
9192
} yield {
9293
new KerberosTokenConfBootstrapImpl(
9394
secretName,
94-
secretLabel)
95+
secretLabel,
96+
Utils.getCurrentUserName)
9597
}
9698
if (maybeConfigMap.isEmpty) {
9799
logWarning("The executor's init-container config map was not specified. Executors will" +

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
7272
private val executorsToRemove = Collections.newSetFromMap[String](
7373
new ConcurrentHashMap[String, java.lang.Boolean]()).asScala
7474

75+
private val executorExtraJavaOpts = conf.get(
76+
org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
7577
private val executorExtraClasspath = conf.get(
7678
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
7779
private val executorJarsDownloadDir = conf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
78-
80+
private val isKerberosEnabled = conf.get(KUBERNETES_KERBEROS_SUPPORT)
81+
private val maybeSimpleAuthentication =
82+
if (isKerberosEnabled) s" -D$HADOOP_SECURITY_AUTHENTICATION=simple" else ""
7983
private val executorLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
8084
conf,
8185
KUBERNETES_EXECUTOR_LABEL_PREFIX,
@@ -451,6 +455,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
451455
val executorCpuQuantity = new QuantityBuilder(false)
452456
.withAmount(executorCores.toString)
453457
.build()
458+
val executorJavaOpts = executorExtraJavaOpts.getOrElse("") + maybeSimpleAuthentication
459+
val executorJavaOptsEnv = if (executorJavaOpts.nonEmpty) {
460+
Some(new EnvVarBuilder()
461+
.withName(ENV_EXECUTOR_JAVA_OPTS)
462+
.withValue(executorJavaOpts)
463+
.build()) } else None
454464
val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
455465
new EnvVarBuilder()
456466
.withName(ENV_EXECUTOR_EXTRA_CLASSPATH)
@@ -499,6 +509,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
499509
.endResources()
500510
.addAllToEnv(requiredEnv.asJava)
501511
.addToEnv(executorExtraClasspathEnv.toSeq: _*)
512+
.addToEnv(executorJavaOptsEnv.toSeq: _*)
502513
.withPorts(requiredPorts.asJava)
503514
.build()
504515

0 commit comments

Comments (0)