Skip to content

Commit 2223db0

Browse files
mccheah authored and robert3005 committed
Support mounting files with --files using secrets. (apache-spark-on-k8s#327)
1 parent 1010acc commit 2223db0

File tree

14 files changed

+442
-58
lines changed

14 files changed

+442
-58
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,30 @@ private[spark] object Config extends Logging {
141141
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
142142
.createWithDefaultString("1s")
143143

144+
val FILES_DOWNLOAD_LOCATION =
145+
ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
146+
.doc("Location to download files to in the driver and executors. When using " +
147+
"spark-submit, this directory must be empty and will be mounted as an empty directory " +
148+
"volume on the driver and executor pods.")
149+
.stringConf
150+
.createWithDefault("/var/spark-data/spark-files")
151+
152+
val EXECUTOR_SUBMITTED_SMALL_FILES_SECRET =
153+
ConfigBuilder("spark.kubernetes.mountdependencies.smallfiles.executor.secretName")
154+
.doc("Name of the secret that should be mounted into the executor containers for" +
155+
" distributing submitted small files without the resource staging server.")
156+
.internal()
157+
.stringConf
158+
.createOptional
159+
160+
val EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH =
161+
ConfigBuilder("spark.kubernetes.mountdependencies.smallfiles.executor.secretMountPath")
162+
.doc(s"Mount path in the executors for the secret given by" +
163+
s" ${EXECUTOR_SUBMITTED_SMALL_FILES_SECRET.key}")
164+
.internal()
165+
.stringConf
166+
.createOptional
167+
144168
val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
145169
"spark.kubernetes.authenticate.submission"
146170

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ private[spark] object Constants {
6464
val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
6565
val ENV_CLASSPATH = "SPARK_CLASSPATH"
6666
val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
67+
val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
68+
val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
69+
val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"
70+
6771
val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
6872
// Spark app configs for containers
6973
val SPARK_CONF_VOLUME = "spark-conf-volume"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.spark.deploy.k8s
1818

19+
import java.nio.file.Paths
20+
1921
import org.apache.spark.SparkConf
2022
import org.apache.spark.util.Utils
2123

@@ -42,19 +44,41 @@ private[spark] object KubernetesUtils {
4244
/**
4345
* For the given collection of file URIs, resolves them as follows:
4446
* - File URIs with scheme local:// resolve to just the path of the URI.
45-
* - Otherwise, the URIs are returned as-is.
47+
* - Otherwise, the URIs are returned resolved to the downloaded path.
4648
*/
47-
def resolveFileUrisAndPath(fileUris: Iterable[String]): Iterable[String] = {
49+
def resolveFileUrisAndPath(
50+
fileUris: Iterable[String], downloadPath: String): Iterable[String] = {
4851
fileUris.map { uri =>
49-
resolveFileUri(uri)
52+
resolveFileUri(uri, downloadPath)
53+
}
54+
}
55+
56+
/**
57+
* Get from a given collection of file URIs the ones that represent submitter-local files.
58+
*/
59+
def getOnlySubmitterLocalFiles(uris: Iterable[String]): Iterable[String] = {
60+
uris.filter { uri =>
61+
Utils.resolveURI(uri).getScheme == "file"
62+
}
63+
}
64+
65+
def resolveLocalFile(uri: String): String = {
66+
val fileUri = Utils.resolveURI(uri)
67+
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
68+
fileScheme match {
69+
case "local" => fileUri.getPath
70+
case _ => uri
5071
}
5172
}
5273

53-
private def resolveFileUri(uri: String): String = {
74+
private def resolveFileUri(uri: String, downloadPath: String): String = {
5475
val fileUri = Utils.resolveURI(uri)
5576
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
5677
fileScheme match {
5778
case "local" => fileUri.getPath
79+
case "file" =>
80+
val fileName = Paths.get(fileUri.getPath).toFile.getName
81+
s"$downloadPath/$fileName"
5882
case _ => uri
5983
}
6084
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s
18+
19+
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
20+
21+
import org.apache.spark.deploy.k8s.Constants._
22+
23+
private[spark] class MountSmallFilesBootstrap(
24+
secretName: String, secretMountPath: String) {
25+
def mountSmallFilesSecret(pod: Pod, container: Container): (Pod, Container) = {
26+
val resolvedPod = new PodBuilder(pod)
27+
.editOrNewSpec()
28+
.addNewVolume()
29+
.withName("submitted-files")
30+
.withNewSecret()
31+
.withSecretName(secretName)
32+
.endSecret()
33+
.endVolume()
34+
.endSpec()
35+
.build()
36+
val resolvedContainer = new ContainerBuilder(container)
37+
.addNewEnv()
38+
.withName(ENV_MOUNTED_FILES_FROM_SECRET_DIR)
39+
.withValue(secretMountPath)
40+
.endEnv()
41+
.addNewVolumeMount()
42+
.withName("submitted-files")
43+
.withMountPath(secretMountPath)
44+
.endVolumeMount()
45+
.build()
46+
(resolvedPod, resolvedContainer)
47+
}
48+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717
package org.apache.spark.deploy.k8s.submit
1818

1919
import org.apache.spark.{SparkConf, SparkException}
20-
import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
20+
import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap, MountSmallFilesBootstrap}
2121
import org.apache.spark.deploy.k8s.Config._
2222
import org.apache.spark.deploy.k8s.Constants._
2323
import org.apache.spark.deploy.k8s.submit.steps._
24+
import org.apache.spark.deploy.k8s.submit.submitsteps.DriverMountLocalFilesStep
2425
import org.apache.spark.launcher.SparkLauncher
25-
import org.apache.spark.util.SystemClock
26-
import org.apache.spark.util.Utils
26+
import org.apache.spark.util.{SystemClock, Utils}
2727

2828
/**
2929
* Figures out and returns the complete ordered list of needed DriverConfigurationSteps to
@@ -104,15 +104,23 @@ private[spark] class DriverConfigOrchestrator(
104104
.getOrElse(Array.empty[String])
105105

106106
// TODO(SPARK-23153): remove once submission client local dependencies are supported.
107-
if (existSubmissionLocalFiles(sparkJars) || existSubmissionLocalFiles(sparkFiles)) {
107+
if (existSubmissionLocalFiles(sparkJars)) {
108108
throw new SparkException("The Kubernetes mode does not yet support referencing application " +
109109
"dependencies in the local file system.")
110110
}
111111

112+
val mountLocalFilesStep = if (existSubmissionLocalFiles(sparkFiles)) {
113+
val localFilesSecretName = s"$kubernetesResourceNamePrefix-submitted-files"
114+
Seq(new DriverMountLocalFilesStep(
115+
KubernetesUtils.getOnlySubmitterLocalFiles(sparkFiles),
116+
localFilesSecretName,
117+
sparkConf.get(FILES_DOWNLOAD_LOCATION),
118+
new MountSmallFilesBootstrap(localFilesSecretName, sparkConf.get(FILES_DOWNLOAD_LOCATION))
119+
))
120+
} else Nil
121+
112122
val dependencyResolutionStep = if (sparkJars.nonEmpty || sparkFiles.nonEmpty) {
113-
Seq(new DependencyResolutionStep(
114-
sparkJars,
115-
sparkFiles))
123+
Seq(new DependencyResolutionStep(sparkJars, sparkFiles))
116124
} else {
117125
Nil
118126
}
@@ -127,6 +135,7 @@ private[spark] class DriverConfigOrchestrator(
127135
initialSubmissionStep,
128136
serviceBootstrapStep,
129137
kubernetesCredentialsStep) ++
138+
mountLocalFilesStep ++
130139
dependencyResolutionStep ++
131140
mountSecretsStep
132141
}
@@ -139,7 +148,8 @@ private[spark] class DriverConfigOrchestrator(
139148

140149
private def existNonContainerLocalFiles(files: Seq[String]): Boolean = {
141150
files.exists { uri =>
142-
Utils.resolveURI(uri).getScheme != "local"
151+
val resolvedUri = Utils.resolveURI(uri)
152+
resolvedUri.getScheme != "local" && resolvedUri.getScheme != "file"
143153
}
144154
}
145155
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,7 @@
1616
*/
1717
package org.apache.spark.deploy.k8s.submit.steps
1818

19-
import java.io.File
20-
21-
import io.fabric8.kubernetes.api.model.ContainerBuilder
22-
23-
import org.apache.spark.deploy.k8s.Constants._
19+
import org.apache.spark.deploy.k8s.Config._
2420
import org.apache.spark.deploy.k8s.KubernetesUtils
2521
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
2622

@@ -33,29 +29,19 @@ private[spark] class DependencyResolutionStep(
3329
sparkFiles: Seq[String]) extends DriverConfigurationStep {
3430

3531
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
36-
val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(sparkJars)
37-
val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(sparkFiles)
38-
3932
val sparkConf = driverSpec.driverSparkConf.clone()
40-
if (resolvedSparkJars.nonEmpty) {
41-
sparkConf.set("spark.jars", resolvedSparkJars.mkString(","))
42-
}
33+
val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
34+
sparkFiles, sparkConf.get(FILES_DOWNLOAD_LOCATION))
35+
4336
if (resolvedSparkFiles.nonEmpty) {
4437
sparkConf.set("spark.files", resolvedSparkFiles.mkString(","))
4538
}
46-
val resolvedDriverContainer = if (resolvedSparkJars.nonEmpty) {
47-
new ContainerBuilder(driverSpec.driverContainer)
48-
.addNewEnv()
49-
.withName(ENV_MOUNTED_CLASSPATH)
50-
.withValue(resolvedSparkJars.mkString(File.pathSeparator))
51-
.endEnv()
52-
.build()
53-
} else {
54-
driverSpec.driverContainer
39+
40+
if (sparkJars.nonEmpty) {
41+
sparkConf.set("spark.jars",
42+
sparkJars.map(jar => KubernetesUtils.resolveLocalFile(jar)).mkString(","))
5543
}
5644

57-
driverSpec.copy(
58-
driverContainer = resolvedDriverContainer,
59-
driverSparkConf = sparkConf)
45+
driverSpec.copy(driverSparkConf = sparkConf)
6046
}
6147
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.submit.submitsteps
18+
19+
import java.io.File
20+
21+
import scala.collection.JavaConverters._
22+
23+
import com.google.common.io.{BaseEncoding, Files}
24+
import io.fabric8.kubernetes.api.model.SecretBuilder
25+
26+
import org.apache.spark.deploy.k8s.Config._
27+
import org.apache.spark.deploy.k8s.MountSmallFilesBootstrap
28+
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
29+
import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
30+
import org.apache.spark.util.Utils
31+
32+
private[spark] class DriverMountLocalFilesStep(
33+
submitterLocalFiles: Iterable[String],
34+
smallFilesSecretName: String,
35+
smallFilesSecretMountPath: String,
36+
mountSmallFilesBootstrap: MountSmallFilesBootstrap) extends DriverConfigurationStep {
37+
38+
import DriverMountLocalFilesStep._
39+
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
40+
val localFiles = submitterLocalFiles.map { localFileUri =>
41+
new File(Utils.resolveURI(localFileUri).getPath)
42+
}
43+
val totalSizeBytes = localFiles.map(_.length()).sum
44+
val totalSizeBytesString = Utils.bytesToString(totalSizeBytes)
45+
require(totalSizeBytes < MAX_SECRET_BUNDLE_SIZE_BYTES,
46+
s"Total size of all files submitted must be less than $MAX_SECRET_BUNDLE_SIZE_BYTES_STRING." +
47+
s" Total size for files ended up being $totalSizeBytesString")
48+
val localFileBase64Contents = localFiles.map { file =>
49+
val fileBase64 = BaseEncoding.base64().encode(Files.toByteArray(file))
50+
(file.getName, fileBase64)
51+
}.toMap
52+
val localFilesSecret = new SecretBuilder()
53+
.withNewMetadata()
54+
.withName(smallFilesSecretName)
55+
.endMetadata()
56+
.withData(localFileBase64Contents.asJava)
57+
.build()
58+
val (resolvedDriverPod, resolvedDriverContainer) =
59+
mountSmallFilesBootstrap.mountSmallFilesSecret(
60+
driverSpec.driverPod, driverSpec.driverContainer)
61+
val resolvedSparkConf = driverSpec.driverSparkConf.clone()
62+
.set(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET, smallFilesSecretName)
63+
.set(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH, smallFilesSecretMountPath)
64+
driverSpec.copy(
65+
driverPod = resolvedDriverPod,
66+
driverContainer = resolvedDriverContainer,
67+
driverSparkConf = resolvedSparkConf,
68+
otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(localFilesSecret))
69+
}
70+
}
71+
72+
private[spark] object DriverMountLocalFilesStep {
73+
val MAX_SECRET_BUNDLE_SIZE_BYTES = 10240
74+
val MAX_SECRET_BUNDLE_SIZE_BYTES_STRING =
75+
Utils.bytesToString(MAX_SECRET_BUNDLE_SIZE_BYTES)
76+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.spark.{SparkConf, SparkException}
2424
import org.apache.spark.deploy.k8s.{KubernetesUtils, MountSecretsBootstrap}
2525
import org.apache.spark.deploy.k8s.Config._
2626
import org.apache.spark.deploy.k8s.Constants._
27+
import org.apache.spark.deploy.k8s.MountSmallFilesBootstrap
2728
import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
2829
import org.apache.spark.util.Utils
2930

@@ -37,7 +38,8 @@ import org.apache.spark.util.Utils
3738
*/
3839
private[spark] class ExecutorPodFactory(
3940
sparkConf: SparkConf,
40-
mountSecretsBootstrap: Option[MountSecretsBootstrap]) {
41+
mountSecretsBootstrap: Option[MountSecretsBootstrap],
42+
mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap]) {
4143

4244
private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)
4345

@@ -212,10 +214,15 @@ private[spark] class ExecutorPodFactory(
212214
(bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
213215
}.getOrElse((executorPod, containerWithLimitCores))
214216

217+
val (maybeSmallFilesMountedPod, maybeSmallFilesMountedContainer) =
218+
mountSmallFilesBootstrap.map { bootstrap =>
219+
bootstrap.mountSmallFilesSecret(
220+
maybeSecretsMountedPod, maybeSecretsMountedContainer)
221+
}.getOrElse((maybeSecretsMountedPod, maybeSecretsMountedContainer))
215222

216-
new PodBuilder(maybeSecretsMountedPod)
223+
new PodBuilder(maybeSmallFilesMountedPod)
217224
.editSpec()
218-
.addToContainers(maybeSecretsMountedContainer)
225+
.addToContainers(maybeSmallFilesMountedContainer)
219226
.endSpec()
220227
.build()
221228
}

0 commit comments

Comments (0)