Skip to content

Commit 3fbf88c

Browse files
committed
Integration Stages 1, 2, and 3
1 parent 514ac19 commit 3fbf88c

File tree

17 files changed

+165
-243
lines changed

17 files changed

+165
-243
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KerberosConfBootstrap.scala

Lines changed: 0 additions & 48 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes
18+
19+
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
20+
21+
import org.apache.spark.deploy.kubernetes.constants._
22+
import org.apache.spark.internal.Logging
23+
24+
25+
/**
 * Bootstrap API for mounting a Hadoop delegation-token (DT) Secret into a pod.
 *
 * This is kept separate from the HadoopConf steps API because this component
 * can be reused to mount the DT secret for executors as well.
 */
private[spark] trait KerberosTokenBootstrapConf {
  /**
   * Bootstraps the main container with the Secret mounted as a volume and an
   * ENV variable pointing to the mounted file containing the DT for secure
   * HDFS interaction.
   */
  def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer): PodWithMainContainer
}
38+
39+
/**
 * Mounts a delegation-token Secret into the pod and points the
 * ENV_HADOOP_TOKEN_FILE_LOCATION environment variable at the mounted token
 * file, so Hadoop clients in the container can locate the credentials.
 *
 * @param secretName  name of the Kubernetes Secret holding the delegation token
 * @param secretLabel data key inside the Secret under which the token file is stored
 */
private[spark] class KerberosTokenConfBootstrapImpl(
    secretName: String,
    secretLabel: String) extends KerberosTokenBootstrapConf with Logging {

  override def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer): PodWithMainContainer = {
    logInfo("Mounting HDFS DT from Secret for Secure HDFS")
    // Attach the Secret to the pod spec as a named volume.
    val podWithTokenVolume = new PodBuilder(originalPodWithMainContainer.pod)
      .editOrNewSpec()
        .addNewVolume()
          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
          .withNewSecret()
            .withSecretName(secretName)
            .endSecret()
          .endVolume()
        .endSpec()
      .build()
    // Mount that volume into the main container and export the token-file
    // location; the path is <base dir>/<secret label>, matching the data key
    // the Secret was written under.
    val containerWithTokenMount =
      new ContainerBuilder(originalPodWithMainContainer.mainContainer)
        .addNewVolumeMount()
          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
          .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
          .endVolumeMount()
        .addNewEnv()
          .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
          .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretLabel")
          .endEnv()
        .build()
    originalPodWithMainContainer.copy(
      pod = podWithTokenVolume,
      mainContainer = containerWithTokenMount)
  }
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ package object constants {
4545

4646
// Hadoop credentials secrets for the Spark app.
4747
private[spark] val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
48-
private[spark] val SPARK_APP_HADOOP_TOKEN_FILE_SECRET_NAME = "hadoop-token-file"
49-
private[spark] val SPARK_APP_HADOOP_TOKEN_FILE_PATH =
50-
s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$SPARK_APP_HADOOP_TOKEN_FILE_SECRET_NAME"
5148
private[spark] val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret"
5249

5350
// Default and fixed ports
@@ -110,12 +107,12 @@ package object constants {
110107
// Kerberos Configuration
111108
private[spark] val HADOOP_KERBEROS_SECRET_NAME =
112109
"spark.kubernetes.kerberos.dt"
113-
private[spark] val KERBEROS_SPARK_CONF_NAME =
114-
"spark.kubernetes.kerberos.secretlabelname"
110+
private[spark] val HADOOP_KERBEROS_CONF_SECRET =
111+
"spark.kubernetes.kerberos.secretname"
112+
private[spark] val HADOOP_KERBEROS_CONF_LABEL =
113+
"spark.kubernetes.kerberos.labelname"
115114
private[spark] val KERBEROS_SECRET_LABEL_PREFIX =
116115
"hadoop-tokens"
117-
private[spark] val ENV_KERBEROS_SECRET_LABEL =
118-
"KERBEROS_SECRET_LABEL"
119116

120117
// Miscellaneous
121118
private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*/
1717
package org.apache.spark.deploy.kubernetes.submit
1818

19-
import java.io.File
20-
2119
import org.apache.spark.SparkConf
2220
import org.apache.spark.deploy.kubernetes.ConfigurationUtils
2321
import org.apache.spark.deploy.kubernetes.config._
@@ -99,14 +97,6 @@ private[spark] class DriverConfigurationStepsOrchestrator(
9997
submissionSparkConf)
10098
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
10199
submissionSparkConf, kubernetesResourceNamePrefix)
102-
// CHANGES
103-
val hadoopCredentialsStep = new DriverHadoopCredentialsStep(submissionSparkConf)
104-
val hadoopConfigurations2 =
105-
sys.env.get("HADOOP_CONF_DIR").map{ conf => getHadoopConfFiles(conf)}
106-
.getOrElse(Array.empty[File])
107-
// CHANGES
108-
val hadoopConfigurations = hadoopConfDir.map(conf => getHadoopConfFiles(conf))
109-
.getOrElse(Array.empty[File])
110100
val hadoopConfigSteps =
111101
if (hadoopConfDir.isEmpty) {
112102
Option.empty[DriverConfigurationStep]
@@ -157,7 +147,6 @@ private[spark] class DriverConfigurationStepsOrchestrator(
157147
Seq(
158148
initialSubmissionStep,
159149
kubernetesCredentialsStep,
160-
hadoopCredentialsStep,
161150
dependencyResolutionStep) ++
162151
initContainerBootstrapStep.toSeq ++
163152
hadoopConfigSteps.toSeq ++

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/HadoopSecretUtil.scala

Lines changed: 0 additions & 55 deletions
This file was deleted.

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverHadoopCredentialsStep.scala

Lines changed: 0 additions & 38 deletions
This file was deleted.

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/HadoopConfigBootstrapStep.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ private[spark] class HadoopConfigBootstrapStep(
4040
driverContainer = driverSpec.driverContainer,
4141
configMapProperties = Map.empty[String, String],
4242
additionalDriverSparkConf = Map.empty[String, String],
43-
dtSecret = None)
43+
dtSecret = None,
44+
dtSecretName = HADOOP_KERBEROS_SECRET_NAME,
45+
dtSecretLabel = "")
4446
for (nextStep <- hadoopConfigurationSteps) {
4547
currentHadoopSpec = nextStep.configureContainers(currentHadoopSpec)
4648
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HDFSDelegationToken.scala

Lines changed: 0 additions & 23 deletions
This file was deleted.

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopConfigSpec.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,14 @@ import io.fabric8.kubernetes.api.model.{Container, Pod, Secret}
2929
* - The properties that will be stored into the config map which have (key, value)
3030
* pairs of (path, data)
3131
* - The secret containing a DT, either previously specified or built on the fly
32+
* - The name of the secret where the DT will be stored
33+
* - The label on the secret which correlates with where the current DT data is stored
3234
*/
3335
private[spark] case class HadoopConfigSpec(
3436
additionalDriverSparkConf: Map[String, String],
3537
driverPod: Pod,
3638
driverContainer: Container,
3739
configMapProperties: Map[String, String],
38-
dtSecret: Option[Secret])
40+
dtSecret: Option[Secret],
41+
dtSecretName: String,
42+
dtSecretLabel: String)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStep.scala

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ import org.apache.hadoop.security.token.{Token, TokenIdentifier}
3131
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
3232

3333
import org.apache.spark.SparkConf
34-
import org.apache.spark.deploy.kubernetes.{KerberosConfBootstrapImpl, PodWithMainContainer}
35-
import org.apache.spark.deploy.kubernetes.constants._
3634
import org.apache.spark.deploy.SparkHadoopUtil
35+
import org.apache.spark.deploy.kubernetes.{KerberosTokenConfBootstrapImpl, PodWithMainContainer}
36+
import org.apache.spark.deploy.kubernetes.constants._
3737
import org.apache.spark.internal.Logging
3838

3939
/**
@@ -101,29 +101,32 @@ private[spark] class HadoopKerberosKeytabResolverStep(
101101
if (renewedTokens.isEmpty) logError("Did not obtain any Delegation Tokens")
102102
val data = serialize(renewedCredentials)
103103
val renewalTime = getTokenRenewalInterval(renewedTokens, hadoopConf).getOrElse(Long.MaxValue)
104-
val delegationToken = HDFSDelegationToken(data, renewalTime)
105104
val currentTime: Long = System.currentTimeMillis()
106105
val initialTokenLabelName = s"$KERBEROS_SECRET_LABEL_PREFIX-$currentTime-$renewalTime"
107-
logInfo(s"Storing dt in $initialTokenLabelName")
108106
val secretDT =
109107
new SecretBuilder()
110108
.withNewMetadata()
111109
.withName(HADOOP_KERBEROS_SECRET_NAME)
112110
.endMetadata()
113-
.addToData(initialTokenLabelName, Base64.encodeBase64String(delegationToken.bytes))
111+
.addToData(initialTokenLabelName, Base64.encodeBase64String(data))
114112
.build()
115-
val bootstrapKerberos = new KerberosConfBootstrapImpl(initialTokenLabelName)
113+
val bootstrapKerberos = new KerberosTokenConfBootstrapImpl(
114+
HADOOP_KERBEROS_SECRET_NAME,
115+
initialTokenLabelName)
116116
val withKerberosEnvPod = bootstrapKerberos.bootstrapMainContainerAndVolumes(
117117
PodWithMainContainer(
118118
hadoopConfigSpec.driverPod,
119119
hadoopConfigSpec.driverContainer))
120120
hadoopConfigSpec.copy(
121121
additionalDriverSparkConf =
122122
hadoopConfigSpec.additionalDriverSparkConf ++ Map(
123-
KERBEROS_SPARK_CONF_NAME -> initialTokenLabelName),
123+
HADOOP_KERBEROS_CONF_LABEL -> initialTokenLabelName,
124+
HADOOP_KERBEROS_CONF_SECRET -> HADOOP_KERBEROS_SECRET_NAME),
124125
driverPod = withKerberosEnvPod.pod,
125126
driverContainer = withKerberosEnvPod.mainContainer,
126-
dtSecret = Some(secretDT))
127+
dtSecret = Some(secretDT),
128+
dtSecretName = HADOOP_KERBEROS_SECRET_NAME,
129+
dtSecretLabel = initialTokenLabelName)
127130
}
128131

129132
// Functions that should be in Core with Rebase to 2.3

0 commit comments

Comments
 (0)