Commit 5e4051c

handled comments and increased test hardening
1 parent 350c8ed commit 5e4051c

17 files changed, +127 -80 lines

docs/running-on-kubernetes.md

Lines changed: 8 additions & 8 deletions
@@ -769,38 +769,38 @@ from the other deployment modes. See the [configuration page](configuration.html
   </td>
 </tr>
 <tr>
-  <td><code>spark.kubernetes.kerberos</code></td>
+  <td><code>spark.kubernetes.kerberos.enabled</code></td>
   <td>false</td>
   <td>
-  Specify whether your job is a job that will require a Delegation Token to access HDFS. By default, we
+  Specify whether your job is a job that will require a Kerberos Authorization to access HDFS. By default, we
   will assume that you will not require secure HDFS access.
   </td>
 </tr>
 <tr>
   <td><code>spark.kubernetes.kerberos.keytab</code></td>
   <td>(none)</td>
   <td>
-  Assuming you have set <code>spark.kubernetes.kerberos</code> to be true. This will let you specify
+  Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true. This will let you specify
   the location of your Kerberos keytab to be used in order to access Secure HDFS. This is optional as you
-  may login by running <code>kinit -kt</code> before running the spark-submit, and the submission client
+  may login by running <code>kinit</code> before running the spark-submit, and the submission client
   will look within your local TGT cache to resolve this.
   </td>
 </tr>
 <tr>
   <td><code>spark.kubernetes.kerberos.principal</code></td>
   <td>(none)</td>
   <td>
-  Assuming you have set <code>spark.kubernetes.kerberos</code> to be true. This will let you specify
+  Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true. This will let you specify
   your Kerberos principal that you wish to use to access Secure HDFS. This is optional as you
-  may login by running <code>kinit -kt</code> before running the spark-submit, and the submission client
+  may login by running <code>kinit</code> before running the spark-submit, and the submission client
   will look within your local TGT cache to resolve this.
   </td>
 </tr>
 <tr>
   <td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
   <td>(none)</td>
   <td>
-  Assuming you have set <code>spark.kubernetes.kerberos</code> to be true. This will let you specify
+  Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true. This will let you specify
   the name of the secret where your existing delegation token data is stored. You must also specify the
   label <code>spark.kubernetes.kerberos.tokensecret.name</code> where your data is stored on the secret.
   </td>
@@ -809,7 +809,7 @@ from the other deployment modes. See the [configuration page](configuration.html
   <td><code>spark.kubernetes.kerberos.tokensecret.label</code></td>
   <td>spark.kubernetes.kerberos.dt.label</td>
   <td>
-  Assuming you have set <code>spark.kubernetes.kerberos</code> to be true. This will let you specify
+  Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true. This will let you specify
   the label within the pre-specified secret where the data of your existing delegation token data is stored.
   We have a default value of <code>spark.kubernetes.kerberos.dt.label</code> should you not include it. But
   you should always include this if you are proposing a pre-existing secret contain the delegation token data.
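
As a usage sketch (not part of this commit), the renamed keys above could be set on the submission configuration as follows; the keytab path and principal are placeholder values:

import org.apache.spark.SparkConf

// Sketch only: enabling secure-HDFS support with the renamed keys.
// The keytab path and principal below are illustrative placeholders.
val sparkConf = new SparkConf()
  .set("spark.kubernetes.kerberos.enabled", "true")
  .set("spark.kubernetes.kerberos.keytab", "/mnt/secrets/krb5.keytab")
  .set("spark.kubernetes.kerberos.principal", "jobuser@EXAMPLE.COM")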

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/HadoopConfBootstrap.scala

Lines changed: 5 additions & 5 deletions
@@ -32,8 +32,8 @@ import org.apache.spark.internal.Logging
   */
 private[spark] trait HadoopConfBootstrap {
   /**
-   * Bootstraps a main container with the ConfigMaps mounted as volumes and an ENV variable
-   * pointing to the mounted file.
+   * Bootstraps a main container with the ConfigMaps containing Hadoop config files
+   * mounted as volumes and an ENV variable pointing to the mounted file.
    */
   def bootstrapMainContainerAndVolumes(
     originalPodWithMainContainer: PodWithMainContainer)
@@ -68,11 +68,11 @@ private[spark] class HadoopConfBootstrapImpl(
         originalPodWithMainContainer.mainContainer)
       .addNewVolumeMount()
         .withName(HADOOP_FILE_VOLUME)
-        .withMountPath(HADOOP_FILE_DIR)
+        .withMountPath(HADOOP_CONF_DIR_PATH)
         .endVolumeMount()
       .addNewEnv()
-        .withName(HADOOP_CONF_DIR)
-        .withValue(HADOOP_FILE_DIR)
+        .withName(ENV_HADOOP_CONF_DIR)
+        .withValue(HADOOP_CONF_DIR_PATH)
         .endEnv()
       .build()
     originalPodWithMainContainer.copy(
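
For context, a standalone sketch of the fabric8 builder pattern used here; the container name and literal paths are illustrative stand-ins for the renamed constants:

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}

// Sketch: mount the Hadoop-config volume and point HADOOP_CONF_DIR at it,
// mirroring what bootstrapMainContainerAndVolumes does to the main container.
val container: Container = new ContainerBuilder()
  .withName("spark-driver")            // illustrative name
  .addNewVolumeMount()
    .withName("hadoop-properties")     // HADOOP_FILE_VOLUME
    .withMountPath("/etc/hadoop/conf") // HADOOP_CONF_DIR_PATH
    .endVolumeMount()
  .addNewEnv()
    .withName("HADOOP_CONF_DIR")       // ENV_HADOOP_CONF_DIR
    .withValue("/etc/hadoop/conf")
    .endEnv()
  .build()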

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 1 addition & 1 deletion
@@ -545,7 +545,7 @@ package object config extends Logging {
       " your existing delegation token is stored. This removes the need" +
       " for the job user to provide any keytab for launching a job")
       .stringConf
-      .createWithDefault("spark.kubernetes.kerberos.dt.label")
+      .createOptional
 
   private[spark] def resolveK8sMaster(rawMasterString: String): String = {
     if (!rawMasterString.startsWith("k8s://")) {
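
A minimal sketch (assumed behavior, not part of this diff) of what the switch to createOptional means for callers: the label now reads back as an Option rather than silently inheriting a default, so absence becomes an explicit decision.

import org.apache.spark.SparkConf

// Sketch only: None when the user never set the label.
val maybeLabel: Option[String] =
  new SparkConf().getOption("spark.kubernetes.kerberos.tokensecret.label")
val label: String = maybeLabel.getOrElse(
  sys.error("tokensecret.label must accompany tokensecret.name"))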

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 2 additions & 2 deletions
@@ -98,8 +98,8 @@ package object constants {
 
   // Hadoop Configuration
   private[spark] val HADOOP_FILE_VOLUME = "hadoop-properties"
-  private[spark] val HADOOP_FILE_DIR = "/etc/hadoop"
-  private[spark] val HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
+  private[spark] val HADOOP_CONF_DIR_PATH = "/etc/hadoop/conf"
+  private[spark] val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
   private[spark] val HADOOP_CONF_DIR_LOC = "spark.kubernetes.hadoop.conf.dir"
   private[spark] val HADOOP_CONFIG_MAP_SPARK_CONF_NAME =
     "spark.kubernetes.hadoop.executor.hadoopconfigmapname"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 6 additions & 12 deletions
@@ -98,18 +98,12 @@ private[spark] class DriverConfigurationStepsOrchestrator(
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
       submissionSparkConf, kubernetesResourceNamePrefix)
     val hadoopConfigSteps =
-      if (hadoopConfDir.isEmpty) {
-        Option.empty[DriverConfigurationStep]
-      } else {
-        val hadoopStepsOrchestrator = new HadoopStepsOrchestrator(
-          namespace,
-          hadoopConfigMapName,
-          submissionSparkConf,
-          hadoopConfDir)
-        val hadoopConfSteps =
-          hadoopStepsOrchestrator.getHadoopSteps()
-        Some(new HadoopConfigBootstrapStep(hadoopConfSteps, hadoopConfigMapName))
-      }
+      hadoopConfDir.map { conf =>
+        val hadoopStepsOrchestrator =
+          new HadoopStepsOrchestrator(namespace, hadoopConfigMapName, submissionSparkConf, conf)
+        val hadoopConfSteps = hadoopStepsOrchestrator.getHadoopSteps()
+        Some(new HadoopConfigBootstrapStep(hadoopConfSteps, hadoopConfigMapName))}
+        .getOrElse(Option.empty[DriverConfigurationStep])
     val pythonStep = mainAppResource match {
       case PythonMainAppResource(mainPyResource) =>
         Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
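
A possible flatter formulation of the same refactor (sketch only, not what the commit uses): mapping straight to the step avoids the inner Some and the trailing getOrElse.

// Sketch: hadoopConfDir.map already yields an Option, so no
// Some/getOrElse pair is needed.
val hadoopConfigSteps: Option[DriverConfigurationStep] =
  hadoopConfDir.map { conf =>
    new HadoopConfigBootstrapStep(
      new HadoopStepsOrchestrator(
        namespace, hadoopConfigMapName, submissionSparkConf, conf).getHadoopSteps(),
      hadoopConfigMapName)
  }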

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/HadoopConfigBootstrapStep.scala

Lines changed: 3 additions & 3 deletions
@@ -49,9 +49,9 @@ private[spark] class HadoopConfigBootstrapStep(
       val configMap =
         new ConfigMapBuilder()
           .withNewMetadata()
-          .withName(hadoopConfigMapName)
-          .endMetadata()
-          .addToData(currentHadoopSpec.configMapProperties.asJava)
+            .withName(hadoopConfigMapName)
+            .endMetadata()
+          .addToData(currentHadoopSpec.configMapProperties.asJava)
           .build()
       val executorSparkConf = driverSpec.driverSparkConf.clone()
         .set(HADOOP_CONFIG_MAP_SPARK_CONF_NAME, hadoopConfigMapName)
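
Standalone sketch of the ConfigMap construction above; the map name and file contents are illustrative:

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder}

// Sketch: a ConfigMap keyed by Hadoop config file name, as built from
// currentHadoopSpec.configMapProperties.
val properties = Map("core-site.xml" -> "<configuration/>")
val configMap: ConfigMap = new ConfigMapBuilder()
  .withNewMetadata()
    .withName("hadoop-config-map") // illustrative name
    .endMetadata()
  .addToData(properties.asJava)
  .build()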

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala

Lines changed: 2 additions & 3 deletions
@@ -34,7 +34,7 @@ private[spark] class HadoopConfMounterStep(
     hadoopConfigMapName: String,
     hadoopConfigurationFiles: Seq[File],
     hadoopConfBootstrapConf: HadoopConfBootstrap,
-    hadoopConfDir: Option[String])
+    hadoopConfDir: String)
   extends HadoopConfigurationStep {
 
   override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
@@ -51,8 +51,7 @@ private[spark] class HadoopConfMounterStep(
       hadoopConfigurationFiles.map(file =>
         (file.toPath.getFileName.toString, readFileToString(file))).toMap,
       additionalDriverSparkConf = hadoopConfigSpec.additionalDriverSparkConf ++
-        hadoopConfDir.map(conf_dir => Map(HADOOP_CONF_DIR_LOC -> conf_dir)).getOrElse(
-          Map.empty[String, String])
+        Map(HADOOP_CONF_DIR_LOC -> hadoopConfDir)
     )
   }
 }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStep.scala

Lines changed: 18 additions & 18 deletions
@@ -55,11 +55,11 @@ private[spark] class HadoopKerberosKeytabResolverStep(
   private var originalCredentials: Credentials = _
   private var dfs : FileSystem = _
   private var renewer: String = _
-  private var renewedCredentials: Credentials = _
-  private var renewedTokens: Iterable[Token[_ <: TokenIdentifier]] = _
+  private var credentials: Credentials = _
+  private var tokens: Iterable[Token[_ <: TokenIdentifier]] = _
   override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf)
-    logInfo(s"Hadoop Configuration: ${hadoopConf.toString}")
+    logDebug(s"Hadoop Configuration: ${hadoopConf.toString}")
     if (!UserGroupInformation.isSecurityEnabled) logError("Hadoop not configuration with Kerberos")
     val maybeJobUserUGI =
       for {
@@ -70,7 +70,7 @@ private[spark] class HadoopKerberosKeytabResolverStep(
         // Reliant on [Spark-20328] for changing to YARN principal
         submissionSparkConf.set("spark.yarn.principal", principal)
         submissionSparkConf.set("spark.yarn.keytab", keytab.toURI.toString)
-        logInfo("Logged into KDC with keytab using Job User UGI")
+        logDebug("Logged into KDC with keytab using Job User UGI")
         UserGroupInformation.loginUserFromKeytabAndReturnUGI(
           principal,
           keytab.toURI.toString)
@@ -80,27 +80,27 @@ private[spark] class HadoopKerberosKeytabResolverStep(
     // It is necessary to run as jobUserUGI because logged in user != Current User
     jobUserUGI.doAs(new PrivilegedExceptionAction[Void] {
       override def run(): Void = {
-        logInfo(s"Retrieved Job User UGI: $jobUserUGI")
+        logDebug(s"Retrieved Job User UGI: $jobUserUGI")
         originalCredentials = jobUserUGI.getCredentials
-        logInfo(s"Original tokens: ${originalCredentials.toString}")
-        logInfo(s"All tokens: ${originalCredentials.getAllTokens}")
-        logInfo(s"All secret keys: ${originalCredentials.getAllSecretKeys}")
+        logDebug(s"Original tokens: ${originalCredentials.toString}")
+        logDebug(s"All tokens: ${originalCredentials.getAllTokens}")
+        logDebug(s"All secret keys: ${originalCredentials.getAllSecretKeys}")
         dfs = FileSystem.get(hadoopConf)
         // This is not necessary with [Spark-20328] since we would be using
         // Spark core providers to handle delegation token renewal
         renewer = jobUserUGI.getShortUserName
-        logInfo(s"Renewer is: $renewer")
-        renewedCredentials = new Credentials(originalCredentials)
-        dfs.addDelegationTokens(renewer, renewedCredentials)
-        renewedTokens = renewedCredentials.getAllTokens.asScala
-        logInfo(s"Renewed tokens: ${renewedCredentials.toString}")
-        logInfo(s"All renewed tokens: ${renewedTokens.mkString(",")}")
-        logInfo(s"All renewed secret keys: ${renewedCredentials.getAllSecretKeys}")
+        logDebug(s"Renewer is: $renewer")
+        credentials = new Credentials(originalCredentials)
+        dfs.addDelegationTokens(renewer, credentials)
+        tokens = credentials.getAllTokens.asScala
+        logDebug(s"Tokens: ${credentials.toString}")
+        logDebug(s"All tokens: ${tokens.mkString(",")}")
+        logDebug(s"All secret keys: ${credentials.getAllSecretKeys}")
         null
       }})
-    if (renewedTokens.isEmpty) logError("Did not obtain any Delegation Tokens")
-    val data = serialize(renewedCredentials)
-    val renewalTime = getTokenRenewalInterval(renewedTokens, hadoopConf).getOrElse(Long.MaxValue)
+    if (tokens.isEmpty) logError("Did not obtain any Delegation Tokens")
+    val data = serialize(credentials)
+    val renewalTime = getTokenRenewalInterval(tokens, hadoopConf).getOrElse(Long.MaxValue)
     val currentTime: Long = System.currentTimeMillis()
     val initialTokenLabelName = s"$KERBEROS_SECRET_LABEL_PREFIX-$currentTime-$renewalTime"
     val secretDT =
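
Standalone sketch of the login-and-fetch pattern this step performs; the principal and keytab path are placeholders and error handling is elided:

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Sketch: log in from a keytab, then fetch HDFS delegation tokens
// while running as that user, as configureContainers does above.
def fetchDelegationTokens(hadoopConf: Configuration): Credentials = {
  val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
    "jobuser@EXAMPLE.COM", "/mnt/secrets/krb5.keytab")
  ugi.doAs(new PrivilegedExceptionAction[Credentials] {
    override def run(): Credentials = {
      val creds = new Credentials(ugi.getCredentials)
      // addDelegationTokens populates creds with fresh tokens for this renewer
      FileSystem.get(hadoopConf).addDelegationTokens(ugi.getShortUserName, creds)
      creds
    }
  })
}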

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala

Lines changed: 27 additions & 7 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
 import java.io.File
 
 import org.apache.spark.SparkConf
-import org.apache.spark.deploy.kubernetes.HadoopConfBootstrapImpl
+import org.apache.spark.deploy.kubernetes.{HadoopConfBootstrapImpl, OptionRequirements}
 import org.apache.spark.deploy.kubernetes.config._
 
 
@@ -30,16 +30,35 @@ private[spark] class HadoopStepsOrchestrator(
     namespace: String,
     hadoopConfigMapName: String,
     submissionSparkConf: SparkConf,
-    hadoopConfDir: Option[String]) {
-  private val maybeKerberosSupport = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
+    hadoopConfDir: String) {
+  private val isKerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
   private val maybePrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL)
   private val maybeKeytab = submissionSparkConf.get(KUBERNETES_KERBEROS_KEYTAB)
     .map(k => new File(k))
   private val maybeExistingSecret = submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
   private val maybeExistingSecretLabel =
     submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_LABEL)
-  private val hadoopConfigurationFiles = hadoopConfDir.map(conf => getHadoopConfFiles(conf))
-    .getOrElse(Seq.empty[File])
+  private val hadoopConfigurationFiles = getHadoopConfFiles(hadoopConfDir)
+
+  require(maybeKeytab.forall( _ => isKerberosEnabled ),
+    "You must enable Kerberos support if you are specifying a Kerberos Keytab")
+
+  require(maybeExistingSecret.forall( _ => isKerberosEnabled ),
+    "You must enable Kerberos support if you are specifying a Kerberos Secret")
+
+  OptionRequirements.requireBothOrNeitherDefined(
+    maybeKeytab,
+    maybePrincipal,
+    "If a Kerberos keytab is specified you must also specify a Kerberos principal",
+    "If a Kerberos principal is specified you must also specify a Kerberos keytab")
+
+  OptionRequirements.requireBothOrNeitherDefined(
+    maybeExistingSecret,
+    maybeExistingSecretLabel,
+    "If a secret storing a Kerberos Delegation Token is specified you must also" +
+      " specify the label where the data is stored",
+    "If a secret label where the data of the Kerberos Delegation Token is specified" +
+      " you must also specify the name of the secret")
 
   def getHadoopSteps(): Seq[HadoopConfigurationStep] = {
     val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl(
@@ -51,11 +70,11 @@ private[spark] class HadoopStepsOrchestrator(
       hadoopConfBootstrapImpl,
       hadoopConfDir)
     val maybeKerberosStep =
-      if (maybeKerberosSupport) {
+      if (isKerberosEnabled) {
         maybeExistingSecret.map(secretLabel => Some(new HadoopKerberosSecretResolverStep(
           submissionSparkConf,
           secretLabel,
-          maybeExistingSecretLabel))).getOrElse(Some(
+          maybeExistingSecretLabel.get))).getOrElse(Some(
           new HadoopKerberosKeytabResolverStep(
             submissionSparkConf,
             maybePrincipal,
@@ -65,6 +84,7 @@ private[spark] class HadoopStepsOrchestrator(
       }
     Seq(hadoopConfMounterStep) ++ maybeKerberosStep.toSeq
   }
+
   private def getHadoopConfFiles(path: String) : Seq[File] = {
     def isFile(file: File) = if (file.isFile) Some(file) else None
     val dir = new File(path)
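
OptionRequirements is imported above but its implementation is not part of this diff; a plausible sketch consistent with the call sites here (the real helper lives elsewhere in the tree):

// Sketch only: each option must be defined iff the other is, failing
// with the message for whichever half is missing.
object OptionRequirements {
  def requireBothOrNeitherDefined(
      first: Option[_],
      second: Option[_],
      errMessageWhenSecondIsMissing: String,
      errMessageWhenFirstIsMissing: String): Unit = {
    first.foreach(_ => require(second.isDefined, errMessageWhenSecondIsMissing))
    second.foreach(_ => require(first.isDefined, errMessageWhenFirstIsMissing))
  }
}

Note that this both-or-neither check on the secret name and label is what makes the maybeExistingSecretLabel.get call above safe.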

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala

Lines changed: 1 addition & 0 deletions
@@ -62,6 +62,7 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(
         submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED)
           .orElse(submissionSparkConf.get(RESOURCE_STAGING_SERVER_SSL_ENABLED))
           .getOrElse(false)
+
     OptionRequirements.requireNandDefined(
       maybeResourceStagingServerInternalClientCert,
       maybeResourceStagingServerInternalTrustStore,
