Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 12d590c

Browse files
authored
Avoids adding duplicated secret volumes when init-container is used (#597)
* Avoids adding duplicated secret volumes when init-container is used Cherry-picked from apache#20148. * Added the missing commit from upstream
1 parent d7dd259 commit 12d590c

File tree

13 files changed

+82
-84
lines changed

13 files changed

+82
-84
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@ import org.apache.spark.SparkConf
2020
import org.apache.spark.deploy.k8s.ConfigurationUtils
2121
import org.apache.spark.deploy.k8s.config._
2222
import org.apache.spark.deploy.k8s.constants._
23-
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, HadoopConfigBootstrapStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
23+
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, HadoopConfigBootstrapStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
2424
import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.HadoopStepsOrchestrator
2525
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
26-
import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep
2726
import org.apache.spark.launcher.SparkLauncher
2827
import org.apache.spark.util.{SystemClock, Utils}
2928

@@ -117,6 +116,13 @@ private[spark] class DriverConfigurationStepsOrchestrator(
117116
val localDirectoryMountConfigurationStep = new LocalDirectoryMountConfigurationStep(
118117
submissionSparkConf)
119118

119+
val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
120+
val mountSecretsBootstrap = new MountSecretsBootstrap(driverSecretNamesToMountPaths)
121+
Some(new MountSecretsStep(mountSecretsBootstrap))
122+
} else {
123+
None
124+
}
125+
120126
val hadoopConfigSteps =
121127
hadoopConfDir.map { conf =>
122128
val hadoopStepsOrchestrator =
@@ -204,23 +210,16 @@ private[spark] class DriverConfigurationStepsOrchestrator(
204210
jarsDownloadPath,
205211
localFilesDownloadPath)
206212

207-
val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
208-
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths)
209-
Some(new MountSecretsStep(mountSecretsBootstrap))
210-
} else {
211-
None
212-
}
213-
214213
Seq(
215214
initialSubmissionStep,
216215
driverAddressStep,
217216
kubernetesCredentialsStep,
218217
dependencyResolutionStep,
219218
localDirectoryMountConfigurationStep) ++
219+
mountSecretsStep.toSeq ++
220220
submittedDependenciesBootstrapSteps ++
221221
hadoopConfigSteps.toSeq ++
222-
resourceStep.toSeq ++
223-
mountSecretsStep.toSeq
222+
resourceStep.toSeq
224223
}
225224

226225
private def areAnyFilesNonContainerLocal(files: Seq[String]): Boolean = {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MountSecretsBootstrap.scala

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,17 @@ package org.apache.spark.deploy.k8s.submit
1818

1919
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
2020

21-
/**
22-
* Bootstraps a driver or executor pod with needed secrets mounted.
23-
*/
24-
private[spark] trait MountSecretsBootstrap {
21+
private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) {
2522

2623
/**
27-
* Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
24+
* Add new secret volumes for the secrets specified in secretNamesToMountPaths into the given pod.
2825
*
2926
* @param pod the pod into which the secret volumes are being added.
30-
* @param container the container into which the secret volumes are being mounted.
31-
* @return the updated pod and container with the secrets mounted.
27+
* @return the updated pod with the secret volumes added.
3228
*/
33-
def mountSecrets(pod: Pod, container: Container): (Pod, Container)
34-
}
35-
36-
private[spark] class MountSecretsBootstrapImpl(
37-
secretNamesToMountPaths: Map[String, String]) extends MountSecretsBootstrap {
38-
39-
override def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
29+
def addSecretVolumes(pod: Pod): Pod = {
4030
var podBuilder = new PodBuilder(pod)
41-
secretNamesToMountPaths.keys.foreach(name =>
31+
secretNamesToMountPaths.keys.foreach { name =>
4232
podBuilder = podBuilder
4333
.editOrNewSpec()
4434
.addNewVolume()
@@ -47,18 +37,30 @@ private[spark] class MountSecretsBootstrapImpl(
4737
.withSecretName(name)
4838
.endSecret()
4939
.endVolume()
50-
.endSpec())
40+
.endSpec()
41+
}
5142

43+
podBuilder.build()
44+
}
45+
46+
/**
47+
* Mounts Kubernetes secret volumes of the secrets specified in secretNamesToMountPaths into the
48+
* given container.
49+
*
50+
* @param container the container into which the secret volumes are being mounted.
51+
* @return the updated container with the secrets mounted.
52+
*/
53+
def mountSecrets(container: Container): Container = {
5254
var containerBuilder = new ContainerBuilder(container)
53-
secretNamesToMountPaths.foreach(namePath =>
55+
secretNamesToMountPaths.foreach { case (name, path) =>
5456
containerBuilder = containerBuilder
5557
.addNewVolumeMount()
56-
.withName(secretVolumeName(namePath._1))
57-
.withMountPath(namePath._2)
58+
.withName(secretVolumeName(name))
59+
.withMountPath(path)
5860
.endVolumeMount()
59-
)
61+
}
6062

61-
(podBuilder.build(), containerBuilder.build())
63+
containerBuilder.build()
6264
}
6365

6466
private def secretVolumeName(secretName: String): String = {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/MountSecretsStep.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,17 @@ import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap
2121
/**
2222
* A driver configuration step for mounting user-specified secrets onto user-specified paths.
2323
*
24-
* @param mountSecretsBootstrap a utility actually handling mounting of the secrets.
24+
* @param bootstrap a utility actually handling mounting of the secrets.
2525
*/
2626
private[spark] class MountSecretsStep(
27-
mountSecretsBootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
27+
bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
2828

2929
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
30-
val (driverPodWithSecretsMounted, driverContainerWithSecretsMounted) =
31-
mountSecretsBootstrap.mountSecrets(driverSpec.driverPod, driverSpec.driverContainer)
30+
val pod = bootstrap.addSecretVolumes(driverSpec.driverPod)
31+
val container = bootstrap.mountSecrets(driverSpec.driverContainer)
3232
driverSpec.copy(
33-
driverPod = driverPodWithSecretsMounted,
34-
driverContainer = driverContainerWithSecretsMounted
33+
driverPod = pod,
34+
driverContainer = container
3535
)
3636
}
3737
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
2020
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl}
2121
import org.apache.spark.deploy.k8s.config._
2222
import org.apache.spark.deploy.k8s.constants._
23-
import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSecretsBootstrapImpl, SubmittedDependencyUploaderImpl}
23+
import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSecretsBootstrap, SubmittedDependencyUploaderImpl}
2424
import org.apache.spark.deploy.rest.k8s.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl}
2525
import org.apache.spark.util.Utils
2626

@@ -140,7 +140,7 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(
140140
KUBERNETES_DRIVER_SECRETS_PREFIX,
141141
"driver secrets")
142142
val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
143-
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths)
143+
val mountSecretsBootstrap = new MountSecretsBootstrap(driverSecretNamesToMountPaths)
144144
Some(new InitContainerMountSecretsStep(mountSecretsBootstrap))
145145
} else {
146146
None

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStep.scala

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,15 @@ import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap
2222
* An init-container configuration step for mounting user-specified secrets onto user-specified
2323
* paths.
2424
*
25-
* @param mountSecretsBootstrap a utility actually handling mounting of the secrets.
25+
* @param bootstrap a utility actually handling mounting of the secrets.
2626
*/
2727
private[spark] class InitContainerMountSecretsStep(
28-
mountSecretsBootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {
28+
bootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {
2929

30-
override def configureInitContainer(initContainerSpec: InitContainerSpec) : InitContainerSpec = {
31-
val (podWithSecretsMounted, initContainerWithSecretsMounted) =
32-
mountSecretsBootstrap.mountSecrets(
33-
initContainerSpec.podToInitialize,
34-
initContainerSpec.initContainer)
35-
initContainerSpec.copy(
36-
podToInitialize = podWithSecretsMounted,
37-
initContainer = initContainerWithSecretsMounted
38-
)
30+
override def configureInitContainer(spec: InitContainerSpec) : InitContainerSpec = {
31+
// Mount the secret volumes given that the volumes have already been added to the driver pod
32+
// when mounting the secrets into the main driver container.
33+
val initContainer = bootstrap.mountSecrets(spec.initContainer)
34+
spec.copy(initContainer = initContainer)
3935
}
4036
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ package org.apache.spark.scheduler.cluster.k8s
1818

1919
import scala.collection.JavaConverters._
2020

21-
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder}
21+
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
2222

23-
import org.apache.spark.{SparkConf, SparkException}
23+
import org.apache.spark.SparkConf
2424
import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrap, HadoopConfSparkUserBootstrap, InitContainerResourceStagingServerSecretPlugin, KerberosTokenConfBootstrap, PodWithDetachedInitContainer, PodWithMainContainer, SparkPodInitContainerBootstrap}
2525
import org.apache.spark.deploy.k8s.config._
2626
import org.apache.spark.deploy.k8s.constants._
@@ -235,7 +235,8 @@ private[spark] class ExecutorPodFactoryImpl(
235235

236236
val (withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer) =
237237
mountSecretsBootstrap.map {bootstrap =>
238-
bootstrap.mountSecrets(executorPod, containerWithExecutorLimitCores)
238+
(bootstrap.addSecretVolumes(executorPod),
239+
bootstrap.mountSecrets(containerWithExecutorLimitCores))
239240
}.getOrElse((executorPod, containerWithExecutorLimitCores))
240241
val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) =
241242
mountSmallFilesBootstrap.map { bootstrap =>
@@ -258,7 +259,7 @@ private[spark] class ExecutorPodFactoryImpl(
258259

259260
val (mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted) =
260261
executorInitContainerMountSecretsBootstrap.map { bootstrap =>
261-
bootstrap.mountSecrets(podWithDetachedInitContainer.pod, resolvedInitContainer)
262+
(podWithDetachedInitContainer.pod, bootstrap.mountSecrets(resolvedInitContainer))
262263
}.getOrElse(podWithDetachedInitContainer.pod, resolvedInitContainer)
263264

264265
val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer(

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.SparkContext
2424
import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrapImpl, HadoopConfSparkUserBootstrapImpl, HadoopConfUtils, HadoopUGIUtilImpl, InitContainerResourceStagingServerSecretPluginImpl, KerberosTokenConfBootstrapImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
2525
import org.apache.spark.deploy.k8s.config._
2626
import org.apache.spark.deploy.k8s.constants._
27-
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl}
27+
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrap, MountSmallFilesBootstrapImpl}
2828
import org.apache.spark.internal.Logging
2929
import org.apache.spark.network.netty.SparkTransportConf
3030
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl
@@ -125,12 +125,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
125125
val executorSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs(sparkConf,
126126
KUBERNETES_EXECUTOR_SECRETS_PREFIX, "executor secrets")
127127
val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
128-
Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
128+
Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
129129
} else {
130130
None
131131
}
132132
val executorInitContainerMountSecretsBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
133-
Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
133+
Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
134134
} else {
135135
None
136136
}
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,23 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.deploy.k8s.submit
17+
package org.apache.spark.deploy.k8s
1818

1919
import scala.collection.JavaConverters._
2020

2121
import io.fabric8.kubernetes.api.model.{Container, Pod}
2222

2323
private[spark] object SecretVolumeUtils {
2424

25-
def podHasVolume(driverPod: Pod, volumeName: String): Boolean = {
26-
driverPod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
25+
def podHasVolume(pod: Pod, volumeName: String): Boolean = {
26+
pod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
2727
}
2828

2929
def containerHasVolume(
30-
driverContainer: Container,
30+
container: Container,
3131
volumeName: String,
3232
mountPath: String): Boolean = {
33-
driverContainer.getVolumeMounts.asScala.exists(volumeMount =>
33+
container.getVolumeMounts.asScala.exists(volumeMount =>
3434
volumeMount.getName == volumeName && volumeMount.getMountPath == mountPath)
3535
}
3636
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/MountSecretsBootstrapSuite.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.submit
1919
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
2020

2121
import org.apache.spark.SparkFunSuite
22+
import org.apache.spark.deploy.k8s.SecretVolumeUtils
2223

2324
private[spark] class MountSecretsBootstrapSuite extends SparkFunSuite {
2425

@@ -34,9 +35,9 @@ private[spark] class MountSecretsBootstrapSuite extends SparkFunSuite {
3435
val driverContainer = new ContainerBuilder().build()
3536
val driverPod = new PodBuilder().build()
3637

37-
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
38+
val bootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
3839
val (driverPodWithSecretsMounted, driverContainerWithSecretsMounted) =
39-
mountSecretsBootstrap.mountSecrets(driverPod, driverContainer)
40+
(bootstrap.addSecretVolumes(driverPod), bootstrap.mountSecrets(driverContainer))
4041
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
4142
assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName)))
4243
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.deploy.kubernetes.submit.submitsteps
17+
package org.apache.spark.deploy.k8s.submit.submitsteps
1818

1919
import java.nio.file.Paths
2020

2121
import scala.collection.JavaConverters._
2222

2323
import org.apache.spark.{SparkConf, SparkFunSuite}
2424
import org.apache.spark.deploy.k8s.constants._
25-
import org.apache.spark.deploy.k8s.submit.submitsteps.{KubernetesDriverSpec, LocalDirectoryMountConfigurationStep}
2625

2726
private[spark] class LocalDirectoryMountConfigurationStepSuite extends SparkFunSuite {
2827

0 commit comments

Comments
 (0)