
Commit 49932d6

mccheah authored and ash211 committed
Mount emptyDir volumes for temporary directories on executors in static allocation mode (rebased) (apache-spark-on-k8s#522)
* Use emptyDir volume mounts for executor local directories.
* Mount local dirs in the driver. Remove shuffle dir configuration.
* Arrange imports
* Fix style and integration tests.
* Add TODO note for volume types to change.
* Add unit test and extra documentation.
* Fix existing unit tests and add tests for empty dir volumes
* Remove extraneous constant
1 parent 887fdce commit 49932d6

File tree

15 files changed: +423 −93 lines changed
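
Before the per-file diffs, a minimal orientation sketch of the pattern this commit applies: each resolved local directory is backed by a dedicated emptyDir volume whose mount path is the directory itself. The directory name and volume name below are illustrative, not taken from the diff.

import io.fabric8.kubernetes.api.model.{VolumeBuilder, VolumeMountBuilder}

object EmptyDirMountSketch {
  def main(args: Array[String]): Unit = {
    // A hypothetical resolved local directory; real values come from spark.local.dir
    // or the generated default under /mnt/tmp/spark-local.
    val dir = "/mnt/tmp/spark-local/spark-example"
    // Back the directory with an emptyDir volume, as the driver (and, in static
    // allocation mode, the executors) do in this commit.
    val volume = new VolumeBuilder()
      .withName("spark-local-dir-0-spark-example")
      .withNewEmptyDir().endEmptyDir()
      .build()
    // Mount the volume into the container at the directory's path.
    val mount = new VolumeMountBuilder()
      .withName(volume.getName)
      .withMountPath(dir)
      .build()
    println(s"${volume.getName} -> ${mount.getMountPath}")
  }
}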

conf/kubernetes-shuffle-service.yaml

Lines changed: 2 additions & 2 deletions
@@ -32,7 +32,7 @@ spec:
   volumes:
     - name: temp-volume
       hostPath:
-        path: '/tmp' # change this path according to your cluster configuration.
+        path: '/tmp/spark-local' # change this path according to your cluster configuration.
   containers:
     - name: shuffle
       # This is an official image that is built
@@ -41,7 +41,7 @@ spec:
       image: kubespark/spark-shuffle:v2.2.0-kubernetes-0.4.0
       imagePullPolicy: IfNotPresent
       volumeMounts:
-        - mountPath: '/tmp'
+        - mountPath: '/tmp/spark-local'
          name: temp-volume
       # more volumes can be mounted here.
       # The spark job must be configured to use these

docs/running-on-kubernetes.md

Lines changed: 10 additions & 1 deletion
@@ -222,7 +222,7 @@ Below is an example submission:
     local:///opt/spark/examples/src/main/python/pi.py 100
 ```
 
-## Dynamic Executor Scaling
+## Dynamic Allocation in Kubernetes
 
 Spark on Kubernetes supports Dynamic Allocation with cluster mode. This mode requires running
 an external shuffle service. This is typically a [daemonset](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/)
@@ -245,6 +245,7 @@ the command may then look like the following:
     --class org.apache.spark.examples.GroupByTest \
     --master k8s://<k8s-master>:<port> \
     --kubernetes-namespace default \
+    --conf spark.local.dir=/tmp/spark-local \
     --conf spark.app.name=group-by-test \
     --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:latest \
     --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:latest \
@@ -254,6 +255,14 @@ the command may then look like the following:
     --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.2.0" \
     local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.3.0.jar 10 400000 2
 
+The external shuffle service has to mount directories that can be shared with the executor pods. The provided example
+YAML spec mounts a hostPath volume to the external shuffle service pods, but these hostPath volumes must also be mounted
+into the executors. When using the external shuffle service, the directories specified in the `spark.local.dir`
+configuration are mounted as hostPath volumes into all of the executor containers. To ensure that one does not
+accidentally mount the incorrect hostPath volumes, the value of `spark.local.dir` must be specified in your
+application's configuration when using Kubernetes, even though it defaults to the JVM's temporary directory when using
+other cluster managers.
+
 ## Advanced
 
 ### Securing the Resource Staging Server with TLS
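
As a quick illustration of the documented requirement, the sketch below sets the relevant keys programmatically. The configuration keys are real Spark keys; the path /tmp/spark-local is an example that must match the hostPath mounts in your shuffle-service daemonset YAML.

import org.apache.spark.SparkConf

object LocalDirConfSketch {
  def main(args: Array[String]): Unit = {
    // spark.local.dir must be set explicitly on Kubernetes when the external
    // shuffle service is used; it is not allowed to fall back to the JVM temp dir.
    val conf = new SparkConf()
      .set("spark.local.dir", "/tmp/spark-local")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
    println(conf.get("spark.local.dir"))
  }
}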

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala

Lines changed: 0 additions & 6 deletions
@@ -157,12 +157,6 @@ package object config extends Logging {
     .stringConf
     .createOptional
 
-  private[spark] val KUBERNETES_SHUFFLE_DIR =
-    ConfigBuilder("spark.kubernetes.shuffle.dir")
-      .doc("Path to the shared shuffle directories.")
-      .stringConf
-      .createOptional
-
   private[spark] val KUBERNETES_SHUFFLE_APISERVER_URI =
     ConfigBuilder("spark.kubernetes.shuffle.apiServer.url")
       .doc("URL to the Kubernetes API server that the shuffle service will monitor for Spark pods.")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala

Lines changed: 1 addition & 0 deletions
@@ -102,4 +102,5 @@ package object constants {
   private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
   private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
   private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L
+  private[spark] val GENERATED_LOCAL_DIR_MOUNT_ROOT = "/mnt/tmp/spark-local"
 }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 6 additions & 1 deletion
@@ -22,6 +22,7 @@ import org.apache.spark.deploy.k8s.config._
 import org.apache.spark.deploy.k8s.constants._
 import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
 import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
+import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util.{SystemClock, Utils}
 
@@ -104,6 +105,9 @@ private[spark] class DriverConfigurationStepsOrchestrator(
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
       submissionSparkConf, kubernetesResourceNamePrefix)
 
+    val localDirectoryMountConfigurationStep = new LocalDirectoryMountConfigurationStep(
+      submissionSparkConf)
+
     val pythonStep = mainAppResource match {
       case PythonMainAppResource(mainPyResource) =>
         Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
@@ -181,7 +185,8 @@ private[spark] class DriverConfigurationStepsOrchestrator(
       initialSubmissionStep,
       driverAddressStep,
       kubernetesCredentialsStep,
-      dependencyResolutionStep) ++
+      dependencyResolutionStep,
+      localDirectoryMountConfigurationStep) ++
       submittedDependenciesBootstrapSteps ++
       pythonStep.toSeq ++
       mountSecretsStep

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/LocalDirectoryMountConfigurationStep.scala

Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps
+
+import java.nio.file.Paths
+import java.util.UUID
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder, VolumeMountBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.constants._
+
+/**
+ * Configures local directories that the driver and executors should use for temporary storage.
+ *
+ * Note that we have different semantics for scratch space in Kubernetes versus the other cluster
+ * managers. In Kubernetes, we cannot allow the local directories to resolve to the Java temporary
+ * directory. This is because we will mount either emptyDir volumes for both the driver and
+ * executors, or hostPath volumes for the executors and an emptyDir for the driver. In either
+ * case, the mount paths need to be directories that do not exist in the base container images.
+ * But the Java temporary directory is typically a directory like /tmp which exists in most
+ * container images.
+ *
+ * The solution is twofold:
+ * - When not using an external shuffle service, a reasonable default is to create a new directory
+ *   with a random name and set that to be the value of `spark.local.dir`.
+ * - When using the external shuffle service, it is risky to assume that the user intends to mount
+ *   the JVM temporary directory into the pod as a hostPath volume. We therefore enforce that
+ *   spark.local.dir must be set in dynamic allocation mode so that the user explicitly sets the
+ *   paths that have to be mounted.
+ */
+private[spark] class LocalDirectoryMountConfigurationStep(
+    submissionSparkConf: SparkConf,
+    randomDirProvider: () => String = () => s"spark-${UUID.randomUUID()}")
+  extends DriverConfigurationStep {
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val configuredLocalDirs = submissionSparkConf.getOption("spark.local.dir")
+    val isUsingExternalShuffle = submissionSparkConf.get(
+      org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)
+    val resolvedLocalDirsSingleString = if (isUsingExternalShuffle) {
+      require(configuredLocalDirs.isDefined, "spark.local.dir must be provided explicitly when" +
+        " using the external shuffle service in Kubernetes. These directories should map to" +
+        " the paths that are mounted into the external shuffle service pods.")
+      configuredLocalDirs.get
+    } else {
+      // If we don't use the external shuffle service, local directories should be randomized if
+      // not provided.
+      configuredLocalDirs.getOrElse(s"$GENERATED_LOCAL_DIR_MOUNT_ROOT/${randomDirProvider()}")
+    }
+    val resolvedLocalDirs = resolvedLocalDirsSingleString.split(",")
+    // It's worth noting that we always use an emptyDir volume for the directories on the driver,
+    // because the driver does not need a hostPath to share its scratch space with any other pod.
+    // The driver itself will decide on whether to use a hostPath volume or an emptyDir volume for
+    // these directories on the executors. (see ExecutorPodFactory and
+    // KubernetesExternalClusterManager)
+    val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) =>
+      new VolumeBuilder()
+        .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}")
+        .withNewEmptyDir().endEmptyDir()
+        .build()
+    }
+    val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map {
+      case (volume, path) =>
+        new VolumeMountBuilder()
+          .withName(volume.getName)
+          .withMountPath(path)
+          .build()
+    }
+    val resolvedDriverSparkConf = driverSpec.driverSparkConf.clone().set(
+      "spark.local.dir", resolvedLocalDirsSingleString)
+    driverSpec.copy(
+      driverPod = new PodBuilder(driverSpec.driverPod)
+        .editSpec()
+          .addToVolumes(localDirVolumes: _*)
+          .endSpec()
+        .build(),
+      driverContainer = new ContainerBuilder(driverSpec.driverContainer)
+        .addToVolumeMounts(localDirVolumeMounts: _*)
+        .build(),
+      driverSparkConf = resolvedDriverSparkConf
+    )
+  }
+}
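
For readers skimming the step above, here is a hedged standalone sketch of just the directory-resolution rule it implements. The object and function names are illustrative; the constant value and the branching mirror the committed code.

import java.util.UUID

object LocalDirResolutionSketch {
  // Mirrors GENERATED_LOCAL_DIR_MOUNT_ROOT from constants.scala.
  val GeneratedRoot = "/mnt/tmp/spark-local"

  def resolveLocalDirs(configured: Option[String], shuffleServiceEnabled: Boolean): Seq[String] = {
    val single = if (shuffleServiceEnabled) {
      // With the external shuffle service, the user must name the hostPath dirs explicitly.
      require(configured.isDefined, "spark.local.dir must be provided explicitly")
      configured.get
    } else {
      // Otherwise default to a randomized directory that cannot collide with paths
      // already present in the container image.
      configured.getOrElse(s"$GeneratedRoot/spark-${UUID.randomUUID()}")
    }
    single.split(",").toSeq
  }

  def main(args: Array[String]): Unit = {
    println(resolveLocalDirs(None, shuffleServiceEnabled = false))
    println(resolveLocalDirs(Some("/tmp/spark-local"), shuffleServiceEnabled = true))
  }
}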

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLocalDirVolumeProvider.scala

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.nio.file.Paths
+
+import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+
+private[spark] trait ExecutorLocalDirVolumeProvider {
+  def getExecutorLocalDirVolumesWithMounts: Seq[(Volume, VolumeMount)]
+}
+
+private[spark] class ExecutorLocalDirVolumeProviderImpl(
+    sparkConf: SparkConf,
+    kubernetesExternalShuffleManager: Option[KubernetesExternalShuffleManager])
+  extends ExecutorLocalDirVolumeProvider {
+  override def getExecutorLocalDirVolumesWithMounts: Seq[(Volume, VolumeMount)] = {
+    kubernetesExternalShuffleManager.map(_.getExecutorShuffleDirVolumesWithMounts)
+      .getOrElse {
+        // If we're not using the external shuffle manager, we should use emptyDir volumes for
+        // shuffle directories since it's important for disk I/O for these directories to be
+        // performant. If the user has not provided a local directory, instead of using the
+        // Java temporary directory, we create one instead, because we want to avoid
+        // mounting an emptyDir which overlaps with an existing path in the Docker image.
+        // Java's temporary directory path is typically /tmp or a similar path, which is
+        // likely to exist in most images.
+        val resolvedLocalDirs = Utils.getConfiguredLocalDirs(sparkConf)
+        val localDirVolumes = resolvedLocalDirs.zipWithIndex.map { case (dir, index) =>
+          new VolumeBuilder()
+            .withName(s"spark-local-dir-$index-${Paths.get(dir).getFileName.toString}")
+            .withNewEmptyDir().endEmptyDir()
+            .build()
+        }
+        val localDirVolumeMounts = localDirVolumes.zip(resolvedLocalDirs).map {
+          case (volume, path) =>
+            new VolumeMountBuilder()
+              .withName(volume.getName)
+              .withMountPath(path)
+              .build()
+        }
+        localDirVolumes.zip(localDirVolumeMounts)
+      }
+  }
+}
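
A hedged usage sketch of the provider's fallback path follows. Passing None for the shuffle manager exercises the emptyDir branch above; the sketch lives in the same package only because the classes are private[spark], and the configured path is an example.

package org.apache.spark.scheduler.cluster.k8s

import org.apache.spark.SparkConf

object VolumeProviderSketch {
  def main(args: Array[String]): Unit = {
    // An illustrative local dir; Utils.getConfiguredLocalDirs resolves it from the conf.
    val sparkConf = new SparkConf(false).set("spark.local.dir", "/mnt/tmp/spark-local/example")
    // No external shuffle manager: the provider builds emptyDir-backed volumes.
    val provider = new ExecutorLocalDirVolumeProviderImpl(sparkConf, None)
    provider.getExecutorLocalDirVolumesWithMounts.foreach { case (volume, mount) =>
      println(s"${volume.getName} mounted at ${mount.getMountPath}")
    }
  }
}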

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 5 additions & 6 deletions
@@ -16,10 +16,9 @@
  */
 package org.apache.spark.scheduler.cluster.k8s
 
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder}
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
-
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
 import org.apache.spark.deploy.k8s.config._
@@ -46,7 +45,7 @@ private[spark] class ExecutorPodFactoryImpl(
     mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap],
     executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap],
     executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin],
-    shuffleManager: Option[KubernetesExternalShuffleManager])
+    executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider)
   extends ExecutorPodFactory {
 
   import ExecutorPodFactoryImpl._
@@ -175,9 +174,8 @@ private[spark] class ExecutorPodFactoryImpl(
         .withContainerPort(port._2)
         .build()
     })
-    val shuffleVolumesWithMounts =
-      shuffleManager.map(_.getExecutorShuffleDirVolumesWithMounts)
-        .getOrElse(Seq.empty)
+    val shuffleVolumesWithMounts = executorLocalDirVolumeProvider
+      .getExecutorLocalDirVolumesWithMounts
 
     val executorContainer = new ContainerBuilder()
       .withName(s"executor")
@@ -262,6 +260,7 @@ private[spark] class ExecutorPodFactoryImpl(
     val executorPodWithNodeAffinity =
       nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful(
         executorPodWithInitContainer, nodeToLocalTaskCount)
+
     new PodBuilder(executorPodWithNodeAffinity)
       .editSpec()
       .addToContainers(initBootstrappedExecutorContainer)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 11 additions & 8 deletions
@@ -113,25 +113,28 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
       Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-    val kubernetesShuffleManager = if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+    val kubernetesShuffleManager = if (sparkConf.get(
+        org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)) {
       val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl(
-        SparkTransportConf.fromSparkConf(sparkConf, "shuffle"),
-        sc.env.securityManager,
-        sc.env.securityManager.isAuthenticationEnabled())
+          SparkTransportConf.fromSparkConf(sparkConf, "shuffle"),
+          sc.env.securityManager,
+          sc.env.securityManager.isAuthenticationEnabled())
       Some(new KubernetesExternalShuffleManagerImpl(
-        sparkConf,
-        kubernetesClient,
-        kubernetesExternalShuffleClient))
+          sparkConf,
+          kubernetesClient,
+          kubernetesExternalShuffleClient))
     } else None
 
+    val executorLocalDirVolumeProvider = new ExecutorLocalDirVolumeProviderImpl(
+      sparkConf, kubernetesShuffleManager)
     val executorPodFactory = new ExecutorPodFactoryImpl(
       sparkConf,
       NodeAffinityExecutorPodModifierImpl,
       mountSecretBootstrap,
       mountSmallFilesBootstrap,
       executorInitContainerBootstrap,
       executorInitContainerSecretVolumePlugin,
-      kubernetesShuffleManager)
+      executorLocalDirVolumeProvider)
     val allocatorExecutor = ThreadUtils
      .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
     val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala

Lines changed: 7 additions & 3 deletions
@@ -67,9 +67,7 @@ private[spark] class KubernetesExternalShuffleManagerImpl(
       s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified")
   }
   private val externalShufflePort = sparkConf.getInt("spark.shuffle.service.port", 7337)
-  private val shuffleDirs = sparkConf.get(KUBERNETES_SHUFFLE_DIR).map {
-    _.split(",")
-  }.getOrElse(Utils.getConfiguredLocalDirs(sparkConf))
+  private val shuffleDirs = Utils.getConfiguredLocalDirs(sparkConf)
   private var shufflePodCache = scala.collection.mutable.Map[String, String]()
   private var watcher: Watch = _
 
@@ -140,6 +138,12 @@ private[spark] class KubernetesExternalShuffleManagerImpl(
   }
 
   override def getExecutorShuffleDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = {
+    // TODO: Using hostPath for the local directory will also make it such that the
+    // other uses of the local directory - broadcasting and caching - will also write
+    // to the directory that the shuffle service is aware of. It would be better for
+    // these directories to be separate so that the lifetime of the non-shuffle scratch
+    // space is tied to an emptyDir instead of the hostPath. This requires a change in
+    // core Spark as well.
     shuffleDirs.zipWithIndex.map {
       case (shuffleDir, shuffleDirIndex) =>
        val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}"
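
The body of getExecutorShuffleDirVolumesWithMounts is truncated in the hunk above. Below is a hedged sketch of what the hostPath branch plausibly builds per shuffle directory; the builder calls follow the fabric8 pattern used elsewhere in this commit, but treat the exact shape as an assumption rather than the committed code.

import io.fabric8.kubernetes.api.model.{VolumeBuilder, VolumeMountBuilder}
import org.apache.commons.io.FilenameUtils

object HostPathVolumeSketch {
  def main(args: Array[String]): Unit = {
    // Must match the hostPath mounted into the shuffle-service daemonset pods.
    val shuffleDir = "/tmp/spark-local"
    val volumeName = s"0-${FilenameUtils.getBaseName(shuffleDir)}"
    // hostPath rather than emptyDir, so executors and the shuffle-service pod on
    // the same node see the same files.
    val volume = new VolumeBuilder()
      .withName(volumeName)
      .withNewHostPath()
        .withPath(shuffleDir)
        .endHostPath()
      .build()
    val mount = new VolumeMountBuilder()
      .withName(volumeName)
      .withMountPath(shuffleDir)
      .build()
    println(s"${volume.getName}: hostPath ${volume.getHostPath.getPath} -> ${mount.getMountPath}")
  }
}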
