Commit f2a4033

Mount a hadoop secret in the executor pod
1 parent 3c3331a commit f2a4033

3 files changed: 74 additions & 34 deletions

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/HadoopSecretUtil.scala (new file)

Lines changed: 55 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.kubernetes.submit

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

import org.apache.spark.deploy.kubernetes.constants._

object HadoopSecretUtil {

  def configurePod(secretNameOption: Option[String], pod: Pod) : Pod = {
    secretNameOption.map { secret =>
      new PodBuilder(pod)
        .editOrNewSpec()
          .addNewVolume()
            .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
            .withNewSecret()
              .withSecretName(secret)
              .endSecret()
            .endVolume()
          .endSpec()
        .build()
    }.getOrElse(pod)
  }

  def configureContainer(secretNameOption: Option[String],
                         containerSpec: Container) : Container = {
    secretNameOption.map { secret =>
      new ContainerBuilder(containerSpec)
        .addNewVolumeMount()
          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
          .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
          .endVolumeMount()
        .addNewEnv()
          .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
          .withValue(SPARK_APP_HADOOP_TOKEN_FILE_PATH)
          .endEnv()
        .build()
    }.getOrElse(containerSpec)
  }
}
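The two helpers are meant to be applied together to a pod and its main container, and both are no-ops when no secret name is configured. A minimal usage sketch follows; the pod name, container name, and the "hadoop-token-secret" secret name are illustrative, not taken from this commit:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.deploy.kubernetes.submit.HadoopSecretUtil

// Illustrative inputs; a real caller already has a pod and container built
// by earlier configuration steps.
val basePod = new PodBuilder()
  .withNewMetadata().withName("example-driver").endMetadata()
  .withNewSpec().endSpec()
  .build()
val baseContainer = new ContainerBuilder().withName("example-driver-container").build()

// "hadoop-token-secret" is a hypothetical Kubernetes secret name.
val maybeSecret = Some("hadoop-token-secret")

// Adds a secret-backed volume to the pod spec ...
val podWithSecret = HadoopSecretUtil.configurePod(maybeSecret, basePod)
// ... and a matching volume mount plus the token-location env var to the container.
val containerWithSecret = HadoopSecretUtil.configureContainer(maybeSecret, baseContainer)

// With None, both helpers return their inputs unchanged.
assert(HadoopSecretUtil.configurePod(None, basePod) == basePod)
```

Because the None case falls through via getOrElse, callers can pass the configured option straight through without any conditional logic.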

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverHadoopCredentialsStep.scala

Lines changed: 9 additions & 31 deletions
@@ -16,46 +16,24 @@
  */
 package org.apache.spark.deploy.kubernetes.submit.submitsteps
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
-
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.config._
-import org.apache.spark.deploy.kubernetes.constants._
-
+import org.apache.spark.deploy.kubernetes.submit.HadoopSecretUtil
 
-class DriverHadoopCredentialsStep(submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+private[spark] class DriverHadoopCredentialsStep(submissionSparkConf: SparkConf)
+  extends DriverConfigurationStep {
 
   private val maybeMountedHadoopSecret = submissionSparkConf.getOption(MOUNTED_HADOOP_SECRET_CONF)
 
   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    val driverPodWithMountedHadoopTokens = maybeMountedHadoopSecret.map { secret =>
-      new PodBuilder(driverSpec.driverPod)
-        .editOrNewSpec()
-          .addNewVolume()
-            .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
-            .withNewSecret()
-              .withSecretName(secret)
-              .endSecret()
-            .endVolume()
-          .endSpec()
-        .build()
-    }.getOrElse(driverSpec.driverPod)
-    val driverContainerWithMountedSecretVolume = maybeMountedHadoopSecret.map { secret =>
-      new ContainerBuilder(driverSpec.driverContainer)
-        .addNewVolumeMount()
-          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
-          .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
-          .endVolumeMount()
-        .addNewEnv()
-          .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
-          .withValue(SPARK_APP_HADOOP_TOKEN_FILE_PATH)
-          .endEnv()
-        .build()
-    }.getOrElse(driverSpec.driverContainer)
+    val podWithMountedHadoopToken = HadoopSecretUtil.configurePod(maybeMountedHadoopSecret,
+      driverSpec.driverPod)
+    val containerWithMountedHadoopToken = HadoopSecretUtil.configureContainer(
+      maybeMountedHadoopSecret, driverSpec.driverContainer)
     driverSpec.copy(
-      driverPod = driverPodWithMountedHadoopTokens,
+      driverPod = podWithMountedHadoopToken,
       otherKubernetesResources = driverSpec.otherKubernetesResources,
       driverSparkConf = driverSpec.driverSparkConf,
-      driverContainer = driverContainerWithMountedSecretVolume)
+      driverContainer = containerWithMountedHadoopToken)
   }
 }
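After this change the step only wires the shared helper into the driver-side submission flow. For context, DriverConfigurationStep implementations are applied one after another, each returning an updated copy of the KubernetesDriverSpec; the sketch below shows that chaining pattern (the applySteps helper is illustrative, not code from this repository):

```scala
import org.apache.spark.deploy.kubernetes.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec}

// Illustrative helper: fold the steps over an initial spec. Each step, such as
// DriverHadoopCredentialsStep, sees the pod and container produced so far and
// returns a copy with its own additions (here: the Hadoop secret volume,
// the volume mount, and the token-location env var).
def applySteps(
    initialSpec: KubernetesDriverSpec,
    steps: Seq[DriverConfigurationStep]): KubernetesDriverSpec = {
  steps.foldLeft(initialSpec) { (spec, step) => step.configureDriver(spec) }
}
```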

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 10 additions & 3 deletions
@@ -37,7 +37,7 @@ import org.apache.spark.{SparkContext, SparkEnv, SparkException}
 import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.InitContainerUtil
+import org.apache.spark.deploy.kubernetes.submit.{HadoopSecretUtil, InitContainerUtil}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
@@ -130,6 +130,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
 
+  private val maybeMountedHadoopSecret = conf.getOption(MOUNTED_HADOOP_SECRET_CONF)
+
   private val driverPod = try {
     kubernetesClient.pods().inNamespace(kubernetesNamespace).
       withName(kubernetesDriverPodName).get()
@@ -582,9 +584,14 @@
 
     val executorPodWithNodeAffinity = addNodeAffinityAnnotationIfUseful(
       executorPodWithInitContainer, nodeToLocalTaskCount)
-    val resolvedExecutorPod = new PodBuilder(executorPodWithNodeAffinity)
+    val executorPodWithMountedHadoopToken = HadoopSecretUtil.configurePod(maybeMountedHadoopSecret,
+      executorPodWithNodeAffinity)
+    val containerWithMountedHadoopToken = HadoopSecretUtil.configureContainer(
+      maybeMountedHadoopSecret, initBootstrappedExecutorContainer)
+
+    val resolvedExecutorPod = new PodBuilder(executorPodWithMountedHadoopToken)
       .editSpec()
-        .addToContainers(initBootstrappedExecutorContainer)
+        .addToContainers(containerWithMountedHadoopToken)
       .endSpec()
       .build()
     try {
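The executor-side change mirrors the driver step: the same secret volume is mounted into every executor pod and the token-file location is exported into the executor container's environment, so Hadoop's security layer can pick up the delegation tokens at runtime. A rough sketch of how a process inside the container could load such a token file with standard Hadoop APIs; the environment-variable name and fallback path are assumptions, and this code is not part of the commit:

```scala
import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// ENV_HADOOP_TOKEN_FILE_LOCATION is assumed to resolve to Hadoop's standard
// HADOOP_TOKEN_FILE_LOCATION environment variable; the fallback path below is
// purely illustrative.
val tokenFile = sys.env.getOrElse(
  "HADOOP_TOKEN_FILE_LOCATION",
  "/mnt/secrets/hadoop-credentials/hadoop-token-file")

// Read the serialized delegation tokens and attach them to the current user,
// so subsequent HDFS/YARN calls can authenticate with them.
val credentials = Credentials.readTokenStorageFile(new File(tokenFile), new Configuration())
UserGroupInformation.getCurrentUser.addCredentials(credentials)
```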
