Skip to content

Commit f28cb17

Browse files
liyinan926mccheah
authored andcommitted
Added configuration properties to inject arbitrary secrets into the driver/executors (apache-spark-on-k8s#479)
* Added configuration properties to inject arbitrary secrets into the driver/executors * Addressed comments
1 parent b61f495 commit f28cb17

File tree

12 files changed

+331
-18
lines changed

12 files changed

+331
-18
lines changed

docs/running-on-kubernetes.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -800,6 +800,22 @@ from the other deployment modes. See the [configuration page](configuration.html
800800
the Driver process. The user can specify multiple of these to set multiple environment variables.
801801
</td>
802802
</tr>
803+
<tr>
804+
<td><code>spark.kubernetes.driver.secrets.[SecretName]</code></td>
805+
<td>(none)</td>
806+
<td>
807+
Mounts the Kubernetes secret named <code>SecretName</code> onto the path specified by the value
808+
in the driver Pod. The user can specify multiple instances of this for multiple secrets.
809+
</td>
810+
</tr>
811+
<tr>
812+
<td><code>spark.kubernetes.executor.secrets.[SecretName]</code></td>
813+
<td>(none)</td>
814+
<td>
815+
Mounts the Kubernetes secret named <code>SecretName</code> onto the path specified by the value
816+
in the executor Pods. The user can specify multiple instances of this for multiple secrets.
817+
</td>
818+
</tr>
803819
</table>
804820

805821

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ package object config extends Logging {
152152
.stringConf
153153
.createOptional
154154

155+
private[spark] val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
156+
private[spark] val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
157+
155158
private[spark] val KUBERNETES_DRIVER_POD_NAME =
156159
ConfigBuilder("spark.kubernetes.driver.pod.name")
157160
.doc("Name of the driver pod.")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
2020
import org.apache.spark.deploy.kubernetes.ConfigurationUtils
2121
import org.apache.spark.deploy.kubernetes.config._
2222
import org.apache.spark.deploy.kubernetes.constants._
23-
import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep}
23+
import org.apache.spark.deploy.kubernetes.submit.submitsteps._
2424
import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
2525
import org.apache.spark.launcher.SparkLauncher
2626
import org.apache.spark.util.Utils
@@ -83,6 +83,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
8383
val allDriverLabels = driverCustomLabels ++ Map(
8484
SPARK_APP_ID_LABEL -> kubernetesAppId,
8585
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
86+
val driverSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs(
87+
submissionSparkConf,
88+
KUBERNETES_DRIVER_SECRETS_PREFIX,
89+
"driver secrets")
90+
8691
val initialSubmissionStep = new BaseDriverConfigurationStep(
8792
kubernetesAppId,
8893
kubernetesResourceNamePrefix,
@@ -92,8 +97,10 @@ private[spark] class DriverConfigurationStepsOrchestrator(
9297
mainClass,
9398
appArgs,
9499
submissionSparkConf)
100+
95101
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
96102
submissionSparkConf, kubernetesResourceNamePrefix)
103+
97104
val pythonStep = mainAppResource match {
98105
case PythonMainAppResource(mainPyResource) =>
99106
Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
@@ -153,17 +160,27 @@ private[spark] class DriverConfigurationStepsOrchestrator(
153160
} else {
154161
(filesDownloadPath, Seq.empty[DriverConfigurationStep])
155162
}
163+
156164
val dependencyResolutionStep = new DependencyResolutionStep(
157165
sparkJars,
158166
sparkFiles,
159167
jarsDownloadPath,
160168
localFilesDownloadPath)
169+
170+
val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
171+
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths)
172+
Some(new MountSecretsStep(mountSecretsBootstrap))
173+
} else {
174+
None
175+
}
176+
161177
Seq(
162178
initialSubmissionStep,
163179
kubernetesCredentialsStep,
164180
dependencyResolutionStep) ++
165181
submittedDependenciesBootstrapSteps ++
166-
pythonStep.toSeq
182+
pythonStep.toSeq ++
183+
mountSecretsStep.toSeq
167184
}
168185

169186
private def areAnyFilesNonContainerLocal(files: Seq[String]): Boolean = {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes.submit
18+
19+
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
20+
21+
/**
22+
* Bootstraps a driver or executor pod with needed secrets mounted.
23+
*/
24+
private[spark] trait MountSecretsBootstrap {
25+
26+
/**
27+
* Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
28+
*
29+
* @param pod the pod into which the secret volumes are being added.
30+
* @param container the container into which the secret volumes are being mounted.
31+
* @return the updated pod and container with the secrets mounted.
32+
*/
33+
def mountSecrets(pod: Pod, container: Container): (Pod, Container)
34+
}
35+
36+
private[spark] class MountSecretsBootstrapImpl(
37+
secretNamesToMountPaths: Map[String, String]) extends MountSecretsBootstrap {
38+
39+
override def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
40+
var podBuilder = new PodBuilder(pod)
41+
secretNamesToMountPaths.keys.foreach(name =>
42+
podBuilder = podBuilder
43+
.editOrNewSpec()
44+
.addNewVolume()
45+
.withName(secretVolumeName(name))
46+
.withNewSecret()
47+
.withSecretName(name)
48+
.endSecret()
49+
.endVolume()
50+
.endSpec())
51+
52+
var containerBuilder = new ContainerBuilder(container)
53+
secretNamesToMountPaths.foreach(namePath =>
54+
containerBuilder = containerBuilder
55+
.addNewVolumeMount()
56+
.withName(secretVolumeName(namePath._1))
57+
.withMountPath(namePath._2)
58+
.endVolumeMount()
59+
)
60+
61+
(podBuilder.build(), containerBuilder.build())
62+
}
63+
64+
private def secretVolumeName(secretName: String): String = {
65+
secretName + "-volume"
66+
}
67+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes.submit.submitsteps
18+
19+
import org.apache.spark.deploy.kubernetes.submit.MountSecretsBootstrap
20+
21+
/**
22+
* A driver configuration step for mounting user-specified secrets onto user-specified paths.
23+
*
24+
* @param mountSecretsBootstrap a utility actually handling mounting of the secrets.
25+
*/
26+
private[spark] class MountSecretsStep(
27+
mountSecretsBootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
28+
29+
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
30+
val (driverPodWithSecretsMounted, driverContainerWithSecretsMounted) =
31+
mountSecretsBootstrap.mountSecrets(driverSpec.driverPod, driverSpec.driverContainer)
32+
driverSpec.copy(
33+
driverPod = driverPodWithSecretsMounted,
34+
driverContainer = driverContainerWithSecretsMounted
35+
)
36+
}
37+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactory.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,16 @@
1616
*/
1717
package org.apache.spark.scheduler.cluster.kubernetes
1818

19-
import java.util.concurrent.atomic.AtomicLong
19+
import scala.collection.JavaConverters._
2020

2121
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
2222
import org.apache.commons.io.FilenameUtils
23-
import scala.collection.JavaConverters._
2423

2524
import org.apache.spark.{SparkConf, SparkException}
2625
import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
2726
import org.apache.spark.deploy.kubernetes.config._
2827
import org.apache.spark.deploy.kubernetes.constants._
29-
import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSmallFilesBootstrap}
28+
import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSecretsBootstrap, MountSmallFilesBootstrap}
3029
import org.apache.spark.util.Utils
3130

3231
// Configures executor pods. Construct one of these with a SparkConf to set up properties that are
@@ -45,6 +44,7 @@ private[spark] trait ExecutorPodFactory {
4544
private[spark] class ExecutorPodFactoryImpl(
4645
sparkConf: SparkConf,
4746
nodeAffinityExecutorPodModifier: NodeAffinityExecutorPodModifier,
47+
mountSecretsBootstrap: Option[MountSecretsBootstrap],
4848
mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap],
4949
executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap],
5050
executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
@@ -250,11 +250,18 @@ private[spark] class ExecutorPodFactoryImpl(
250250
.build()
251251
}
252252
}.getOrElse(executorPod)
253+
254+
val (withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer) =
255+
mountSecretsBootstrap.map {bootstrap =>
256+
bootstrap.mountSecrets(withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)
257+
}.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer))
258+
253259
val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) =
254260
mountSmallFilesBootstrap.map { bootstrap =>
255261
bootstrap.mountSmallFilesSecret(
256-
withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)
257-
}.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer))
262+
withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer)
263+
}.getOrElse((withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer))
264+
258265
val (executorPodWithInitContainer, initBootstrappedExecutorContainer) =
259266
executorInitContainerBootstrap.map { bootstrap =>
260267
val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes(

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ import java.io.File
2121
import io.fabric8.kubernetes.client.Config
2222

2323
import org.apache.spark.SparkContext
24-
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
24+
import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
2525
import org.apache.spark.deploy.kubernetes.config._
2626
import org.apache.spark.deploy.kubernetes.constants._
27-
import org.apache.spark.deploy.kubernetes.submit.MountSmallFilesBootstrapImpl
27+
import org.apache.spark.deploy.kubernetes.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl}
2828
import org.apache.spark.internal.Logging
2929
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
3030

@@ -51,6 +51,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
5151
sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET)
5252
val maybeExecutorInitContainerSecretMountPath =
5353
sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET_MOUNT_DIR)
54+
5455
val executorInitContainerSecretVolumePlugin = for {
5556
initContainerSecretName <- maybeExecutorInitContainerSecretName
5657
initContainerSecretMountPath <- maybeExecutorInitContainerSecretMountPath
@@ -59,10 +60,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
5960
initContainerSecretName,
6061
initContainerSecretMountPath)
6162
}
63+
6264
// Only set up the bootstrap if they've provided both the config map key and the config map
6365
// name. The config map might not be provided if init-containers aren't being used to
6466
// bootstrap dependencies.
65-
val executorInitContainerbootStrap = for {
67+
val executorInitContainerBootstrap = for {
6668
configMap <- maybeInitContainerConfigMap
6769
configMapKey <- maybeInitContainerConfigMapKey
6870
} yield {
@@ -75,12 +77,22 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
7577
configMap,
7678
configMapKey)
7779
}
80+
7881
val mountSmallFilesBootstrap = for {
7982
secretName <- maybeSubmittedFilesSecret
8083
secretMountPath <- maybeSubmittedFilesSecretMountPath
8184
} yield {
8285
new MountSmallFilesBootstrapImpl(secretName, secretMountPath)
8386
}
87+
88+
val executorSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs(sparkConf,
89+
KUBERNETES_EXECUTOR_SECRETS_PREFIX, "executor secrets")
90+
val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
91+
Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
92+
} else {
93+
None
94+
}
95+
8496
if (maybeInitContainerConfigMap.isEmpty) {
8597
logWarning("The executor's init-container config map was not specified. Executors will" +
8698
" therefore not attempt to fetch remote or submitted dependencies.")
@@ -89,6 +101,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
89101
logWarning("The executor's init-container config map key was not specified. Executors will" +
90102
" therefore not attempt to fetch remote or submitted dependencies.")
91103
}
104+
92105
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
93106
KUBERNETES_MASTER_INTERNAL_URL,
94107
Some(sparkConf.get(KUBERNETES_NAMESPACE)),
@@ -99,8 +112,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
99112
val executorPodFactory = new ExecutorPodFactoryImpl(
100113
sparkConf,
101114
NodeAffinityExecutorPodModifierImpl,
115+
mountSecretBootstrap,
102116
mountSmallFilesBootstrap,
103-
executorInitContainerbootStrap,
117+
executorInitContainerBootstrap,
104118
executorInitContainerSecretVolumePlugin)
105119
new KubernetesClusterSchedulerBackend(
106120
sc.taskScheduler.asInstanceOf[TaskSchedulerImpl],

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,14 @@ import java.util.Collections
2222
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
2323
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
2424

25-
import com.fasterxml.jackson.databind.ObjectMapper
26-
import com.fasterxml.jackson.module.scala.DefaultScalaModule
27-
import io.fabric8.kubernetes.api.model._
28-
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
29-
import io.fabric8.kubernetes.client.Watcher.Action
30-
import org.apache.commons.io.FilenameUtils
3125
import scala.collection.{concurrent, mutable}
3226
import scala.collection.JavaConverters._
3327
import scala.concurrent.{ExecutionContext, Future}
3428

29+
import io.fabric8.kubernetes.api.model._
30+
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
31+
import io.fabric8.kubernetes.client.Watcher.Action
32+
3533
import org.apache.spark.{SparkContext, SparkEnv, SparkException}
3634
import org.apache.spark.deploy.kubernetes.ConfigurationUtils
3735
import org.apache.spark.deploy.kubernetes.config._

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes.submit
1818

1919
import org.apache.spark.{SparkConf, SparkFunSuite}
2020
import org.apache.spark.deploy.kubernetes.config._
21-
import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep}
21+
import org.apache.spark.deploy.kubernetes.submit.submitsteps._
2222

2323
private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
2424

@@ -29,6 +29,9 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
2929
private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
3030
private val APP_ARGS = Array("arg1", "arg2")
3131
private val ADDITIONAL_PYTHON_FILES = Seq("local:///var/apps/python/py1.py")
32+
private val SECRET_FOO = "foo"
33+
private val SECRET_BAR = "bar"
34+
private val SECRET_MOUNT_PATH = "/etc/secrets/driver"
3235

3336
test("Base submission steps without an init-container or python files.") {
3437
val sparkConf = new SparkConf(false)
@@ -116,6 +119,29 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
116119
classOf[MountSmallLocalFilesStep])
117120
}
118121

122+
test("Submission steps with driver secrets to mount") {
123+
val sparkConf = new SparkConf(false)
124+
.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH)
125+
.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH)
126+
val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar")
127+
val orchestrator = new DriverConfigurationStepsOrchestrator(
128+
NAMESPACE,
129+
APP_ID,
130+
LAUNCH_TIME,
131+
mainAppResource,
132+
APP_NAME,
133+
MAIN_CLASS,
134+
APP_ARGS,
135+
ADDITIONAL_PYTHON_FILES,
136+
sparkConf)
137+
validateStepTypes(
138+
orchestrator,
139+
classOf[BaseDriverConfigurationStep],
140+
classOf[DriverKubernetesCredentialsStep],
141+
classOf[DependencyResolutionStep],
142+
classOf[MountSecretsStep])
143+
}
144+
119145
private def validateStepTypes(
120146
orchestrator: DriverConfigurationStepsOrchestrator,
121147
types: Class[_ <: DriverConfigurationStep]*): Unit = {

0 commit comments

Comments
 (0)