diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index edf5b9c3912a0..9ab8d1b479885 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -575,7 +575,7 @@ object SparkSubmit extends CommandLineUtils {
     }

     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL) {
+    if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) {
       if (args.principal != null) {
         require(args.keytab != null, "Keytab must be specified when principal is specified")
         if (!new File(args.keytab).exists()) {
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 3e9096d681642..1825a02f4e86c 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -743,6 +743,61 @@ from the other deployment modes. See the [configuration page](configuration.html
+  <tr>
+    <td><code>spark.kubernetes.kerberos.enabled</code></td>
+    <td>false</td>
+    <td>
+      Specify whether your job requires a Kerberos Authentication to access Secure HDFS.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.kubernetes.kerberos.keytab</code></td>
+    <td>(none)</td>
+    <td>
+      Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true, this will let you specify
+      the location of your Kerberos keytab to be used in order to access Secure HDFS. This is optional, as you
+      may log in by running <code>kinit</code> before running the spark-submit, and the submission client
+      will look within your local TGT cache to resolve this.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.kubernetes.kerberos.principal</code></td>
+    <td>(none)</td>
+    <td>
+      Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true, this will let you specify
+      the Kerberos principal that you wish to use to access Secure HDFS. This is optional, as you
+      may log in by running <code>kinit</code> before running the spark-submit, and the submission client
+      will look within your local TGT cache to resolve this.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.kubernetes.kerberos.renewer.principal</code></td>
+    <td>(none)</td>
+    <td>
+      Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true, this will let you specify
+      the principal that you wish to use to handle renewing of Delegation Tokens. This is optional, as
+      we will set the principal to be the job user's principal by default.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
+    <td>(none)</td>
+    <td>
+      Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true, this will let you specify
+      the name of the secret where your existing delegation token data is stored. You must also specify the
+      item key <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> where your data is stored on the secret.
+      This is optional in the case that you want to use a pre-existing secret; otherwise a new secret will be
+      automatically created.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.kubernetes.kerberos.tokensecret.itemkey</code></td>
+    <td>(none)</td>
+    <td>
+      Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true, this will let you specify
+      the data item key within the pre-specified secret where your existing delegation token data is stored.
+      We provide a default value should you not include it, but
+      you should always include this if you are providing a pre-existing secret containing the delegation token data.
+    </td>
+  </tr>
  <tr>
    <td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
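
The table entries above can also be exercised programmatically. A minimal sketch (not part of this patch; the keytab path and principal are placeholders) of enabling Kerberos support through SparkConf:

    import org.apache.spark.SparkConf

    // Illustrative values only; point these at your own keytab and principal.
    val conf = new SparkConf()
      .set("spark.kubernetes.kerberos.enabled", "true")
      .set("spark.kubernetes.kerberos.keytab", "/etc/security/keytabs/user.keytab")
      .set("spark.kubernetes.kerberos.principal", "user@EXAMPLE.COM")
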
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfigSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfigSpec.scala
new file mode 100644
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfigSpec.scala
+ /**
+ * Represents a given configuration of the Hadoop configuration logic, informing
+ * the HadoopConfigBootstrapStep of how the driver should be configured. This includes:
+ * - What Spark properties should be set on the driver's SparkConf for the executors
+ * - The spec of the main container so that it can be modified to share volumes
+ * - The spec of the driver pod EXCEPT for the addition of the given hadoop configs (e.g. volumes
+ * the hadoop logic needs)
+ * - The properties that will be stored into the config map which have (key, value)
+ * pairs of (path, data)
+ * - The secret containing a DT, either previously specified or built on the fly
+ * - The name of the secret where the DT will be stored
+ * - The data item-key on the secret which correlates with where the current DT data is stored
+ */
+private[spark] case class HadoopConfigSpec(
+ additionalDriverSparkConf: Map[String, String],
+ driverPod: Pod,
+ driverContainer: Container,
+ configMapProperties: Map[String, String],
+ dtSecret: Option[Secret],
+ dtSecretName: String,
+ dtSecretItemKey: String)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfigurationStep.scala
new file mode 100644
index 0000000000000..b08b180ce8531
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfigurationStep.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps
+
+ /**
+ * Represents a step in preparing the driver with Hadoop Configuration logic.
+ */
+private[spark] trait HadoopConfigurationStep {
+
+ def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec
+}
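
For orientation: each HadoopConfigurationStep transforms a HadoopConfigSpec and hands the result to the next step. A minimal sketch of that composition (an assumption about how an orchestrator would drive the trait, not code from this patch):

    import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.{HadoopConfigSpec, HadoopConfigurationStep}

    // Thread the spec through every configured step, in order.
    def applySteps(
        initial: HadoopConfigSpec,
        steps: Seq[HadoopConfigurationStep]): HadoopConfigSpec =
      steps.foldLeft(initial) { (spec, step) => step.configureContainers(spec) }
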
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStep.scala
new file mode 100644
index 0000000000000..1bf7f36821478
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStep.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps
+
+import java.io._
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.SecretBuilder
+import org.apache.commons.codec.binary.Base64
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.{HadoopUGIUtil, KerberosTokenConfBootstrapImpl, PodWithMainContainer}
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.internal.Logging
+
+ /**
+ * This step does all the heavy lifting for Delegation Token logic. This step
+ * assumes that the job user has either specified a principal and keytab or ran
+ * kinit before running spark-submit. With a TGT stored locally, running
+ * UGI.getCurrentUser will return the current user; alternatively, you can run
+ * UGI.loginUserFromKeytabAndReturnUGI and, via .doAs, run as the logged-in user
+ * instead of the current user. With the job user's principal you then retrieve
+ * the delegation token from the NameNode and store its values in a
+ * DelegationToken. Lastly, the class puts the data into a secret. All of this
+ * is appended to the current HadoopConfigSpec, which in turn will append to
+ * the current DriverSpec.
+ */
+private[spark] class HadoopKerberosKeytabResolverStep(
+ submissionSparkConf: SparkConf,
+ maybePrincipal: Option[String],
+ maybeKeytab: Option[File],
+ maybeRenewerPrincipal: Option[String],
+ hadoopUGI: HadoopUGIUtil) extends HadoopConfigurationStep with Logging {
+ private var originalCredentials: Credentials = _
+ private var dfs: FileSystem = _
+ private var renewer: String = _
+ private var credentials: Credentials = _
+ private var tokens: Iterable[Token[_ <: TokenIdentifier]] = _
+
+ override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf)
+ logDebug(s"Hadoop Configuration: ${hadoopConf.toString}")
+ if (!hadoopUGI.isSecurityEnabled) logDebug("Hadoop not configured with Kerberos")
+ val maybeJobUserUGI =
+ for {
+ principal <- maybePrincipal
+ keytab <- maybeKeytab
+ } yield {
+ // Not necessary with [SPARK-16742]
+ // Reliant on [SPARK-20328] for changing to YARN principal
+ submissionSparkConf.set("spark.yarn.principal", principal)
+ submissionSparkConf.set("spark.yarn.keytab", keytab.toURI.toString)
+ logDebug("Logged into KDC with keytab using Job User UGI")
+ hadoopUGI.loginUserFromKeytabAndReturnUGI(
+ principal,
+ keytab.toURI.toString)
+ }
+ // In the case that keytab is not specified we will read from Local Ticket Cache
+ val jobUserUGI = maybeJobUserUGI.getOrElse(hadoopUGI.getCurrentUser)
+ // It is necessary to run as jobUserUGI because logged in user != Current User
+ jobUserUGI.doAs(new PrivilegedExceptionAction[Void] {
+ override def run(): Void = {
+ logDebug(s"Retrieved Job User UGI: $jobUserUGI")
+ originalCredentials = jobUserUGI.getCredentials
+ logDebug(s"Original tokens: ${originalCredentials.toString}")
+ logDebug(s"All tokens: ${originalCredentials.getAllTokens}")
+ logDebug(s"All secret keys: ${originalCredentials.getAllSecretKeys}")
+ // TODO: This is not necessary with [SPARK-20328] since we would be using
+ // Spark core providers to handle delegation token renewal
+ renewer = maybeRenewerPrincipal.getOrElse(jobUserUGI.getShortUserName)
+ logDebug(s"Renewer is: $renewer")
+ credentials = new Credentials(originalCredentials)
+ hadoopUGI.dfsAddDelegationToken(hadoopConf, renewer, credentials)
+ tokens = credentials.getAllTokens.asScala
+ logDebug(s"Tokens: ${credentials.toString}")
+ logDebug(s"All tokens: ${tokens.mkString(",")}")
+ logDebug(s"All secret keys: ${credentials.getAllSecretKeys}")
+ null
+ }})
+ if (tokens.isEmpty) logWarning("Did not obtain any delegation tokens")
+ val data = hadoopUGI.serialize(credentials)
+ val renewalInterval =
+ hadoopUGI.getTokenRenewalInterval(tokens, hadoopConf).getOrElse(Long.MaxValue)
+ val currentTime: Long = hadoopUGI.getCurrentTime
+ val initialTokenDataKeyName = s"$KERBEROS_SECRET_LABEL_PREFIX-$currentTime-$renewalInterval"
+ val uniqueSecretName = s"$HADOOP_KERBEROS_SECRET_NAME.$currentTime"
+ val secretDT =
+ new SecretBuilder()
+ .withNewMetadata()
+ .withName(uniqueSecretName)
+ .withLabels(Map("refresh-hadoop-tokens" -> "yes").asJava)
+ .endMetadata()
+ .addToData(initialTokenDataKeyName, Base64.encodeBase64String(data))
+ .build()
+ val bootstrapKerberos = new KerberosTokenConfBootstrapImpl(
+ uniqueSecretName,
+ initialTokenDataKeyName,
+ jobUserUGI.getShortUserName)
+ val withKerberosEnvPod = bootstrapKerberos.bootstrapMainContainerAndVolumes(
+ PodWithMainContainer(
+ hadoopConfigSpec.driverPod,
+ hadoopConfigSpec.driverContainer))
+ hadoopConfigSpec.copy(
+ additionalDriverSparkConf =
+ hadoopConfigSpec.additionalDriverSparkConf ++ Map(
+ HADOOP_KERBEROS_CONF_ITEM_KEY -> initialTokenDataKeyName,
+ HADOOP_KERBEROS_CONF_SECRET -> uniqueSecretName),
+ driverPod = withKerberosEnvPod.pod,
+ driverContainer = withKerberosEnvPod.mainContainer,
+ dtSecret = Some(secretDT),
+ dtSecretName = uniqueSecretName,
+ dtSecretItemKey = initialTokenDataKeyName)
+ }
+}
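
The UGI handling above follows the standard Hadoop security API. A condensed sketch of the pattern this step relies on (standard org.apache.hadoop.security calls; the principal and keytab path are placeholders):

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.security.UserGroupInformation

    // Log in from a keytab without replacing the static login user, then run as that user.
    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
      "user@EXAMPLE.COM", "/etc/security/keytabs/user.keytab")
    val shortName = ugi.doAs(new PrivilegedExceptionAction[String] {
      override def run(): String = UserGroupInformation.getCurrentUser.getShortUserName
    })
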
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStep.scala
new file mode 100644
index 0000000000000..9d60f932bc736
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStep.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.{KerberosTokenConfBootstrapImpl, PodWithMainContainer}
+import org.apache.spark.deploy.k8s.constants._
+
+ /**
+ * This step assumes that you have already done all the heavy lifting in retrieving a
+ * delegation token and storing the following data in a secret before running this job.
+ * This step requires that you just specify the secret name and data item-key corresponding
+ * to the data where the delegation token is stored.
+ */
+private[spark] class HadoopKerberosSecretResolverStep(
+ submissionSparkConf: SparkConf,
+ tokenSecretName: String,
+ tokenItemKeyName: String) extends HadoopConfigurationStep {
+
+ override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
+ val bootstrapKerberos = new KerberosTokenConfBootstrapImpl(
+ tokenSecretName,
+ tokenItemKeyName,
+ UserGroupInformation.getCurrentUser.getShortUserName)
+ val withKerberosEnvPod = bootstrapKerberos.bootstrapMainContainerAndVolumes(
+ PodWithMainContainer(
+ hadoopConfigSpec.driverPod,
+ hadoopConfigSpec.driverContainer))
+ hadoopConfigSpec.copy(
+ driverPod = withKerberosEnvPod.pod,
+ driverContainer = withKerberosEnvPod.mainContainer,
+ additionalDriverSparkConf =
+ hadoopConfigSpec.additionalDriverSparkConf ++ Map(
+ HADOOP_KERBEROS_CONF_ITEM_KEY -> tokenItemKeyName,
+ HADOOP_KERBEROS_CONF_SECRET -> tokenSecretName),
+ dtSecret = None,
+ dtSecretName = tokenSecretName,
+ dtSecretItemKey = tokenItemKeyName)
+ }
+}
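
If you pre-create the delegation token secret rather than letting the keytab resolver build one, it must carry the serialized token bytes under the item key handed to this step. A sketch of such a secret built with the fabric8 client (the names and bytes are placeholders, not from this patch):

    import io.fabric8.kubernetes.api.model.SecretBuilder
    import org.apache.commons.codec.binary.Base64

    val tokenBytes: Array[Byte] = Array(0x01, 0x02) // placeholder for serialized Credentials
    val dtSecret = new SecretBuilder()
      .withNewMetadata()
      .withName("hadoop-token-secret") // spark.kubernetes.kerberos.tokensecret.name
      .endMetadata()
      .addToData("hadoop-token-key", Base64.encodeBase64String(tokenBytes)) // ...tokensecret.itemkey
      .build()
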
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala
new file mode 100644
index 0000000000000..8f052662ba600
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, HadoopUGIUtil, OptionRequirements}
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.internal.Logging
+
+ /**
+ * Returns the complete ordered list of steps required to configure the hadoop configurations.
+ */
+private[spark] class HadoopStepsOrchestrator(
+ namespace: String,
+ hadoopConfigMapName: String,
+ submissionSparkConf: SparkConf,
+ hadoopConfDir: String) extends Logging {
+ private val isKerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
+ private val maybePrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL)
+ private val maybeKeytab = submissionSparkConf.get(KUBERNETES_KERBEROS_KEYTAB)
+ .map(k => new File(k))
+ private val maybeExistingSecret = submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
+ private val maybeExistingSecretItemKey =
+ submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY)
+ private val maybeRenewerPrincipal =
+ submissionSparkConf.get(KUBERNETES_KERBEROS_RENEWER_PRINCIPAL)
+ private val hadoopConfigurationFiles = getHadoopConfFiles(hadoopConfDir)
+ private val hadoopUGI = new HadoopUGIUtil
+ logInfo(s"Hadoop Conf directory: $hadoopConfDir")
+
+ require(maybeKeytab.forall(_ => isKerberosEnabled),
+ "You must enable Kerberos support if you are specifying a Kerberos Keytab")
+
+ require(maybeExistingSecret.forall(_ => isKerberosEnabled),
+ "You must enable Kerberos support if you are specifying a Kerberos Secret")
+
+ OptionRequirements.requireBothOrNeitherDefined(
+ maybeKeytab,
+ maybePrincipal,
+ "If a Kerberos keytab is specified you must also specify a Kerberos principal",
+ "If a Kerberos principal is specified you must also specify a Kerberos keytab")
+
+ OptionRequirements.requireBothOrNeitherDefined(
+ maybeExistingSecret,
+ maybeExistingSecretItemKey,
+ "If a secret storing a Kerberos Delegation Token is specified you must also" +
+ " specify the label where the data is stored",
+ "If a secret data item-key where the data of the Kerberos Delegation Token is specified" +
+ " you must also specify the name of the secret")
+
+ def getHadoopSteps(): Seq[HadoopConfigurationStep] = {
+ val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl(
+ hadoopConfigMapName,
+ hadoopConfigurationFiles,
+ hadoopUGI)
+ val hadoopConfMounterStep = new HadoopConfMounterStep(
+ hadoopConfigMapName,
+ hadoopConfigurationFiles,
+ hadoopConfBootstrapImpl,
+ hadoopConfDir)
+ val maybeKerberosStep =
+ if (isKerberosEnabled) {
+ maybeExistingSecret.map(existingSecretName => Some(new HadoopKerberosSecretResolverStep(
+ submissionSparkConf,
+ existingSecretName,
+ maybeExistingSecretItemKey.get))).getOrElse(Some(
+ new HadoopKerberosKeytabResolverStep(
+ submissionSparkConf,
+ maybePrincipal,
+ maybeKeytab,
+ maybeRenewerPrincipal,
+ hadoopUGI)))
+ } else {
+ Option.empty[HadoopConfigurationStep]
+ }
+ Seq(hadoopConfMounterStep) ++ maybeKerberosStep.toSeq
+ }
+
+ private def getHadoopConfFiles(path: String) : Seq[File] = {
+ def isFile(file: File) = if (file.isFile) Some(file) else None
+ val dir = new File(path)
+ if (dir.isDirectory) {
+ dir.listFiles.flatMap { file => isFile(file) }.toSeq
+ } else {
+ Seq.empty[File]
+ }
+ }
+}
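
OptionRequirements.requireBothOrNeitherDefined is a small helper from this module; a plausible sketch of the contract it enforces (assumed implementation, shown only for illustration):

    // Fail fast when exactly one of the two options is supplied.
    def requireBothOrNeitherDefined(
        opt1: Option[_],
        opt2: Option[_],
        errMessageWhenSecondIsMissing: String,
        errMessageWhenFirstIsMissing: String): Unit = {
      require(opt1.isEmpty || opt2.isDefined, errMessageWhenSecondIsMissing)
      require(opt2.isEmpty || opt1.isDefined, errMessageWhenFirstIsMissing)
    }
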
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 6f4ba1c8b888f..dc8e28ae2c169 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
+import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrap, InitContainerResourceStagingServerSecretPlugin, KerberosTokenConfBootstrap, PodWithDetachedInitContainer, PodWithMainContainer, SparkPodInitContainerBootstrap}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.{InitContainerUtil, MountSecretsBootstrap, MountSmallFilesBootstrap}
@@ -46,7 +46,9 @@ private[spark] class ExecutorPodFactoryImpl(
mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap],
executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap],
executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin],
- shuffleManager: Option[KubernetesExternalShuffleManager])
+ shuffleManager: Option[KubernetesExternalShuffleManager],
+ hadoopBootStrap: Option[HadoopConfBootstrap],
+ kerberosBootstrap: Option[KerberosTokenConfBootstrap])
extends ExecutorPodFactory {
import ExecutorPodFactoryImpl._
@@ -55,6 +57,9 @@ private[spark] class ExecutorPodFactoryImpl(
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
+ private val isKerberosEnabled = sparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
+ private val maybeSimpleAuthentication =
+ if (isKerberosEnabled) Some(s"-D$HADOOP_SECURITY_AUTHENTICATION=simple") else None
private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_EXECUTOR_LABEL_PREFIX,
@@ -136,15 +141,19 @@ private[spark] class ExecutorPodFactoryImpl(
.withValue(cp)
.build()
}
- val executorExtraJavaOptionsEnv = sparkConf
- .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
- .map { opts =>
- val delimitedOpts = Utils.splitCommandString(opts)
- delimitedOpts.zipWithIndex.map {
- case (opt, index) =>
- new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
- }
- }.getOrElse(Seq.empty[EnvVar])
+ val executorExtraJavaOptions = (
+ sparkConf.get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
+ ++ maybeSimpleAuthentication).mkString(" ") match {
+ case "" => None
+ case str => Some(str)
+ }
+ val executorExtraJavaOptionsEnv = executorExtraJavaOptions.map { opts =>
+ val delimitedOpts = Utils.splitCommandString(opts)
+ delimitedOpts.zipWithIndex.map {
+ case (opt, index) =>
+ new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
+ }
+ }.getOrElse(Seq.empty[EnvVar])
val executorEnv = (Seq(
(ENV_EXECUTOR_PORT, executorPort.toString),
(ENV_DRIVER_URL, driverUrl),
@@ -262,9 +271,24 @@ private[spark] class ExecutorPodFactoryImpl(
val executorPodWithNodeAffinity =
nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful(
executorPodWithInitContainer, nodeToLocalTaskCount)
- new PodBuilder(executorPodWithNodeAffinity)
+ val (executorHadoopConfPod, executorHadoopConfContainer) =
+ hadoopBootStrap.map { bootstrap =>
+ val podWithMainContainer = bootstrap.bootstrapMainContainerAndVolumes(
+ PodWithMainContainer(executorPodWithNodeAffinity, initBootstrappedExecutorContainer)
+ )
+ (podWithMainContainer.pod, podWithMainContainer.mainContainer)
+ }.getOrElse((executorPodWithNodeAffinity, initBootstrappedExecutorContainer))
+
+ val (executorKerberosPod, executorKerberosContainer) =
+ kerberosBootstrap.map { bootstrap =>
+ val podWithMainContainer = bootstrap.bootstrapMainContainerAndVolumes(
+ PodWithMainContainer(executorHadoopConfPod, executorHadoopConfContainer))
+ (podWithMainContainer.pod, podWithMainContainer.mainContainer)
+ }.getOrElse((executorHadoopConfPod, executorHadoopConfContainer))
+ new PodBuilder(executorKerberosPod)
.editSpec()
- .addToContainers(initBootstrappedExecutorContainer)
+ .addToContainers(executorKerberosContainer)
.endSpec()
.build()
}
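
Both bootstraps above are optional and are applied in sequence, each mapping a (pod, container) pair to a new pair or passing the input through untouched. A generic sketch of that pattern (the helper name is illustrative, not from this patch):

    // Apply each optional transformation in order, keeping the input when absent.
    def chainOptional[A](input: A, transforms: Option[A => A]*): A =
      transforms.flatten.foldLeft(input) { (acc, f) => f(acc) }
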
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index cd92df439a7e6..4f67a6f5fc0a7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -21,7 +21,7 @@ import java.io.File
import io.fabric8.kubernetes.client.Config
import org.apache.spark.SparkContext
-import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
+import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrapImpl, HadoopUGIUtil, InitContainerResourceStagingServerSecretPluginImpl, KerberosTokenConfBootstrapImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl}
@@ -32,7 +32,6 @@ import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, Tas
import org.apache.spark.util.{ThreadUtils, Utils}
private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
-
override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
@@ -44,6 +43,10 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler)
: SchedulerBackend = {
val sparkConf = sc.getConf
+ val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_SPARK_CONF_NAME)
+ val maybeHadoopConfDir = sparkConf.getOption(HADOOP_CONF_DIR_LOC)
+ val maybeDTSecretName = sparkConf.getOption(HADOOP_KERBEROS_CONF_SECRET)
+ val maybeDTDataItem = sparkConf.getOption(HADOOP_KERBEROS_CONF_ITEM_KEY)
val maybeInitContainerConfigMap = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP)
val maybeInitContainerConfigMapKey = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY)
val maybeSubmittedFilesSecret = sparkConf.get(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET)
@@ -80,7 +83,27 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
configMap,
configMapKey)
}
-
+ val hadoopBootStrap = for {
+ hadoopConfigMap <- maybeHadoopConfigMap
+ } yield {
+ val hadoopUtil = new HadoopUGIUtil
+ val hadoopConfigurations = maybeHadoopConfDir.map(
+ confDir => getHadoopConfFiles(confDir)).getOrElse(Array.empty[File])
+ new HadoopConfBootstrapImpl(
+ hadoopConfigMap,
+ hadoopConfigurations,
+ hadoopUtil
+ )
+ }
+ val kerberosBootstrap = for {
+ secretName <- maybeDTSecretName
+ secretItemKey <- maybeDTDataItem
+ } yield {
+ new KerberosTokenConfBootstrapImpl(
+ secretName,
+ secretItemKey,
+ Utils.getCurrentUserName)
+ }
val mountSmallFilesBootstrap = for {
secretName <- maybeSubmittedFilesSecret
secretMountPath <- maybeSubmittedFilesSecretMountPath
@@ -104,7 +127,10 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
logWarning("The executor's init-container config map key was not specified. Executors will" +
" therefore not attempt to fetch remote or submitted dependencies.")
}
-
+ if (maybeHadoopConfigMap.isEmpty) {
+ logWarning("The executor's hadoop config map key was not specified. Executors will" +
+ " therefore not attempt to fetch hadoop configuration files.")
+ }
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
KUBERNETES_MASTER_INTERNAL_URL,
Some(sparkConf.get(KUBERNETES_NAMESPACE)),
@@ -112,7 +138,6 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
sparkConf,
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
-
val kubernetesShuffleManager = if (Utils.isDynamicAllocationEnabled(sparkConf)) {
val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl(
SparkTransportConf.fromSparkConf(sparkConf, "shuffle"),
@@ -131,7 +156,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
mountSmallFilesBootstrap,
executorInitContainerBootstrap,
executorInitContainerSecretVolumePlugin,
- kubernetesShuffleManager)
+ kubernetesShuffleManager,
+ hadoopBootStrap,
+ kerberosBootstrap)
val allocatorExecutor = ThreadUtils
.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
@@ -149,4 +176,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
+ private def getHadoopConfFiles(path: String) : Array[File] = {
+ def isFile(file: File) = if (file.isFile) Some(file) else None
+ val dir = new File(path)
+ if (dir.isDirectory) {
+ dir.listFiles.flatMap { file => isFile(file) }
+ } else {
+ Array.empty[File]
+ }
+ }
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index d30c88fcc74bf..2348dd52a5a77 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -121,7 +121,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
} else {
val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
for (i <- 0 until math.min(
- totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
+ totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
runningExecutorsToPods.put(executorId, pod)
runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
@@ -208,7 +208,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
.watch(new ExecutorPodsWatcher()))
allocatorExecutor.scheduleWithFixedDelay(
- allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS)
+ allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS)
shuffleManager.foreach(_.start(applicationId()))
if (!Utils.isDynamicAllocationEnabled(conf)) {
@@ -486,4 +486,3 @@ private object KubernetesClusterSchedulerBackend {
" Consider boosting spark executor memory overhead."
}
}
-
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ShufflePodCache.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ShufflePodCache.scala
new file mode 100644
index 0000000000000..9efeee71dca5c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ShufflePodCache.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import io.fabric8.kubernetes.client.internal.readiness.Readiness
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+
+private[spark] class ShufflePodCache (
+ client: KubernetesClient,
+ dsNamespace: String,
+ dsLabels: Map[String, String]) extends Logging {
+
+ private var shufflePodCache = scala.collection.mutable.Map[String, String]()
+ private var watcher: Watch = _
+
+ def start(): Unit = {
+ // seed the initial cache.
+ val pods = client.pods()
+ .inNamespace(dsNamespace).withLabels(dsLabels.asJava).list()
+ pods.getItems.asScala.foreach {
+ pod =>
+ if (Readiness.isReady(pod)) {
+ addShufflePodToCache(pod)
+ } else {
+ logWarning(s"Found unready shuffle pod ${pod.getMetadata.getName} " +
+ s"on node ${pod.getSpec.getNodeName}")
+ }
+ }
+
+ watcher = client
+ .pods()
+ .inNamespace(dsNamespace)
+ .withLabels(dsLabels.asJava)
+ .watch(new Watcher[Pod] {
+ override def eventReceived(action: Watcher.Action, p: Pod): Unit = {
+ action match {
+ case Action.DELETED | Action.ERROR =>
+ shufflePodCache.remove(p.getSpec.getNodeName)
+ case Action.ADDED | Action.MODIFIED if Readiness.isReady(p) =>
+ addShufflePodToCache(p)
+ case _ => // ignore ADDED/MODIFIED events for pods that are not yet ready
+ }
+ }
+ override def onClose(e: KubernetesClientException): Unit = {}
+ })
+ }
+
+ private def addShufflePodToCache(pod: Pod): Unit = {
+ if (shufflePodCache.contains(pod.getSpec.getNodeName)) {
+ val registeredPodName = shufflePodCache(pod.getSpec.getNodeName)
+ val errorMessage = s"Ambiguous specification of shuffle service pod. " +
+ s"Found multiple matching pods: ${pod.getMetadata.getName}, " +
+ s"$registeredPodName on ${pod.getSpec.getNodeName}"
+ logError(errorMessage)
+ throw new SparkException(errorMessage)
+ } else {
+ shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP
+ }
+ }
+
+ def stop(): Unit = {
+ watcher.close()
+ }
+
+ def getShufflePodForExecutor(executorNode: String): String = {
+ shufflePodCache.get(executorNode)
+ .getOrElse(throw new SparkException(s"Unable to find shuffle pod on node $executorNode"))
+ }
+}
+
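
A brief usage sketch for the cache (the client construction, namespace, and labels are placeholders, not from this patch):

    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    val client = new DefaultKubernetesClient() // resolves cluster config from the environment
    val cache = new ShufflePodCache(client, "spark", Map("app" -> "spark-shuffle-service"))
    cache.start() // seed the cache and begin watching for pod changes
    val shufflePodIp = cache.getShufflePodForExecutor("node-1")
    cache.stop()
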
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/InitContainerResourceStagingServerSecretPluginSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/InitContainerResourceStagingServerSecretPluginSuite.scala
index 12c0dc4a06013..2117a25e27fd4 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/InitContainerResourceStagingServerSecretPluginSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/InitContainerResourceStagingServerSecretPluginSuite.scala
@@ -23,7 +23,8 @@ import scala.collection.JavaConverters._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.SparkFunSuite
-class InitContainerResourceStagingServerSecretPluginSuite extends SparkFunSuite with BeforeAndAfter{
+class InitContainerResourceStagingServerSecretPluginSuite
+ extends SparkFunSuite with BeforeAndAfter {
private val INIT_CONTAINER_SECRET_NAME = "init-secret"
private val INIT_CONTAINER_SECRET_MOUNT = "/tmp/secret"
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 041f51e912002..b36f1e4ff6fe4 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -16,23 +16,25 @@
*/
package org.apache.spark.deploy.k8s.submit
+import scala.collection.JavaConverters._
+
import com.google.common.collect.Iterables
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, DoneablePod, HasMetadata, Pod, PodBuilder, PodList, Secret, SecretBuilder}
+import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
-import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicable, PodResource, Resource}
+import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
-import org.mockito.Mockito.{doReturn, verify, when}
import org.mockito.invocation.InvocationOnMock
+import org.mockito.Mockito.{doReturn, verify, when}
import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
import org.scalatest.mock.MockitoSugar._
-import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec}
-class ClientSuite extends SparkFunSuite with BeforeAndAfter {
+private[spark] class ClientSuite extends SparkFunSuite with BeforeAndAfter {
private val DRIVER_POD_UID = "pod-id"
private val DRIVER_POD_API_VERSION = "v1"
@@ -136,6 +138,10 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
.set(
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS,
"-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails")
+ .set(
+ KUBERNETES_KERBEROS_SUPPORT,
+ true
+ )
val submissionClient = new Client(
submissionSteps,
sparkConf,
@@ -150,14 +156,16 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
val driverJvmOptsEnvs = driverContainer.getEnv.asScala.filter { env =>
env.getName.startsWith(ENV_JAVA_OPT_PREFIX)
}.sortBy(_.getName)
- assert(driverJvmOptsEnvs.size === 4)
+ assert(driverJvmOptsEnvs.size === 6)
val expectedJvmOptsValues = Seq(
+ "-Dspark.kubernetes.kerberos.enabled=true",
"-Dspark.logConf=true",
s"-D${SecondTestConfigurationStep.sparkConfKey}=" +
s"${SecondTestConfigurationStep.sparkConfValue}",
s"-XX:+HeapDumpOnOutOfMemoryError",
- s"-XX:+PrintGCDetails")
+ s"-XX:+PrintGCDetails",
+ "-Dspark.hadoop.hadoop.security.authentication=simple")
driverJvmOptsEnvs.zip(expectedJvmOptsValues).zipWithIndex.foreach {
case ((resolvedEnv, expectedJvmOpt), index) =>
assert(resolvedEnv.getName === s"$ENV_JAVA_OPT_PREFIX$index")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
index 6f5d5e571c443..6a15f490c9f2f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.config._
-import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
+import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, HadoopConfigBootstrapStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
@@ -46,6 +46,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
MAIN_CLASS,
APP_ARGS,
ADDITIONAL_PYTHON_FILES,
+ None,
sparkConf)
validateStepTypes(
orchestrator,
@@ -69,6 +70,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
MAIN_CLASS,
APP_ARGS,
ADDITIONAL_PYTHON_FILES,
+ None,
sparkConf)
validateStepTypes(
orchestrator,
@@ -91,6 +93,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
MAIN_CLASS,
APP_ARGS,
ADDITIONAL_PYTHON_FILES,
+ None,
sparkConf)
validateStepTypes(
orchestrator,
@@ -113,6 +116,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
MAIN_CLASS,
APP_ARGS,
ADDITIONAL_PYTHON_FILES,
+ None,
sparkConf)
validateStepTypes(
orchestrator,
@@ -137,6 +141,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
MAIN_CLASS,
APP_ARGS,
ADDITIONAL_PYTHON_FILES,
+ None,
sparkConf)
validateStepTypes(
orchestrator,
@@ -146,12 +151,37 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
classOf[DependencyResolutionStep],
classOf[MountSecretsStep])
}
+ test("Submission steps with hdfs interaction and HADOOP_CONF_DIR defined") {
+ val sparkConf = new SparkConf(false)
+ val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
+ val hadoopConf = Some("/etc/hadoop/conf")
+ val orchestrator = new DriverConfigurationStepsOrchestrator(
+ NAMESPACE,
+ APP_ID,
+ LAUNCH_TIME,
+ mainAppResource,
+ APP_NAME,
+ MAIN_CLASS,
+ APP_ARGS,
+ ADDITIONAL_PYTHON_FILES,
+ hadoopConf,
+ sparkConf)
+ val steps = orchestrator.getAllConfigurationSteps()
+ validateStepTypes(
+ orchestrator,
+ classOf[BaseDriverConfigurationStep],
+ classOf[DriverAddressConfigurationStep],
+ classOf[DriverKubernetesCredentialsStep],
+ classOf[DependencyResolutionStep],
+ classOf[HadoopConfigBootstrapStep])
+ }
private def validateStepTypes(
- orchestrator: DriverConfigurationStepsOrchestrator,
- types: Class[_ <: DriverConfigurationStep]*): Unit = {
+ orchestrator: DriverConfigurationStepsOrchestrator,
+ types: Class[_ <: DriverConfigurationStep]*): Unit = {
val steps = orchestrator.getAllConfigurationSteps()
assert(steps.size === types.size)
assert(steps.map(_.getClass) === types)
}
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala
new file mode 100644
index 0000000000000..28f6c9e53a4cc
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.io.File
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model._
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, HadoopUGIUtil, PodWithMainContainer}
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.util.Utils
+
+private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite with BeforeAndAfter {
+ private val CONFIG_MAP_NAME = "config-map"
+ private val TEMP_HADOOP_FILE = createTempFile("core-site.xml")
+ private val HADOOP_FILES = Seq(TEMP_HADOOP_FILE)
+ private val SPARK_USER_VALUE = "sparkUser"
+
+ @Mock
+ private var hadoopUtil: HadoopUGIUtil = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ when(hadoopUtil.getShortName).thenReturn(SPARK_USER_VALUE)
+ }
+
+ test("Test of bootstrapping hadoop_conf_dir files") {
+ val hadoopConfStep = new HadoopConfBootstrapImpl(
+ CONFIG_MAP_NAME,
+ HADOOP_FILES,
+ hadoopUtil)
+ val expectedKeyPaths = Seq(
+ new KeyToPathBuilder()
+ .withKey(TEMP_HADOOP_FILE.toPath.getFileName.toString)
+ .withPath(TEMP_HADOOP_FILE.toPath.getFileName.toString)
+ .build())
+ val expectedPod = new PodBuilder()
+ .editOrNewSpec()
+ .addNewVolume()
+ .withName(HADOOP_FILE_VOLUME)
+ .withNewConfigMap()
+ .withName(CONFIG_MAP_NAME)
+ .withItems(expectedKeyPaths.asJava)
+ .endConfigMap()
+ .endVolume()
+ .endSpec()
+ .build()
+
+ val podWithMain = PodWithMainContainer(
+ new PodBuilder().withNewSpec().endSpec().build(),
+ new Container())
+ val returnedPodContainer = hadoopConfStep.bootstrapMainContainerAndVolumes(podWithMain)
+ assert(expectedPod === returnedPodContainer.pod)
+ assert(returnedPodContainer.mainContainer.getVolumeMounts.asScala.map(vm =>
+ (vm.getName, vm.getMountPath)).head === (HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH))
+ assert(returnedPodContainer.mainContainer.getEnv.asScala.head ===
+ new EnvVarBuilder().withName(ENV_HADOOP_CONF_DIR).withValue(HADOOP_CONF_DIR_PATH).build())
+ assert(returnedPodContainer.mainContainer.getEnv.asScala(1) ===
+ new EnvVarBuilder().withName(ENV_SPARK_USER).withValue(SPARK_USER_VALUE).build())
+ }
+ private def createTempFile(contents: String): File = {
+ val dir = Utils.createTempDir()
+ val file = new File(dir, s"${UUID.randomUUID().toString}")
+ Files.write(contents.getBytes, file)
+ file
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KerberosTokenConfBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KerberosTokenConfBootstrapSuite.scala
new file mode 100644
index 0000000000000..13ee6790aa4e1
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KerberosTokenConfBootstrapSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.{KerberosTokenConfBootstrapImpl, PodWithMainContainer}
+import org.apache.spark.deploy.k8s.constants._
+
+
+private[spark] class KerberosTokenConfBootstrapSuite extends SparkFunSuite {
+ private val SECRET_NAME = "dtSecret"
+ private val SECRET_LABEL = "dtLabel"
+ private val TEST_SPARK_USER = "hdfs"
+
+ test("Test of bootstrapping kerberos secrets and env") {
+ val kerberosConfStep = new KerberosTokenConfBootstrapImpl(
+ SECRET_NAME,
+ SECRET_LABEL,
+ TEST_SPARK_USER)
+ val expectedPod = new PodBuilder()
+ .editOrNewSpec()
+ .addNewVolume()
+ .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+ .withNewSecret()
+ .withSecretName(SECRET_NAME)
+ .endSecret()
+ .endVolume()
+ .endSpec()
+ .build()
+ val podWithMain = PodWithMainContainer(
+ new PodBuilder().withNewSpec().endSpec().build(),
+ new Container())
+ val returnedPodContainer = kerberosConfStep.bootstrapMainContainerAndVolumes(podWithMain)
+ assert(expectedPod === returnedPodContainer.pod)
+ assert(returnedPodContainer.mainContainer.getVolumeMounts.asScala.map(vm =>
+ (vm.getName, vm.getMountPath)).head ===
+ (SPARK_APP_HADOOP_SECRET_VOLUME_NAME, SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR))
+ assert(returnedPodContainer.mainContainer.getEnv.asScala.head.getName ===
+ ENV_HADOOP_TOKEN_FILE_LOCATION)
+ assert(returnedPodContainer.mainContainer.getEnv.asScala(1).getName === ENV_SPARK_USER)
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStepSuite.scala
new file mode 100644
index 0000000000000..12de82600f3b9
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStepSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.{HadoopConfigSpec, HadoopConfigurationStep}
+
+
+private[spark] class HadoopConfigBootstrapStepSuite extends SparkFunSuite with BeforeAndAfter {
+ private val CONFIG_MAP_NAME = "config-map"
+ private val POD_LABEL = Map("bootstrap" -> "true")
+ private val DRIVER_CONTAINER_NAME = "driver-container"
+ private val EXPECTED_SECRET = new SecretBuilder()
+ .withNewMetadata()
+ .withName(HADOOP_KERBEROS_SECRET_NAME)
+ .endMetadata()
+ .addToData("data", "secretdata")
+ .build()
+
+ @Mock
+ private var hadoopConfigStep : HadoopConfigurationStep = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ when(hadoopConfigStep.configureContainers(any[HadoopConfigSpec])).thenReturn(
+ HadoopConfigSpec(
+ configMapProperties = Map("data" -> "dataBytesToString"),
+ driverPod = new PodBuilder()
+ .withNewMetadata()
+ .addToLabels("bootstrap", "true")
+ .endMetadata()
+ .withNewSpec().endSpec()
+ .build(),
+ driverContainer = new ContainerBuilder().withName(DRIVER_CONTAINER_NAME).build(),
+ additionalDriverSparkConf = Map("sparkConf" -> "confValue"),
+ dtSecret =
+ Some(EXPECTED_SECRET),
+ dtSecretName = HADOOP_KERBEROS_SECRET_NAME,
+ dtSecretItemKey = ""))
+ }
+
+ test("Test modification of driverSpec with Hadoop Steps") {
+ val hadoopConfStep = new HadoopConfigBootstrapStep(
+ Seq(hadoopConfigStep),
+ CONFIG_MAP_NAME)
+ val expectedDriverSparkConf = new SparkConf(true)
+ .set(HADOOP_CONFIG_MAP_SPARK_CONF_NAME, CONFIG_MAP_NAME)
+ .set("sparkConf", "confValue")
+ val expectedConfigMap = new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIG_MAP_NAME)
+ .endMetadata()
+ .addToData(Map("data" -> "dataBytesToString").asJava)
+ .build()
+ val expectedResources = Seq(expectedConfigMap, EXPECTED_SECRET)
+ val driverSpec = KubernetesDriverSpec(
+ driverPod = new Pod(),
+ driverContainer = new Container(),
+ driverSparkConf = new SparkConf(true),
+ otherKubernetesResources = Seq.empty[HasMetadata])
+ val returnContainerSpec = hadoopConfStep.configureDriver(driverSpec)
+ assert(expectedDriverSparkConf.getAll === returnContainerSpec.driverSparkConf.getAll)
+ assert(returnContainerSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
+ assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL)
+ assert(returnContainerSpec.otherKubernetesResources === expectedResources)
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/InitContainerBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/InitContainerBootstrapStepSuite.scala
index ee3b4229b16c1..ee3fe126b40f7 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/InitContainerBootstrapStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/InitContainerBootstrapStepSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
import org.apache.spark.util.Utils
-private[spark] class initContainerBootstrapStepSuite extends SparkFunSuite {
+class InitContainerBootstrapStepSuite extends SparkFunSuite {
private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule)
private val CONFIG_MAP_NAME = "spark-init-config-map"
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStepTest.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStepTest.scala
new file mode 100644
index 0000000000000..f67ddcf4e2010
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSmallLocalFilesStepTest.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps
+
+import java.io.{File, RandomAccessFile}
+
+import com.google.common.base.Charsets
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, HasMetadata, Pod, PodBuilder, Secret}
+import org.mockito.MockitoAnnotations
+import org.scalatest.BeforeAndAfter
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.deploy.k8s.submit.MountSmallFilesBootstrap
+import org.apache.spark.util.Utils
+
+private[spark] class MountSmallLocalFilesStepTest extends SparkFunSuite with BeforeAndAfter {
+
+ private val FIRST_TEMP_FILE_NAME = "file1.txt"
+ private val SECOND_TEMP_FILE_NAME = "file2.txt"
+ private val FIRST_TEMP_FILE_CONTENTS = "123"
+ private val SECOND_TEMP_FILE_CONTENTS = "456"
+ private val REMOTE_FILE_URI = "hdfs://localhost:9000/file3.txt"
+ private val SECRET_NAME = "secret"
+
+ private var tempFolder: File = _
+
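+  // Uses the dummy bootstrap defined at the bottom of this suite; it tags the pod with a label
+  // and the container with an env var so the tests can verify the bootstrap was applied.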
+ private val mountSmallFilesBootstrap = new DummyMountSmallFilesBootstrap
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ tempFolder = Utils.createTempDir()
+ }
+
+ after {
+    Utils.deleteRecursively(tempFolder)
+ }
+
+ test("Local files should be added to the secret.") {
+ val firstTempFile = createTempFileWithContents(
+ tempFolder, FIRST_TEMP_FILE_NAME, FIRST_TEMP_FILE_CONTENTS)
+ val secondTempFile = createTempFileWithContents(
+ tempFolder, SECOND_TEMP_FILE_NAME, SECOND_TEMP_FILE_CONTENTS)
+ val sparkFiles = Seq(
+ firstTempFile.getAbsolutePath,
+ secondTempFile.getAbsolutePath,
+ REMOTE_FILE_URI)
+ val configurationStep = new MountSmallLocalFilesStep(
+ sparkFiles,
+ SECRET_NAME,
+ MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
+ mountSmallFilesBootstrap)
+ val baseDriverSpec = new KubernetesDriverSpec(
+ new PodBuilder().build(),
+ new ContainerBuilder().build(),
+ Seq.empty[HasMetadata],
+ new SparkConf(false))
+ val configuredDriverSpec = configurationStep.configureDriver(baseDriverSpec)
+ assert(configuredDriverSpec.otherKubernetesResources.size === 1)
+ assert(configuredDriverSpec.otherKubernetesResources(0).isInstanceOf[Secret])
+ val localFilesSecret = configuredDriverSpec.otherKubernetesResources(0).asInstanceOf[Secret]
+ assert(localFilesSecret.getMetadata.getName === SECRET_NAME)
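+    // Kubernetes stores secret data base64-encoded, so the expected values are encoded the same way.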
+ val expectedSecretContents = Map(
+ FIRST_TEMP_FILE_NAME -> BaseEncoding.base64().encode(
+ FIRST_TEMP_FILE_CONTENTS.getBytes(Charsets.UTF_8)),
+ SECOND_TEMP_FILE_NAME -> BaseEncoding.base64().encode(
+ SECOND_TEMP_FILE_CONTENTS.getBytes(Charsets.UTF_8)))
+ assert(localFilesSecret.getData.asScala === expectedSecretContents)
+ assert(configuredDriverSpec.driverPod.getMetadata.getLabels.asScala ===
+ Map(mountSmallFilesBootstrap.LABEL_KEY -> mountSmallFilesBootstrap.LABEL_VALUE))
+ assert(configuredDriverSpec.driverContainer.getEnv.size() === 1)
+ assert(configuredDriverSpec.driverContainer.getEnv.get(0).getName ===
+ mountSmallFilesBootstrap.ENV_KEY)
+ assert(configuredDriverSpec.driverContainer.getEnv.get(0).getValue ===
+ mountSmallFilesBootstrap.ENV_VALUE)
+ assert(configuredDriverSpec.driverSparkConf.get(
+ EXECUTOR_SUBMITTED_SMALL_FILES_SECRET) ===
+ Some(SECRET_NAME))
+ assert(configuredDriverSpec.driverSparkConf.get(
+ EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH) ===
+ Some(MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH))
+ }
+
+ test("Using large files should throw an exception.") {
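+    // Base64-encoding 10241 bytes yields file contents larger than the step's maximum secret
+    // bundle size, so mounting this file must be rejected.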
+ val largeTempFileContents = BaseEncoding.base64().encode(new Array[Byte](10241))
+ val largeTempFile = createTempFileWithContents(tempFolder, "large.txt", largeTempFileContents)
+ val configurationStep = new MountSmallLocalFilesStep(
+ Seq(largeTempFile.getAbsolutePath),
+ SECRET_NAME,
+ MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
+ mountSmallFilesBootstrap)
+ val baseDriverSpec = new KubernetesDriverSpec(
+ new PodBuilder().build(),
+ new ContainerBuilder().build(),
+ Seq.empty[HasMetadata],
+ new SparkConf(false))
+    val thrown = intercept[IllegalArgumentException] {
+      configurationStep.configureDriver(baseDriverSpec)
+    }
+    assert(thrown.getMessage ===
+      s"requirement failed: Total size of all files submitted must be less than" +
+        s" ${MountSmallLocalFilesStep.MAX_SECRET_BUNDLE_SIZE_BYTES_STRING} if you do not" +
+        s" use a resource staging server. The total size of all submitted local" +
+        s" files is ${Utils.bytesToString(largeTempFile.length())}. Please install a" +
+        s" resource staging server and configure your application to use it via" +
+        s" ${RESOURCE_STAGING_SERVER_URI.key}")
+ }
+
+ private def createTempFileWithContents(
+ root: File,
+ fileName: String,
+ fileContents: String): File = {
+ val tempFile = new File(root, fileName)
+ tempFile.createNewFile()
+ Files.write(fileContents, tempFile, Charsets.UTF_8)
+ tempFile
+ }
+
+ private class DummyMountSmallFilesBootstrap extends MountSmallFilesBootstrap {
+ val LABEL_KEY = "smallFilesLabelKey"
+ val LABEL_VALUE = "smallFilesLabelValue"
+ val ENV_KEY = "smallFilesEnvKey"
+ val ENV_VALUE = "smallFilesEnvValue"
+
+ override def mountSmallFilesSecret(pod: Pod, container: Container): (Pod, Container) = {
+ val editedPod = new PodBuilder(pod)
+ .editOrNewMetadata()
+ .addToLabels(LABEL_KEY, LABEL_VALUE)
+ .endMetadata()
+ .build()
+ val editedContainer = new ContainerBuilder(container)
+ .addNewEnv()
+ .withName(ENV_KEY)
+ .withValue(ENV_VALUE)
+ .endEnv()
+ .build()
+ (editedPod, editedContainer)
+ }
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala
new file mode 100644
index 0000000000000..41a889dce7ccb
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps
+
+import java.io.File
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model._
+import org.apache.commons.io.FileUtils.readFileToString
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.when
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.{HadoopConfBootstrap, PodWithMainContainer}
+import org.apache.spark.deploy.k8s.constants.HADOOP_CONF_DIR_LOC
+import org.apache.spark.util.Utils
+
+private[spark] class HadoopConfMounterStepSuite extends SparkFunSuite with BeforeAndAfter {
+ private val CONFIG_MAP_NAME = "config-map"
+ private val HADOOP_CONF_DIR_VAL = "/etc/hadoop"
+ private val POD_LABEL = Map("bootstrap" -> "true")
+ private val DRIVER_CONTAINER_NAME = "driver-container"
+ private val TEMP_HADOOP_FILE = createTempFile("core-site.xml")
+ private val HADOOP_FILES = Seq(TEMP_HADOOP_FILE)
+
+ @Mock
+  private var hadoopConfBootstrap: HadoopConfBootstrap = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
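+    // Stub the bootstrap so it returns a pod labeled "bootstrap" -> "true" and a container named
+    // after the driver, which is what the assertions below check for.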
+ when(hadoopConfBootstrap.bootstrapMainContainerAndVolumes(
+ any[PodWithMainContainer])).thenAnswer(new Answer[PodWithMainContainer] {
+      override def answer(invocation: InvocationOnMock): PodWithMainContainer = {
+ val pod = invocation.getArgumentAt(0, classOf[PodWithMainContainer])
+ pod.copy(
+ pod =
+ new PodBuilder(pod.pod)
+ .withNewMetadata()
+ .addToLabels("bootstrap", "true")
+ .endMetadata()
+ .withNewSpec().endSpec()
+ .build(),
+ mainContainer =
+ new ContainerBuilder()
+ .withName(DRIVER_CONTAINER_NAME).build()
+        )
+      }
+    })
+ }
+
+ test("Test of mounting hadoop_conf_dir files into HadoopConfigSpec") {
+ val hadoopConfStep = new HadoopConfMounterStep(
+ CONFIG_MAP_NAME,
+ HADOOP_FILES,
+ hadoopConfBootstrap,
+ HADOOP_CONF_DIR_VAL)
+ val expectedDriverSparkConf = Map(HADOOP_CONF_DIR_LOC -> HADOOP_CONF_DIR_VAL)
+ val expectedConfigMap = Map(
+ TEMP_HADOOP_FILE.toPath.getFileName.toString ->
+ readFileToString(TEMP_HADOOP_FILE)
+ )
+ val hadoopConfSpec = HadoopConfigSpec(
+ Map.empty[String, String],
+ new Pod(),
+ new Container(),
+ Map.empty[String, String],
+ None,
+ "",
+ "")
+ val returnContainerSpec = hadoopConfStep.configureContainers(hadoopConfSpec)
+ assert(expectedDriverSparkConf === returnContainerSpec.additionalDriverSparkConf)
+    assert(returnContainerSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
+ assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL)
+ assert(returnContainerSpec.configMapProperties === expectedConfigMap)
+ }
+
+  private def createTempFile(contents: String): File = {
+ val dir = Utils.createTempDir()
+ val file = new File(dir, s"${UUID.randomUUID().toString}")
+ Files.write(contents.getBytes, file)
+ file
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStepSuite.scala
new file mode 100644
index 0000000000000..f5228112a0453
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStepSuite.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps
+
+import java.io.File
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model._
+import org.apache.commons.codec.binary.Base64
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.when
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.HadoopUGIUtil
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.util.Utils
+
+private[spark] class HadoopKerberosKeytabResolverStepSuite
+    extends SparkFunSuite with BeforeAndAfter {
+ private val POD_LABEL = Map("bootstrap" -> "true")
+ private val DRIVER_CONTAINER_NAME = "driver-container"
+ private val TEMP_KEYTAB_FILE = createTempFile("keytab")
+ private val KERB_PRINCIPAL = "user@k8s.com"
+ private val SPARK_USER_VALUE = "sparkUser"
+ private val TEST_TOKEN_VALUE = "data"
+ private def getByteArray(input: String) = input.toCharArray.map(_.toByte)
+ private val TEST_DATA = getByteArray(TEST_TOKEN_VALUE)
+ private val OUTPUT_TEST_DATA = Base64.encodeBase64String(TEST_DATA)
+ private val INTERVAL = 500L
+ private val CURR_TIME = System.currentTimeMillis()
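+  // The expected secret name and data item key mirror the names the resolver step derives from
+  // the (mocked) current time and token renewal interval.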
+ private val DATA_KEY_NAME =
+ s"$KERBEROS_SECRET_LABEL_PREFIX-$CURR_TIME-$INTERVAL"
+ private val SECRET_NAME = s"$HADOOP_KERBEROS_SECRET_NAME.$CURR_TIME"
+
+ private val hadoopUGI = new HadoopUGIUtil()
+
+ @Mock
+ private var hadoopUtil: HadoopUGIUtil = _
+
+ @Mock
+ private var ugi: UserGroupInformation = _
+
+ @Mock
+ private var creds: Credentials = _
+
+ @Mock
+ private var token: Token[AbstractDelegationTokenIdentifier] = _
+
+ @Mock
+ private var identifier: AbstractDelegationTokenIdentifier = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
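+    // Stub the UGI utility so keytab login, token collection, and credential serialization all
+    // return deterministic test values instead of contacting a real KDC or HDFS.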
+ when(hadoopUtil.loginUserFromKeytabAndReturnUGI(any[String], any[String]))
+ .thenAnswer(new Answer[UserGroupInformation] {
+ override def answer(invocation: InvocationOnMock): UserGroupInformation = {
+ hadoopUGI.getCurrentUser
+ }
+ })
+ when(hadoopUtil.getCurrentUser).thenReturn(ugi)
+ when(hadoopUtil.getShortName).thenReturn(SPARK_USER_VALUE)
+ when(hadoopUtil.dfsAddDelegationToken(any(), any(), any())).thenReturn(null)
+ when(ugi.getCredentials).thenReturn(creds)
+ val tokens = List[Token[_ <: TokenIdentifier]](token).asJavaCollection
+ when(creds.getAllTokens).thenReturn(tokens)
+ when(hadoopUtil.serialize(any[Credentials]))
+ .thenReturn(TEST_DATA)
+ when(token.decodeIdentifier()).thenReturn(identifier)
+ when(hadoopUtil.getCurrentTime).thenReturn(CURR_TIME)
+ when(hadoopUtil.getTokenRenewalInterval(any[Iterable[Token[_ <: TokenIdentifier]]],
+ any[Configuration])).thenReturn(Some(INTERVAL))
+ }
+
+ test("Testing keytab login") {
+ when(hadoopUtil.isSecurityEnabled).thenReturn(true)
+ val keytabStep = new HadoopKerberosKeytabResolverStep(
+ new SparkConf(),
+ Some(KERB_PRINCIPAL),
+ Some(TEMP_KEYTAB_FILE),
+ None,
+ hadoopUtil)
+ val hadoopConfSpec = HadoopConfigSpec(
+ Map.empty[String, String],
+ new PodBuilder()
+ .withNewMetadata()
+ .addToLabels("bootstrap", "true")
+ .endMetadata()
+ .withNewSpec().endSpec()
+ .build(),
+ new ContainerBuilder().withName(DRIVER_CONTAINER_NAME).build(),
+ Map.empty[String, String],
+ None,
+ "",
+ "")
+ val returnContainerSpec = keytabStep.configureContainers(hadoopConfSpec)
+ assert(returnContainerSpec.additionalDriverSparkConf(HADOOP_KERBEROS_CONF_ITEM_KEY)
+ .contains(KERBEROS_SECRET_LABEL_PREFIX))
+ assert(returnContainerSpec.additionalDriverSparkConf ===
+ Map(HADOOP_KERBEROS_CONF_ITEM_KEY -> DATA_KEY_NAME,
+ HADOOP_KERBEROS_CONF_SECRET -> SECRET_NAME))
+    assert(returnContainerSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
+ assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL)
+ assert(returnContainerSpec.dtSecretItemKey === DATA_KEY_NAME)
+    assert(returnContainerSpec.dtSecret.nonEmpty)
+    assert(returnContainerSpec.dtSecret.get.getData.asScala === Map(
+      DATA_KEY_NAME -> OUTPUT_TEST_DATA))
+    assert(returnContainerSpec.dtSecretName === SECRET_NAME)
+    assert(returnContainerSpec.dtSecret.get.getMetadata.getLabels.asScala ===
+      Map("refresh-hadoop-tokens" -> "yes"))
+    assert(returnContainerSpec.dtSecret.get.getMetadata.getName === SECRET_NAME)
+ }
+
+ private def createTempFile(contents: String): File = {
+ val dir = Utils.createTempDir()
+ val file = new File(dir, s"${UUID.randomUUID().toString}")
+ Files.write(contents.getBytes, file)
+ file
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStepSuite.scala
new file mode 100644
index 0000000000000..2b5a8c57e109c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStepSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.constants._
+
+private[spark] class HadoopKerberosSecretResolverStepSuite extends SparkFunSuite {
+ private val POD_LABEL = Map("bootstrap" -> "true")
+ private val DRIVER_CONTAINER_NAME = "driver-container"
+ private val TOKEN_SECRET_NAME = "secretName"
+ private val TOKEN_SECRET_DATA_ITEM_KEY = "secretItemKey"
+
+ test("Testing kerberos with Secret") {
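+    // A pre-existing delegation token secret is referenced directly, so the step should not
+    // create a new secret (dtSecret === None below).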
+    val secretResolverStep = new HadoopKerberosSecretResolverStep(
+ new SparkConf(),
+ TOKEN_SECRET_NAME,
+ TOKEN_SECRET_DATA_ITEM_KEY)
+ val expectedDriverSparkConf = Map(
+ HADOOP_KERBEROS_CONF_ITEM_KEY -> TOKEN_SECRET_DATA_ITEM_KEY,
+ HADOOP_KERBEROS_CONF_SECRET -> TOKEN_SECRET_NAME)
+ val hadoopConfSpec = HadoopConfigSpec(
+ Map.empty[String, String],
+ new PodBuilder()
+ .withNewMetadata()
+ .addToLabels("bootstrap", "true")
+ .endMetadata()
+ .withNewSpec().endSpec()
+ .build(),
+ new ContainerBuilder().withName(DRIVER_CONTAINER_NAME).build(),
+ Map.empty[String, String],
+ None,
+ "",
+ "")
+    val returnContainerSpec = secretResolverStep.configureContainers(hadoopConfSpec)
+ assert(expectedDriverSparkConf === returnContainerSpec.additionalDriverSparkConf)
+    assert(returnContainerSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
+ assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL)
+ assert(returnContainerSpec.dtSecret === None)
+ assert(returnContainerSpec.dtSecretItemKey === TOKEN_SECRET_DATA_ITEM_KEY)
+ assert(returnContainerSpec.dtSecretName === TOKEN_SECRET_NAME)
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala
new file mode 100644
index 0000000000000..539b682d175dd
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.config._
+
+private[spark] class HadoopStepsOrchestratorSuite extends SparkFunSuite {
+ private val NAMESPACE = "testNamespace"
+ private val HADOOP_CONFIG_MAP = "hadoop-config-map"
+ private val HADOOP_CONF_DIR_VAL = "/etc/hadoop/conf"
+
+ test("Testing without Kerberos") {
+ val sparkTestConf = new SparkConf(true)
+ .set(KUBERNETES_KERBEROS_SUPPORT, false)
+ val hadoopOrchestrator = new HadoopStepsOrchestrator(
+ NAMESPACE,
+ HADOOP_CONFIG_MAP,
+ sparkTestConf,
+ HADOOP_CONF_DIR_VAL)
+ val steps = hadoopOrchestrator.getHadoopSteps()
+ assert(steps.length === 1)
+ assert(steps.head.isInstanceOf[HadoopConfMounterStep])
+ }
+
+ test("Testing with Keytab Kerberos Login") {
+ val sparkTestConf = new SparkConf(true)
+ .set(KUBERNETES_KERBEROS_SUPPORT, true)
+ .set(KUBERNETES_KERBEROS_KEYTAB, "keytab.file")
+ .set(KUBERNETES_KERBEROS_PRINCIPAL, "user@kerberos")
+ val hadoopOrchestrator = new HadoopStepsOrchestrator(
+ NAMESPACE,
+ HADOOP_CONFIG_MAP,
+ sparkTestConf,
+ HADOOP_CONF_DIR_VAL)
+ val steps = hadoopOrchestrator.getHadoopSteps()
+ assert(steps.length === 2)
+ assert(steps.head.isInstanceOf[HadoopConfMounterStep])
+ assert(steps(1).isInstanceOf[HadoopKerberosKeytabResolverStep])
+ }
+
+ test("Testing with kinit Kerberos Login") {
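+    // With Kerberos enabled but no keytab or principal configured, the orchestrator still
+    // appends the keytab resolver step, which falls back to locally cached credentials.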
+ val sparkTestConf = new SparkConf(true)
+ .set(KUBERNETES_KERBEROS_SUPPORT, true)
+ val hadoopOrchestrator = new HadoopStepsOrchestrator(
+ NAMESPACE,
+ HADOOP_CONFIG_MAP,
+ sparkTestConf,
+ HADOOP_CONF_DIR_VAL)
+ val steps = hadoopOrchestrator.getHadoopSteps()
+ assert(steps.length === 2)
+ assert(steps.head.isInstanceOf[HadoopConfMounterStep])
+ assert(steps(1).isInstanceOf[HadoopKerberosKeytabResolverStep])
+ }
+
+ test("Testing with Secret stored Kerberos") {
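+    // Supplying an existing token secret name and item key selects the secret resolver step
+    // rather than the keytab resolver step.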
+ val sparkTestConf = new SparkConf(true)
+ .set(KUBERNETES_KERBEROS_SUPPORT, true)
+ .set(KUBERNETES_KERBEROS_DT_SECRET_NAME, "dtSecret")
+ .set(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY, "dtItemKey")
+ val hadoopOrchestrator = new HadoopStepsOrchestrator(
+ NAMESPACE,
+ HADOOP_CONFIG_MAP,
+ sparkTestConf,
+ HADOOP_CONF_DIR_VAL)
+ val steps = hadoopOrchestrator.getHadoopSteps()
+ assert(steps.length === 2)
+ assert(steps.head.isInstanceOf[HadoopConfMounterStep])
+ assert(steps(1).isInstanceOf[HadoopKerberosSecretResolverStep])
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/BaseInitContainerConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/BaseInitContainerConfigurationStepSuite.scala
index 65df5fcebe382..53e768c0cb898 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/BaseInitContainerConfigurationStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/BaseInitContainerConfigurationStepSuite.scala
@@ -29,7 +29,8 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.{PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
import org.apache.spark.deploy.k8s.config._
-class BaseInitContainerConfigurationStepSuite extends SparkFunSuite with BeforeAndAfter{
+private[spark] class BaseInitContainerConfigurationStepSuite
+    extends SparkFunSuite with BeforeAndAfter {
private val SPARK_JARS = Seq(
"hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
private val SPARK_FILES = Seq(
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/SubmittedResourcesInitContainerStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/SubmittedResourcesInitContainerStepSuite.scala
index 1488c0d00b7a5..7894d6fc191ac 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/SubmittedResourcesInitContainerStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/SubmittedResourcesInitContainerStepSuite.scala
@@ -37,7 +37,8 @@ import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.{SubmittedDependencyUploader, SubmittedResourceIdAndSecret}
import org.apache.spark.util.Utils
-class SubmittedResourcesInitContainerStepSuite extends SparkFunSuite with BeforeAndAfter {
+private[spark] class SubmittedResourcesInitContainerStepSuite
+ extends SparkFunSuite with BeforeAndAfter {
private val RESOURCE_SECRET_NAME = "secret"
private val JARS_RESOURCE_ID = "jarsID"
private val JARS_SECRET = "jarsSecret"
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index a9a2937869edd..31be5e584f245 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -19,25 +19,25 @@ package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}
import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList}
-import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
-import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.{ExecutorExited, LiveListenerBus, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.mockito.Matchers.{any, eq => mockitoEq}
-import org.mockito.Mockito.{doNothing, never, times, verify, when}
+import org.mockito.Mockito.{mock => _, _}
+import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
import org.scalatest.BeforeAndAfter
import org.scalatest.mock.MockitoSugar._
+
import scala.collection.JavaConverters._
import scala.concurrent.Future
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.deploy.k8s.config._
-import org.apache.spark.deploy.k8s.constants._
-import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpoint, RpcEndpointAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
-import org.apache.spark.scheduler.{ExecutorExited, LiveListenerBus, SlaveLost, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-
private[spark] class KubernetesClusterSchedulerBackendSuite
extends SparkFunSuite with BeforeAndAfter {
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile
index 7b1effa911f19..27339d72cfd38 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile
@@ -44,6 +44,7 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${HADOOP_CONF_DIR+x} ]; then SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
index 26d1d805fde2b..3e99ef2809dcb 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
@@ -29,6 +29,7 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${HADOOP_CONF_DIR+x} ]; then SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
index a8bb5b362ab52..4e1b2ed91a1b5 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
@@ -43,6 +43,7 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${HADOOP_CONF_DIR+x} ]; then SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
index ab9f67e95a8e5..28e7b8ec3b44f 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
@@ -29,6 +29,7 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${HADOOP_CONF_DIR+x} ]; then SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/kerberos-test/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/kerberos-test/Dockerfile
new file mode 100644
index 0000000000000..c4ba43ad511d8
--- /dev/null
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/kerberos-test/Dockerfile
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark distribution. E.g.:
+# docker build -t kerberos-test:latest -f dockerfiles/kerberos-test/Dockerfile .
+
+RUN apk add --no-cache --update krb5 krb5-libs
+COPY examples /opt/spark/examples
+COPY test-scripts/test-env.sh /opt/spark/
+COPY hconf /opt/spark/hconf
\ No newline at end of file
diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/k8s/integrationtest/jobs/HDFSTest.scala b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/k8s/integrationtest/jobs/HDFSTest.scala
new file mode 100644
index 0000000000000..75536739e3426
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/k8s/integrationtest/jobs/HDFSTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest.jobs
+
+import org.apache.spark.sql.SparkSession
+
+private[spark] object HDFSTest {
+
+ def main(args: Array[String]): Unit = {
+ // scalastyle:off println
+ if (args.length < 1) {
+ System.err.println("Usage: HdfsTest