This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 455317d

mccheah authored and ash211 committed
Use a secret to mount small files in driver and executors. (#437)
* Use a secret to mount small files in driver and executors. Allows bypassing the resource staging server in a few scenarios.
* Fix scalastyle
* Address comments and add tests.
* Lightly brush up formatting.
* Make the working directory empty so that added files don't clobber existing binaries.
* Address comments.
* Drop testing file size to N+1 of the limit
1 parent f8cf9db commit 455317d
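For context, a minimal sketch (not part of this commit, file name hypothetical) of the submission-side setup that triggers the new path: when no resource staging server URI is configured, small submitter-local files are shipped to the pods in a Kubernetes secret.

import org.apache.spark.SparkConf

// Hypothetical submission conf: one small submitter-local file, and no
// resource staging server URI set, which is the scenario this commit targets.
val submissionConf = new SparkConf(false)
  .set("spark.files", "file:///opt/app/lookup.csv")
// With RESOURCE_STAGING_SERVER_URI absent, the orchestrator below bundles the
// file into a per-app secret named "<kubernetesAppId>-submitted-files".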

File tree

21 files changed, +524 −93 lines


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 16 additions & 0 deletions
@@ -450,6 +450,22 @@ package object config extends Logging {
       .timeConf(TimeUnit.MINUTES)
       .createWithDefault(5)
 
+  private[spark] val EXECUTOR_SUBMITTED_SMALL_FILES_SECRET =
+    ConfigBuilder("spark.kubernetes.mountdependencies.smallfiles.executor.secretName")
+      .doc("Name of the secret that should be mounted into the executor containers for" +
+        " distributing submitted small files without the resource staging server.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  private[spark] val EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH =
+    ConfigBuilder("spark.kubernetes.mountdependencies.smallfiles.executor.secretMountPath")
+      .doc(s"Mount path in the executors for the secret given by" +
+        s" ${EXECUTOR_SUBMITTED_SMALL_FILES_SECRET.key}")
+      .internal()
+      .stringConf
+      .createOptional
+
   private[spark] val EXECUTOR_INIT_CONTAINER_CONFIG_MAP =
     ConfigBuilder("spark.kubernetes.initcontainer.executor.configmapname")
       .doc("Name of the config map to use in the init-container that retrieves submitted files" +

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 4 additions & 0 deletions
@@ -69,6 +69,7 @@ package object constants {
   private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
   private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
+  private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"
 
   // Bootstrapping dependencies with the init-container
   private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
@@ -91,6 +92,9 @@ package object constants {
   private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle"
   private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
 
+  // Bootstrapping dependencies via a secret
+  private[spark] val MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH = "/etc/spark-submitted-files"
+
   // Miscellaneous
   private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity"
   private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
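The new environment variable tells container-side code where the secret's files were mounted. A hedged sketch of a consumer (in this commit the real consumer is the container entrypoint, which is outside this excerpt), copying the mounted files into the intentionally empty working directory:

import java.io.File
import java.nio.file.{Files, Paths, StandardCopyOption}

// Hypothetical consumer of SPARK_MOUNTED_FILES_FROM_SECRET_DIR: copy each
// mounted file into the working directory so user code can open it by name.
sys.env.get("SPARK_MOUNTED_FILES_FROM_SECRET_DIR").foreach { dir =>
  val mounted = Option(new File(dir).listFiles()).getOrElse(Array.empty[File])
  mounted.foreach { f =>
    Files.copy(f.toPath, Paths.get(f.getName), StandardCopyOption.REPLACE_EXISTING)
  }
}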

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 60 additions & 23 deletions
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.ConfigurationUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, PythonStep}
+import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep}
 import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.Utils
@@ -99,40 +99,77 @@ private[spark] class DriverConfigurationStepsOrchestrator(
         Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
       case _ => Option.empty[DriverConfigurationStep]
     }
-    val initContainerBootstrapStep = if ((sparkJars ++ sparkFiles).exists { uri =>
-      Option(Utils.resolveURI(uri).getScheme).getOrElse("file") != "local"
-    }) {
-      val initContainerConfigurationStepsOrchestrator =
-        new InitContainerConfigurationStepsOrchestrator(
-          namespace,
-          kubernetesResourceNamePrefix,
-          sparkJars,
+
+    val (localFilesDownloadPath, submittedDependenciesBootstrapSteps) =
+      if (areAnyFilesNonContainerLocal(sparkJars ++ sparkFiles)) {
+        val (submittedLocalFilesDownloadPath,
+            sparkFilesResolvedFromInitContainer,
+            mountSmallFilesWithoutInitContainerStep) =
+          // If the resource staging server is specified, submit all local files through that.
+          submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI).map { _ =>
+            (filesDownloadPath, sparkFiles, Option.empty[DriverConfigurationStep])
+          }.getOrElse {
+            // Else - use a small files bootstrap that submits the local files via a secret.
+            // Then, indicate to the outer block that the init-container should not handle
+            // those local files simply by filtering them out.
+            val sparkFilesWithoutLocal = KubernetesFileUtils.getNonSubmitterLocalFiles(sparkFiles)
+            val smallFilesSecretName = s"${kubernetesAppId}-submitted-files"
+            val mountSmallFilesBootstrap = new MountSmallFilesBootstrapImpl(
+              smallFilesSecretName, MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH)
+            val mountSmallLocalFilesStep = new MountSmallLocalFilesStep(
              sparkFiles,
-          jarsDownloadPath,
-          filesDownloadPath,
-          dockerImagePullPolicy,
-          allDriverLabels,
+              smallFilesSecretName,
+              MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
+              mountSmallFilesBootstrap)
+            (MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
+              sparkFilesWithoutLocal.toArray,
+              Some(mountSmallLocalFilesStep))
+          }
+
+        val initContainerBootstrapStep =
+          if (areAnyFilesNonContainerLocal(sparkJars ++ sparkFilesResolvedFromInitContainer)) {
+            val initContainerConfigurationStepsOrchestrator =
+              new InitContainerConfigurationStepsOrchestrator(
+                namespace,
+                kubernetesResourceNamePrefix,
+                sparkJars,
+                sparkFilesResolvedFromInitContainer,
+                jarsDownloadPath,
+                filesDownloadPath,
+                dockerImagePullPolicy,
+                allDriverLabels,
+                initContainerConfigMapName,
+                INIT_CONTAINER_CONFIG_MAP_KEY,
+                submissionSparkConf)
+            val initContainerConfigurationSteps =
+              initContainerConfigurationStepsOrchestrator.getAllConfigurationSteps()
+            Some(new InitContainerBootstrapStep(initContainerConfigurationSteps,
              initContainerConfigMapName,
-          INIT_CONTAINER_CONFIG_MAP_KEY,
-          submissionSparkConf)
-      val initContainerConfigurationSteps =
-        initContainerConfigurationStepsOrchestrator.getAllConfigurationSteps()
-      Some(new InitContainerBootstrapStep(initContainerConfigurationSteps,
-        initContainerConfigMapName,
-        INIT_CONTAINER_CONFIG_MAP_KEY))
+              INIT_CONTAINER_CONFIG_MAP_KEY))
+          } else Option.empty[DriverConfigurationStep]
+        (submittedLocalFilesDownloadPath,
+          mountSmallFilesWithoutInitContainerStep.toSeq ++
+            initContainerBootstrapStep.toSeq)
     } else {
-      Option.empty[DriverConfigurationStep]
+      (filesDownloadPath, Seq.empty[DriverConfigurationStep])
     }
     val dependencyResolutionStep = new DependencyResolutionStep(
       sparkJars,
       sparkFiles,
       jarsDownloadPath,
-      filesDownloadPath)
+      localFilesDownloadPath)
     Seq(
       initialSubmissionStep,
       kubernetesCredentialsStep,
       dependencyResolutionStep) ++
-      initContainerBootstrapStep.toSeq ++
+      submittedDependenciesBootstrapSteps ++
       pythonStep.toSeq
   }
+
+  private def areAnyFilesNonContainerLocal(files: Seq[String]): Boolean = {
+    files.exists { uri =>
+      Option(Utils.resolveURI(uri).getScheme).getOrElse("file") != "local"
+    }
+  }
+
 }
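The branching above reduces to: staging server configured, so the init-container fetches everything non-container-local; no staging server, so submitter-local files ride in a secret and only truly remote files keep the init-container. A simplified, self-contained sketch of that routing, with hypothetical string labels standing in for the real DriverConfigurationStep instances:

import java.net.URI

// Resolve a URI's scheme the same way areAnyFilesNonContainerLocal does.
def scheme(uri: String): String = Option(URI.create(uri).getScheme).getOrElse("file")

// Hypothetical routing plan; a simplification of the orchestration above.
def planBootstrapSteps(stagingServerUri: Option[String], files: Seq[String]): Seq[String] = {
  val nonContainerLocal = files.filter(scheme(_) != "local")
  val submitterLocal = nonContainerLocal.filter(scheme(_) == "file")
  val remote = nonContainerLocal.filter(scheme(_) != "file")
  if (nonContainerLocal.isEmpty) {
    Seq.empty
  } else if (stagingServerUri.isDefined) {
    Seq("init-container")  // the staging server serves submitter-local files too
  } else {
    val secretStep = if (submitterLocal.nonEmpty) Seq("mount-small-files-secret") else Nil
    val initStep = if (remote.nonEmpty) Seq("init-container") else Nil
    secretStep ++ initStep  // mirrors submittedDependenciesBootstrapSteps
  }
}

// planBootstrapSteps(None, Seq("file:///opt/app/lookup.csv"))
//   => Seq("mount-small-files-secret")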

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/MountSmallFilesBootstrap.scala

Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit
+
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
+
+import org.apache.spark.deploy.kubernetes.constants._
+
+private[spark] trait MountSmallFilesBootstrap {
+  def mountSmallFilesSecret(pod: Pod, container: Container): (Pod, Container)
+}
+
+private[spark] class MountSmallFilesBootstrapImpl(
+    secretName: String, secretMountPath: String) extends MountSmallFilesBootstrap {
+  def mountSmallFilesSecret(pod: Pod, container: Container): (Pod, Container) = {
+    val resolvedPod = new PodBuilder(pod)
+      .editOrNewSpec()
+        .addNewVolume()
+          .withName("submitted-files")
+          .withNewSecret()
+            .withSecretName(secretName)
+            .endSecret()
+          .endVolume()
+        .endSpec()
+      .build()
+    val resolvedContainer = new ContainerBuilder(container)
+      .addNewEnv()
+        .withName(ENV_MOUNTED_FILES_FROM_SECRET_DIR)
+        .withValue(secretMountPath)
+        .endEnv()
+      .addNewVolumeMount()
+        .withName("submitted-files")
+        .withMountPath(secretMountPath)
+        .endVolumeMount()
+      .build()
+    (resolvedPod, resolvedContainer)
+  }
+}
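A brief usage sketch of the bootstrap, applying it to a bare pod and container (the secret name and pod/container names are hypothetical):

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

// Apply the bootstrap to an empty pod/container pair and collect the results.
val bootstrap = new MountSmallFilesBootstrapImpl(
  "spark-app-1234-submitted-files",  // hypothetical secret name
  "/etc/spark-submitted-files")
val pod = new PodBuilder().withNewMetadata().withName("driver").endMetadata().build()
val container = new ContainerBuilder().withName("spark-kubernetes-driver").build()
val (podWithVolume, containerWithMount) = bootstrap.mountSmallFilesSecret(pod, container)
// podWithVolume now declares a "submitted-files" secret volume, and
// containerWithMount mounts it at the given path, with
// SPARK_MOUNTED_FILES_FROM_SECRET_DIR pointing there.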

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DependencyResolutionStep.scala

Lines changed: 4 additions & 2 deletions
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder
 
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils
+import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStep
 import org.apache.spark.util.Utils
 
 /**
@@ -36,11 +37,12 @@ private[spark] class DependencyResolutionStep(
     sparkJars: Seq[String],
     sparkFiles: Seq[String],
     jarsDownloadPath: String,
-    filesDownloadPath: String) extends DriverConfigurationStep {
+    localFilesDownloadPath: String) extends DriverConfigurationStep {
 
   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
     val resolvedSparkJars = KubernetesFileUtils.resolveSubmittedUris(sparkJars, jarsDownloadPath)
-    val resolvedSparkFiles = KubernetesFileUtils.resolveSubmittedUris(sparkFiles, filesDownloadPath)
+    val resolvedSparkFiles = KubernetesFileUtils.resolveSubmittedUris(
+      sparkFiles, localFilesDownloadPath)
     val sparkConfResolvedSparkDependencies = driverSpec.driverSparkConf.clone()
     if (resolvedSparkJars.nonEmpty) {
       sparkConfResolvedSparkDependencies.set("spark.jars", resolvedSparkJars.mkString(","))
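The parameter rename reflects that files delivered via the secret resolve to the secret mount path rather than the generic files download path. A hedged illustration, assuming resolveSubmittedUris rewrites submitter-local "file:" URIs to the given directory and passes other URIs through (the helper's implementation is not part of this diff):

import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils

// Assumed behavior, for illustration only; file names are hypothetical.
val resolved = KubernetesFileUtils.resolveSubmittedUris(
  Seq("file:///opt/app/lookup.csv", "hdfs://namenode:8020/data/events.parquet"),
  "/etc/spark-submitted-files")
// Under that assumption:
//   Seq("/etc/spark-submitted-files/lookup.csv",
//       "hdfs://namenode:8020/data/events.parquet")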

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/InitContainerBootstrapStep.scala

Lines changed: 20 additions & 20 deletions
@@ -33,32 +33,32 @@ private[spark] class InitContainerBootstrapStep(
 
   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
     var currentInitContainerSpec = InitContainerSpec(
-        initContainerProperties = Map.empty[String, String],
-        additionalDriverSparkConf = Map.empty[String, String],
-        initContainer = new ContainerBuilder().build(),
-        driverContainer = driverSpec.driverContainer,
-        podToInitialize = driverSpec.driverPod,
-        initContainerDependentResources = Seq.empty[HasMetadata])
+      initContainerProperties = Map.empty[String, String],
+      additionalDriverSparkConf = Map.empty[String, String],
+      initContainer = new ContainerBuilder().build(),
+      driverContainer = driverSpec.driverContainer,
+      podToInitialize = driverSpec.driverPod,
+      initContainerDependentResources = Seq.empty[HasMetadata])
     for (nextStep <- initContainerConfigurationSteps) {
       currentInitContainerSpec = nextStep.configureInitContainer(currentInitContainerSpec)
     }
     val configMap = PropertiesConfigMapFromScalaMapBuilder.buildConfigMap(
-        initContainerConfigMapName,
-        initContainerConfigMapKey,
-        currentInitContainerSpec.initContainerProperties)
+      initContainerConfigMapName,
+      initContainerConfigMapKey,
+      currentInitContainerSpec.initContainerProperties)
     val resolvedDriverSparkConf = driverSpec.driverSparkConf.clone()
-        .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP, initContainerConfigMapName)
-        .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY, initContainerConfigMapKey)
-        .setAll(currentInitContainerSpec.additionalDriverSparkConf)
+      .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP, initContainerConfigMapName)
+      .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY, initContainerConfigMapKey)
+      .setAll(currentInitContainerSpec.additionalDriverSparkConf)
     val resolvedDriverPod = InitContainerUtil.appendInitContainer(
-        currentInitContainerSpec.podToInitialize, currentInitContainerSpec.initContainer)
+      currentInitContainerSpec.podToInitialize, currentInitContainerSpec.initContainer)
     driverSpec.copy(
-        driverPod = resolvedDriverPod,
-        driverContainer = currentInitContainerSpec.driverContainer,
-        driverSparkConf = resolvedDriverSparkConf,
-        otherKubernetesResources =
-            driverSpec.otherKubernetesResources ++
-                currentInitContainerSpec.initContainerDependentResources ++
-                Seq(configMap))
+      driverPod = resolvedDriverPod,
+      driverContainer = currentInitContainerSpec.driverContainer,
+      driverSparkConf = resolvedDriverSparkConf,
+      otherKubernetesResources =
+        driverSpec.otherKubernetesResources ++
+          currentInitContainerSpec.initContainerDependentResources ++
+          Seq(configMap))
   }
 }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStep.scala

Lines changed: 73 additions & 0 deletions

@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps
+
+import java.io.File
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.SecretBuilder
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.submit.{KubernetesFileUtils, MountSmallFilesBootstrap}
+import org.apache.spark.util.Utils
+
+private[spark] class MountSmallLocalFilesStep(
+    sparkFiles: Seq[String],
+    smallFilesSecretName: String,
+    smallFilesSecretMountPath: String,
+    mountSmallFilesBootstrap: MountSmallFilesBootstrap) extends DriverConfigurationStep {
+
+  import MountSmallLocalFilesStep._
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val localFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles).map(new File(_))
+    val totalSizeBytes = localFiles.map(_.length()).sum
+    val totalSizeBytesString = Utils.bytesToString(totalSizeBytes)
+    require(totalSizeBytes < MAX_SECRET_BUNDLE_SIZE_BYTES,
+      s"Total size of all files submitted must be less than $MAX_SECRET_BUNDLE_SIZE_BYTES_STRING" +
+        s" if you do not use a resource staging server. The total size of all submitted local" +
+        s" files is $totalSizeBytesString. Please install a resource staging server and configure" +
+        s" your application to use it via ${RESOURCE_STAGING_SERVER_URI.key}")
+    val localFileBase64Contents = localFiles.map { file =>
+      val fileBase64 = BaseEncoding.base64().encode(Files.toByteArray(file))
+      (file.getName, fileBase64)
+    }.toMap
+    val localFilesSecret = new SecretBuilder()
+      .withNewMetadata()
+        .withName(smallFilesSecretName)
+        .endMetadata()
+      .withData(localFileBase64Contents.asJava)
+      .build()
+    val (resolvedDriverPod, resolvedDriverContainer) =
+      mountSmallFilesBootstrap.mountSmallFilesSecret(
+        driverSpec.driverPod, driverSpec.driverContainer)
+    val resolvedSparkConf = driverSpec.driverSparkConf.clone()
+      .set(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET, smallFilesSecretName)
+      .set(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH, smallFilesSecretMountPath)
+    driverSpec.copy(
+      driverPod = resolvedDriverPod,
+      driverContainer = resolvedDriverContainer,
+      driverSparkConf = resolvedSparkConf,
+      otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(localFilesSecret))
+  }
+}
+
+private[spark] object MountSmallLocalFilesStep {
+  val MAX_SECRET_BUNDLE_SIZE_BYTES = 10240
+  val MAX_SECRET_BUNDLE_SIZE_BYTES_STRING =
+    Utils.bytesToString(MAX_SECRET_BUNDLE_SIZE_BYTES)
+}
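The companion object caps the bundle at 10240 bytes, well under the 1 MiB limit Kubernetes itself places on secrets, which keeps the serialized resources small. A self-contained sketch of the same check-and-encode sequence, with hypothetical file paths and assuming Guava on the classpath:

import java.io.File
import com.google.common.io.{BaseEncoding, Files => GuavaFiles}

val maxBundleBytes = 10240  // mirrors MAX_SECRET_BUNDLE_SIZE_BYTES
val localFiles = Seq(new File("/tmp/app.properties"), new File("/tmp/lookup.csv"))

// Enforce the cap across the sum of all file sizes, as the step does above.
val totalBytes = localFiles.map(_.length()).sum
require(totalBytes < maxBundleBytes,
  s"Local files total $totalBytes bytes; the secret bundle cap is $maxBundleBytes.")

// Secret values must be base64; keys are bare file names, so two submitted
// files sharing a name would collide in the bundle.
val secretData: Map[String, String] = localFiles.map { f =>
  f.getName -> BaseEncoding.base64().encode(GuavaFiles.toByteArray(f))
}.toMap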

0 commit comments
