This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit a004888

Merge pull request #379 from kimoonkim/wip-use-dt-secrets-on-pods
[WIP] Use HDFS Delegation Token in driver/executor pods as part of Secure HDFS Support
2 parents: beb1361 + 0141c0a

File tree: 6 files changed (+126, -4 lines)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
(12 additions, 0 deletions)

@@ -69,6 +69,18 @@ package object config extends Logging {
   private[spark] val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
   private[spark] val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
 
+  // TODO: This option is intended to be used for internal prototype only until the submission
+  // client automatically creates the secret file. Remove this option afterward
+  // unless other use is found.
+  private[spark] val MOUNTED_HADOOP_SECRET_CONF =
+    ConfigBuilder("spark.kubernetes.mounted.hadoopSecret")
+      .doc("Use a Kubernetes secret containing Hadoop tokens such as an HDFS delegation token." +
+        " The secret should have an entry named 'hadoop-token-file' under the data section," +
+        " which contains binary dumps of Hadoop tokens.")
+      .internal()
+      .stringConf
+      .createOptional
+
   private[spark] val RESOURCE_STAGING_SERVER_USE_SERVICE_ACCOUNT_CREDENTIALS =
     ConfigBuilder(
       s"$APISERVER_AUTH_RESOURCE_STAGING_SERVER_CONF_PREFIX.useServiceAccountCredentials")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
(8 additions, 0 deletions)

@@ -43,6 +43,13 @@ package object constants {
     s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME"
   private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"
 
+  // Hadoop credentials secrets for the Spark app.
+  private[spark] val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
+  private[spark] val SPARK_APP_HADOOP_TOKEN_FILE_SECRET_NAME = "hadoop-token-file"
+  private[spark] val SPARK_APP_HADOOP_TOKEN_FILE_PATH =
+    s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$SPARK_APP_HADOOP_TOKEN_FILE_SECRET_NAME"
+  private[spark] val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret"
+
   // Default and fixed ports
   private[spark] val SUBMISSION_SERVER_PORT = 7077
   private[spark] val DEFAULT_DRIVER_PORT = 7078

@@ -69,6 +76,7 @@ package object constants {
   private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
   private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
+  private[spark] val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
 
   // Bootstrapping dependencies with the init-container
   private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
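HADOOP_TOKEN_FILE_LOCATION is the standard environment variable that Hadoop's UserGroupInformation consults for delegation tokens at login, which is why exporting it in the container is enough for the Spark process to pick up the mounted tokens. A hedged sketch of the equivalent explicit read, assuming a default Configuration:

```scala
import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Read the binary token dump mounted from the secret...
val tokenFile = new File("/mnt/secrets/hadoop-credentials/hadoop-token-file")
val creds = Credentials.readTokenStorageFile(tokenFile, new Configuration())
// ...and attach the tokens to the current user, which is what Hadoop does
// automatically when HADOOP_TOKEN_FILE_LOCATION is set.
UserGroupInformation.getCurrentUser.addCredentials(creds)
```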

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala
(3 additions, 1 deletion)

@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.ConfigurationUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, PythonStep}
+import org.apache.spark.deploy.kubernetes.submit.submitsteps._
 import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.Utils

@@ -94,6 +94,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
       submissionSparkConf)
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
       submissionSparkConf, kubernetesResourceNamePrefix)
+    val hadoopCredentialsStep = new DriverHadoopCredentialsStep(submissionSparkConf)
     val pythonStep = mainAppResource match {
       case PythonMainAppResource(mainPyResource) =>
         Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))

@@ -131,6 +132,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
     Seq(
       initialSubmissionStep,
       kubernetesCredentialsStep,
+      hadoopCredentialsStep,
      dependencyResolutionStep) ++
      initContainerBootstrapStep.toSeq ++
      pythonStep.toSeq
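The new step simply joins the ordered sequence the orchestrator returns. A schematic of how the submission client then applies those steps; the fold is illustrative and `initialDriverSpec` is an assumed starting value, not an identifier from this diff:

```scala
// Each DriverConfigurationStep transforms the accumulated driver spec in turn.
val steps: Seq[DriverConfigurationStep] = Seq(
  initialSubmissionStep,
  kubernetesCredentialsStep,
  hadoopCredentialsStep,  // newly inserted by this commit
  dependencyResolutionStep)

val resolvedSpec = steps.foldLeft(initialDriverSpec) { (spec, step) =>
  step.configureDriver(spec)
}
```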
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/HadoopSecretUtil.scala
(new file, 55 lines)

@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit
+
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
+
+import org.apache.spark.deploy.kubernetes.constants._
+
+object HadoopSecretUtil {
+
+  def configurePod(secretNameOption: Option[String], pod: Pod) : Pod = {
+    secretNameOption.map { secret =>
+      new PodBuilder(pod)
+        .editOrNewSpec()
+          .addNewVolume()
+            .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+            .withNewSecret()
+              .withSecretName(secret)
+              .endSecret()
+            .endVolume()
+          .endSpec()
+        .build()
+    }.getOrElse(pod)
+  }
+
+  def configureContainer(secretNameOption: Option[String],
+      containerSpec: Container) : Container = {
+    secretNameOption.map { secret =>
+      new ContainerBuilder(containerSpec)
+        .addNewVolumeMount()
+          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+          .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
+          .endVolumeMount()
+        .addNewEnv()
+          .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
+          .withValue(SPARK_APP_HADOOP_TOKEN_FILE_PATH)
+          .endEnv()
+        .build()
+    }.getOrElse(containerSpec)
+  }
+}
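A hedged sketch of producing the secret this utility expects, using the fabric8 model classes already on the project's classpath. The secret name "hdfs-tokens" and the surrounding client plumbing are assumptions, not part of this commit:

```scala
import java.util.Base64
import io.fabric8.kubernetes.api.model.SecretBuilder

// Binary token dump, e.g. written by Hadoop's Credentials#writeTokenStorageToStream.
val tokenBytes: Array[Byte] = ???

val hadoopTokenSecret = new SecretBuilder()
  .withNewMetadata()
    .withName("hdfs-tokens")  // hypothetical name, passed via the new config option
    .endMetadata()
  // Key must be "hadoop-token-file" (SPARK_APP_HADOOP_TOKEN_FILE_SECRET_NAME);
  // Kubernetes secret data values are base64-encoded strings.
  .addToData("hadoop-token-file", Base64.getEncoder.encodeToString(tokenBytes))
  .build()
```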
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverHadoopCredentialsStep.scala
(new file, 38 lines)

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.submit.HadoopSecretUtil
+
+private[spark] class DriverHadoopCredentialsStep(submissionSparkConf: SparkConf)
+  extends DriverConfigurationStep {
+
+  private val maybeMountedHadoopSecret = submissionSparkConf.get(MOUNTED_HADOOP_SECRET_CONF)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val podWithMountedHadoopToken = HadoopSecretUtil.configurePod(maybeMountedHadoopSecret,
+      driverSpec.driverPod)
+    val containerWithMountedHadoopToken = HadoopSecretUtil.configureContainer(
+      maybeMountedHadoopSecret,
+      driverSpec.driverContainer)
+    driverSpec.copy(
+      driverPod = podWithMountedHadoopToken,
+      driverContainer = containerWithMountedHadoopToken)
+  }
+}
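A sketch of exercising the step in isolation, assuming some starting KubernetesDriverSpec named `initialSpec` (an assumption, not an identifier from the diff):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.submit.submitsteps.DriverHadoopCredentialsStep

val sparkConf = new SparkConf()
  .set("spark.kubernetes.mounted.hadoopSecret", "hdfs-tokens")  // hypothetical secret

val step = new DriverHadoopCredentialsStep(sparkConf)
// The returned spec's pod gains the "hadoop-secret" volume; its container
// mounts that volume and exports HADOOP_TOKEN_FILE_LOCATION.
val configuredSpec = step.configureDriver(initialSpec)
```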

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
(10 additions, 3 deletions)

@@ -37,7 +37,7 @@ import org.apache.spark.{SparkContext, SparkEnv, SparkException}
 import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.InitContainerUtil
+import org.apache.spark.deploy.kubernetes.submit.{HadoopSecretUtil, InitContainerUtil}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}

@@ -130,6 +130,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
 
+  private val maybeMountedHadoopSecret = conf.get(MOUNTED_HADOOP_SECRET_CONF)
+
   private val driverPod = try {
     kubernetesClient.pods().inNamespace(kubernetesNamespace).
       withName(kubernetesDriverPodName).get()

@@ -582,9 +584,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
     val executorPodWithNodeAffinity = addNodeAffinityAnnotationIfUseful(
       executorPodWithInitContainer, nodeToLocalTaskCount)
-    val resolvedExecutorPod = new PodBuilder(executorPodWithNodeAffinity)
+    val executorPodWithMountedHadoopToken = HadoopSecretUtil.configurePod(maybeMountedHadoopSecret,
+      executorPodWithNodeAffinity)
+    val containerWithMountedHadoopToken = HadoopSecretUtil.configureContainer(
+      maybeMountedHadoopSecret, initBootstrappedExecutorContainer)
+
+    val resolvedExecutorPod = new PodBuilder(executorPodWithMountedHadoopToken)
       .editSpec()
-      .addToContainers(initBootstrappedExecutorContainer)
+      .addToContainers(containerWithMountedHadoopToken)
      .endSpec()
      .build()
    try {
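On the executor side the scheduler backend applies the same two helpers to every pod it builds. A small illustrative check, assuming a bare fabric8 pod and a hypothetical secret name, that the utility is a pass-through when no secret is configured:

```scala
import io.fabric8.kubernetes.api.model.PodBuilder
import org.apache.spark.deploy.kubernetes.submit.HadoopSecretUtil

val basePod = new PodBuilder()
  .withNewMetadata().withName("executor-1").endMetadata()
  .withNewSpec().endSpec()
  .build()

// None => the pod passes through untouched (the getOrElse branch).
assert(HadoopSecretUtil.configurePod(None, basePod) eq basePod)
// Some(...) => one extra volume backed by the named secret.
val mounted = HadoopSecretUtil.configurePod(Some("hdfs-tokens"), basePod)
assert(mounted.getSpec.getVolumes.size == 1)
```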
