Commit 65f1048

Merge pull request apache-spark-on-k8s#248 from palantir/resync-kube
Resync with apache-spark-on-k8s upstream
2 parents 6ecc757 + a41e968 commit 65f1048

File tree

23 files changed: +546 −155 lines

resource-managers/kubernetes/README.md

Lines changed: 5 additions & 2 deletions
@@ -14,11 +14,14 @@ important matters to keep in mind when developing this feature.
 
 # Building Spark with Kubernetes Support
 
-To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile
-the Kubernetes core implementation module along with its dependencies:
+To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile the Kubernetes core implementation module along with its dependencies:
 
     build/mvn compile -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests
 
+If this is the first time you compile the Kubernetes core implementation module, run the following command to install the dependencies and compile:
+
+    build/mvn install -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests
+
 To build a distribution of Spark with Kubernetes support, use the `dev/make-distribution.sh` script, and add the
 `kubernetes` profile as part of the build arguments. Any other build arguments can be specified as one would expect when
 building Spark normally. For example, to build Spark against Hadoop 2.7 and Kubernetes:

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 16 additions & 0 deletions
@@ -450,6 +450,22 @@ package object config extends Logging {
       .timeConf(TimeUnit.MINUTES)
       .createWithDefault(5)
 
+  private[spark] val EXECUTOR_SUBMITTED_SMALL_FILES_SECRET =
+    ConfigBuilder("spark.kubernetes.mountdependencies.smallfiles.executor.secretName")
+      .doc("Name of the secret that should be mounted into the executor containers for" +
+        " distributing submitted small files without the resource staging server.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  private[spark] val EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH =
+    ConfigBuilder("spark.kubernetes.mountdependencies.smallfiles.executor.secretMountPath")
+      .doc(s"Mount path in the executors for the secret given by" +
+        s" ${EXECUTOR_SUBMITTED_SMALL_FILES_SECRET.key}")
+      .internal()
+      .stringConf
+      .createOptional
+
   private[spark] val EXECUTOR_INIT_CONTAINER_CONFIG_MAP =
     ConfigBuilder("spark.kubernetes.initcontainer.executor.configmapname")
       .doc("Name of the config map to use in the init-container that retrieves submitted files" +

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 4 additions & 0 deletions
@@ -70,6 +70,7 @@ package object constants {
   private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
   private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
+  private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"
 
   // Bootstrapping dependencies with the init-container
   private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
@@ -92,6 +93,9 @@ package object constants {
   private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle"
   private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
 
+  // Bootstrapping dependencies via a secret
+  private[spark] val MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH = "/etc/spark-submitted-files"
+
   // Miscellaneous
   private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity"
   private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 60 additions & 23 deletions
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.ConfigurationUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, PythonStep}
+import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep}
 import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.Utils
@@ -99,40 +99,77 @@ private[spark] class DriverConfigurationStepsOrchestrator(
         Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
       case _ => Option.empty[DriverConfigurationStep]
     }
-    val initContainerBootstrapStep = if ((sparkJars ++ sparkFiles).exists { uri =>
-        Option(Utils.resolveURI(uri).getScheme).getOrElse("file") != "local"
-      }) {
-      val initContainerConfigurationStepsOrchestrator =
-        new InitContainerConfigurationStepsOrchestrator(
-          namespace,
-          kubernetesResourceNamePrefix,
-          sparkJars,
+
+    val (localFilesDownloadPath, submittedDependenciesBootstrapSteps) =
+      if (areAnyFilesNonContainerLocal(sparkJars ++ sparkFiles)) {
+        val (submittedLocalFilesDownloadPath,
+          sparkFilesResolvedFromInitContainer,
+          mountSmallFilesWithoutInitContainerStep) =
+          // If the resource staging server is specified, submit all local files through that.
+          submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI).map { _ =>
+            (filesDownloadPath, sparkFiles, Option.empty[DriverConfigurationStep])
+          }.getOrElse {
+            // Else - use a small files bootstrap that submits the local files via a secret.
+            // Then, indicate to the outer block that the init-container should not handle
+            // those local files simply by filtering them out.
+            val sparkFilesWithoutLocal = KubernetesFileUtils.getNonSubmitterLocalFiles(sparkFiles)
+            val smallFilesSecretName = s"${kubernetesAppId}-submitted-files"
+            val mountSmallFilesBootstrap = new MountSmallFilesBootstrapImpl(
+              smallFilesSecretName, MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH)
+            val mountSmallLocalFilesStep = new MountSmallLocalFilesStep(
               sparkFiles,
-          jarsDownloadPath,
-          filesDownloadPath,
-          dockerImagePullPolicy,
-          allDriverLabels,
+              smallFilesSecretName,
+              MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
+              mountSmallFilesBootstrap)
+            (MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH,
+              sparkFilesWithoutLocal.toArray,
+              Some(mountSmallLocalFilesStep))
+          }
+
+        val initContainerBootstrapStep =
+          if (areAnyFilesNonContainerLocal(sparkJars ++ sparkFilesResolvedFromInitContainer)) {
+            val initContainerConfigurationStepsOrchestrator =
+              new InitContainerConfigurationStepsOrchestrator(
+                namespace,
+                kubernetesResourceNamePrefix,
+                sparkJars,
+                sparkFilesResolvedFromInitContainer,
+                jarsDownloadPath,
+                filesDownloadPath,
+                dockerImagePullPolicy,
+                allDriverLabels,
+                initContainerConfigMapName,
+                INIT_CONTAINER_CONFIG_MAP_KEY,
+                submissionSparkConf)
+            val initContainerConfigurationSteps =
+              initContainerConfigurationStepsOrchestrator.getAllConfigurationSteps()
+            Some(new InitContainerBootstrapStep(initContainerConfigurationSteps,
               initContainerConfigMapName,
-          INIT_CONTAINER_CONFIG_MAP_KEY,
-          submissionSparkConf)
-      val initContainerConfigurationSteps =
-        initContainerConfigurationStepsOrchestrator.getAllConfigurationSteps()
-      Some(new InitContainerBootstrapStep(initContainerConfigurationSteps,
-        initContainerConfigMapName,
-        INIT_CONTAINER_CONFIG_MAP_KEY))
+              INIT_CONTAINER_CONFIG_MAP_KEY))
+          } else Option.empty[DriverConfigurationStep]
+        (submittedLocalFilesDownloadPath,
+          mountSmallFilesWithoutInitContainerStep.toSeq ++
+            initContainerBootstrapStep.toSeq)
     } else {
-      Option.empty[DriverConfigurationStep]
+        (filesDownloadPath, Seq.empty[DriverConfigurationStep])
     }
     val dependencyResolutionStep = new DependencyResolutionStep(
       sparkJars,
       sparkFiles,
       jarsDownloadPath,
-      filesDownloadPath)
+      localFilesDownloadPath)
     Seq(
       initialSubmissionStep,
       kubernetesCredentialsStep,
       dependencyResolutionStep) ++
-      initContainerBootstrapStep.toSeq ++
+      submittedDependenciesBootstrapSteps ++
       pythonStep.toSeq
   }
+
+  private def areAnyFilesNonContainerLocal(files: Seq[String]): Boolean = {
+    files.exists { uri =>
+      Option(Utils.resolveURI(uri).getScheme).getOrElse("file") != "local"
+    }
+  }
+
 }
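The two-phase decision above hinges on the extracted `areAnyFilesNonContainerLocal` helper: only the `local:` scheme marks a resource as already baked into the container image; everything else must be shipped. A small illustration (hypothetical URIs, not from this commit):

    import org.apache.spark.util.Utils

    val uris = Seq(
      "local:///opt/spark/examples.jar",  // already in the image: no shipping needed
      "/tmp/app.conf",                    // no scheme, treated as "file": must ship
      "hdfs://nn:8020/data.txt")          // remote: fetched by the init-container
    val needsShipping = uris.exists { uri =>
      Option(Utils.resolveURI(uri).getScheme).getOrElse("file") != "local"
    }
    // needsShipping == true, so bootstrap steps are generated.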
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/MountSmallFilesBootstrap.scala

Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit
+
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
+
+import org.apache.spark.deploy.kubernetes.constants._
+
+private[spark] trait MountSmallFilesBootstrap {
+  def mountSmallFilesSecret(pod: Pod, container: Container): (Pod, Container)
+}
+
+private[spark] class MountSmallFilesBootstrapImpl(
+    secretName: String, secretMountPath: String) extends MountSmallFilesBootstrap {
+  def mountSmallFilesSecret(pod: Pod, container: Container): (Pod, Container) = {
+    val resolvedPod = new PodBuilder(pod)
+      .editOrNewSpec()
+        .addNewVolume()
+          .withName("submitted-files")
+          .withNewSecret()
+            .withSecretName(secretName)
+            .endSecret()
+          .endVolume()
+        .endSpec()
+      .build()
+    val resolvedContainer = new ContainerBuilder(container)
+      .addNewEnv()
+        .withName(ENV_MOUNTED_FILES_FROM_SECRET_DIR)
+        .withValue(secretMountPath)
+        .endEnv()
+      .addNewVolumeMount()
+        .withName("submitted-files")
+        .withMountPath(secretMountPath)
+        .endVolumeMount()
+      .build()
+    (resolvedPod, resolvedContainer)
+  }
+}
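A usage sketch (hypothetical secret name; empty pod and container stand in for a driver spec under construction) showing what the bootstrap adds: a secret-backed volume on the pod, plus a matching volume mount and the `SPARK_MOUNTED_FILES_FROM_SECRET_DIR` environment variable on the container:

    import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

    val bootstrap = new MountSmallFilesBootstrapImpl(
      "spark-abc123-submitted-files", "/etc/spark-submitted-files")
    val (pod, container) = bootstrap.mountSmallFilesSecret(
      new PodBuilder().build(), new ContainerBuilder().build())
    // pod now declares a "submitted-files" volume backed by the secret;
    // container mounts it at /etc/spark-submitted-files and exposes that path
    // through SPARK_MOUNTED_FILES_FROM_SECRET_DIR.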

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DependencyResolutionStep.scala

Lines changed: 4 additions & 2 deletions
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder
 
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils
+import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStep
 import org.apache.spark.util.Utils
 
 /**
@@ -36,11 +37,12 @@ private[spark] class DependencyResolutionStep(
     sparkJars: Seq[String],
     sparkFiles: Seq[String],
     jarsDownloadPath: String,
-    filesDownloadPath: String) extends DriverConfigurationStep {
+    localFilesDownloadPath: String) extends DriverConfigurationStep {
 
   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
     val resolvedSparkJars = KubernetesFileUtils.resolveSubmittedUris(sparkJars, jarsDownloadPath)
-    val resolvedSparkFiles = KubernetesFileUtils.resolveSubmittedUris(sparkFiles, filesDownloadPath)
+    val resolvedSparkFiles = KubernetesFileUtils.resolveSubmittedUris(
+      sparkFiles, localFilesDownloadPath)
     val sparkConfResolvedSparkDependencies = driverSpec.driverSparkConf.clone()
     if (resolvedSparkJars.nonEmpty) {
       sparkConfResolvedSparkDependencies.set("spark.jars", resolvedSparkJars.mkString(","))
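The rename matters because the two path arguments may now point at different places. A sketch of the intended resolution (hypothetical URIs; assuming `resolveSubmittedUris` rewrites submitter-local URIs to the given directory and passes remote URIs through unchanged):

    val jarsDownloadPath = "/var/spark-data/spark-jars"        // init-container target
    val localFilesDownloadPath = "/etc/spark-submitted-files"  // secret mount path
    val sparkFiles = Seq("file:///home/user/app.conf", "hdfs://nn:8020/big.dat")
    // After resolution, spark.files would read:
    //   /etc/spark-submitted-files/app.conf,hdfs://nn:8020/big.dat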

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/InitContainerBootstrapStep.scala

Lines changed: 20 additions & 20 deletions
@@ -33,32 +33,32 @@ private[spark] class InitContainerBootstrapStep(
 
   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
     var currentInitContainerSpec = InitContainerSpec(
-        initContainerProperties = Map.empty[String, String],
-        additionalDriverSparkConf = Map.empty[String, String],
-        initContainer = new ContainerBuilder().build(),
-        driverContainer = driverSpec.driverContainer,
-        podToInitialize = driverSpec.driverPod,
-        initContainerDependentResources = Seq.empty[HasMetadata])
+      initContainerProperties = Map.empty[String, String],
+      additionalDriverSparkConf = Map.empty[String, String],
+      initContainer = new ContainerBuilder().build(),
+      driverContainer = driverSpec.driverContainer,
+      podToInitialize = driverSpec.driverPod,
+      initContainerDependentResources = Seq.empty[HasMetadata])
     for (nextStep <- initContainerConfigurationSteps) {
       currentInitContainerSpec = nextStep.configureInitContainer(currentInitContainerSpec)
     }
     val configMap = PropertiesConfigMapFromScalaMapBuilder.buildConfigMap(
-        initContainerConfigMapName,
-        initContainerConfigMapKey,
-        currentInitContainerSpec.initContainerProperties)
+      initContainerConfigMapName,
+      initContainerConfigMapKey,
+      currentInitContainerSpec.initContainerProperties)
     val resolvedDriverSparkConf = driverSpec.driverSparkConf.clone()
-        .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP, initContainerConfigMapName)
-        .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY, initContainerConfigMapKey)
-        .setAll(currentInitContainerSpec.additionalDriverSparkConf)
+      .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP, initContainerConfigMapName)
+      .set(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY, initContainerConfigMapKey)
+      .setAll(currentInitContainerSpec.additionalDriverSparkConf)
     val resolvedDriverPod = InitContainerUtil.appendInitContainer(
-        currentInitContainerSpec.podToInitialize, currentInitContainerSpec.initContainer)
+      currentInitContainerSpec.podToInitialize, currentInitContainerSpec.initContainer)
     driverSpec.copy(
-        driverPod = resolvedDriverPod,
-        driverContainer = currentInitContainerSpec.driverContainer,
-        driverSparkConf = resolvedDriverSparkConf,
-        otherKubernetesResources =
-            driverSpec.otherKubernetesResources ++
-            currentInitContainerSpec.initContainerDependentResources ++
-            Seq(configMap))
+      driverPod = resolvedDriverPod,
+      driverContainer = currentInitContainerSpec.driverContainer,
+      driverSparkConf = resolvedDriverSparkConf,
+      otherKubernetesResources =
+        driverSpec.otherKubernetesResources ++
+          currentInitContainerSpec.initContainerDependentResources ++
+          Seq(configMap))
   }
 }
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/MountSmallLocalFilesStep.scala

Lines changed: 73 additions & 0 deletions

@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps
+
+import java.io.File
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.SecretBuilder
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.submit.{KubernetesFileUtils, MountSmallFilesBootstrap}
+import org.apache.spark.util.Utils
+
+private[spark] class MountSmallLocalFilesStep(
+    sparkFiles: Seq[String],
+    smallFilesSecretName: String,
+    smallFilesSecretMountPath: String,
+    mountSmallFilesBootstrap: MountSmallFilesBootstrap) extends DriverConfigurationStep {
+
+  import MountSmallLocalFilesStep._
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val localFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles).map(new File(_))
+    val totalSizeBytes = localFiles.map(_.length()).sum
+    val totalSizeBytesString = Utils.bytesToString(totalSizeBytes)
+    require(totalSizeBytes < MAX_SECRET_BUNDLE_SIZE_BYTES,
+      s"Total size of all files submitted must be less than $MAX_SECRET_BUNDLE_SIZE_BYTES_STRING" +
+      s" if you do not use a resource staging server. The total size of all submitted local" +
+      s" files is $totalSizeBytesString. Please install a resource staging server and configure" +
+      s" your application to use it via ${RESOURCE_STAGING_SERVER_URI.key}")
+    val localFileBase64Contents = localFiles.map { file =>
+      val fileBase64 = BaseEncoding.base64().encode(Files.toByteArray(file))
+      (file.getName, fileBase64)
+    }.toMap
+    val localFilesSecret = new SecretBuilder()
+      .withNewMetadata()
+        .withName(smallFilesSecretName)
+        .endMetadata()
+      .withData(localFileBase64Contents.asJava)
+      .build()
+    val (resolvedDriverPod, resolvedDriverContainer) =
+      mountSmallFilesBootstrap.mountSmallFilesSecret(
+        driverSpec.driverPod, driverSpec.driverContainer)
+    val resolvedSparkConf = driverSpec.driverSparkConf.clone()
+      .set(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET, smallFilesSecretName)
+      .set(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET_MOUNT_PATH, smallFilesSecretMountPath)
+    driverSpec.copy(
+      driverPod = resolvedDriverPod,
+      driverContainer = resolvedDriverContainer,
+      driverSparkConf = resolvedSparkConf,
+      otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(localFilesSecret))
+  }
+}
+
+private[spark] object MountSmallLocalFilesStep {
+  val MAX_SECRET_BUNDLE_SIZE_BYTES = 10240
+  val MAX_SECRET_BUNDLE_SIZE_BYTES_STRING =
+    Utils.bytesToString(MAX_SECRET_BUNDLE_SIZE_BYTES)
+}
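Kubernetes decodes the base64 values when it mounts the secret, so each submitted file simply appears as a plain file under the mount path inside the pod; the 10 KiB cap above stays deliberately far below Kubernetes' own 1 MiB limit on secret size. A runtime sketch (the file name app.conf is hypothetical; the environment variable is the one defined in constants.scala above):

    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Paths}

    // Resolve the mount directory the bootstrap exported, then read a
    // submitted file directly - no base64 decoding needed on this side.
    val mountDir = sys.env.getOrElse(
      "SPARK_MOUNTED_FILES_FROM_SECRET_DIR", "/etc/spark-submitted-files")
    val appConf = new String(
      Files.readAllBytes(Paths.get(mountDir, "app.conf")), StandardCharsets.UTF_8)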
