Skip to content

Commit e6fa7e4

Browse files
committed
[SPARK-24137][K8S] Mount local directories as empty dir volumes.
Drastically improves performance and won't cause Spark applications to fail because they write too much data to the Docker image's specific file system. The file system's directories that back emptydir volumes are generally larger and more performant. Has been in use via the prototype version of Kubernetes support, but lost in the transition to here. Author: mcheah <[email protected]> Closes apache#21238 from mccheah/mount-local-dirs.
1 parent 9c32b5b commit e6fa7e4

File tree

7 files changed

+225
-13
lines changed

7 files changed

+225
-13
lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -454,8 +454,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
454454
*/
455455
private[spark] def validateSettings() {
456456
if (contains("spark.local.dir")) {
457-
val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
458-
"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
457+
val msg = "Note that spark.local.dir will be overridden by the value set by " +
458+
"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS" +
459+
" in YARN)."
459460
logWarning(msg)
460461
}
461462

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.features
18+
19+
import java.nio.file.Paths
20+
import java.util.UUID
21+
22+
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
23+
24+
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
25+
26+
private[spark] class LocalDirsFeatureStep(
27+
conf: KubernetesConf[_ <: KubernetesRoleSpecificConf],
28+
defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}")
29+
extends KubernetesFeatureConfigStep {
30+
31+
// Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system
32+
// property - we want to instead default to mounting an emptydir volume that doesn't already
33+
// exist in the image.
34+
// We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already
35+
// a bit opinionated about YARN and Mesos.
36+
private val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS"))
37+
.orElse(conf.getOption("spark.local.dir"))
38+
.getOrElse(defaultLocalDir)
39+
.split(",")
40+
41+
override def configurePod(pod: SparkPod): SparkPod = {
42+
val localDirVolumes = resolvedLocalDirs
43+
.zipWithIndex
44+
.map { case (localDir, index) =>
45+
new VolumeBuilder()
46+
.withName(s"spark-local-dir-${index + 1}")
47+
.withNewEmptyDir()
48+
.endEmptyDir()
49+
.build()
50+
}
51+
val localDirVolumeMounts = localDirVolumes
52+
.zip(resolvedLocalDirs)
53+
.map { case (localDirVolume, localDirPath) =>
54+
new VolumeMountBuilder()
55+
.withName(localDirVolume.getName)
56+
.withMountPath(localDirPath)
57+
.build()
58+
}
59+
val podWithLocalDirVolumes = new PodBuilder(pod.pod)
60+
.editSpec()
61+
.addToVolumes(localDirVolumes: _*)
62+
.endSpec()
63+
.build()
64+
val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
65+
.addNewEnv()
66+
.withName("SPARK_LOCAL_DIRS")
67+
.withValue(resolvedLocalDirs.mkString(","))
68+
.endEnv()
69+
.addToVolumeMounts(localDirVolumeMounts: _*)
70+
.build()
71+
SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
72+
}
73+
74+
override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
75+
76+
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
77+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.submit
1919
import java.io.File
2020

2121
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils}
22-
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
22+
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, LocalDirsFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
2323
import org.apache.spark.util.Utils
2424

2525
private[spark] class KubernetesDriverBuilder(
@@ -33,17 +33,20 @@ private[spark] class KubernetesDriverBuilder(
3333
provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
3434
=> MountSecretsFeatureStep) =
3535
new MountSecretsFeatureStep(_),
36+
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
37+
=> LocalDirsFeatureStep) =
38+
new LocalDirsFeatureStep(_),
3639
provideMountLocalFilesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
3740
=> MountLocalFilesFeatureStep) =
38-
new MountLocalFilesFeatureStep(_)) {
41+
new MountLocalFilesFeatureStep(_)) {
3942
import KubernetesDriverBuilder._
4043

4144
def buildFromFeatures(
4245
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
4346
val baseFeatures = Seq(
4447
provideBasicStep(kubernetesConf),
4548
provideCredentialsStep(kubernetesConf),
46-
provideServiceStep(kubernetesConf))
49+
provideLocalDirsStep(kubernetesConf))
4750
val withProvideSecretsStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
4851
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
4952
} else baseFeatures

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package org.apache.spark.scheduler.cluster.k8s
1818

1919
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
20-
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
20+
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, LocalDirsFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
2121

2222
private[spark] class KubernetesExecutorBuilder(
2323
provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
@@ -27,11 +27,15 @@ private[spark] class KubernetesExecutorBuilder(
2727
new MountSecretsFeatureStep(_),
2828
provideMountLocalFilesStep:
2929
(KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountLocalFilesFeatureStep =
30-
new MountLocalFilesFeatureStep(_)) {
30+
new MountLocalFilesFeatureStep(_),
31+
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
32+
=> LocalDirsFeatureStep =
33+
new LocalDirsFeatureStep(_)) {
3134

3235
def buildFromFeatures(
3336
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
34-
val baseFeatures = Seq(provideBasicStep(kubernetesConf))
37+
val baseFeatures = Seq(
38+
provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
3539
val withProvideSecretsStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
3640
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
3741
} else baseFeatures
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.features
18+
19+
import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder}
20+
import org.mockito.Mockito
21+
import org.scalatest.BeforeAndAfter
22+
23+
import org.apache.spark.{SparkConf, SparkFunSuite}
24+
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
25+
26+
class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
27+
private val defaultLocalDir = "/var/data/default-local-dir"
28+
private var sparkConf: SparkConf = _
29+
private var kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf] = _
30+
31+
before {
32+
val realSparkConf = new SparkConf(false)
33+
sparkConf = Mockito.spy(realSparkConf)
34+
kubernetesConf = KubernetesConf(
35+
sparkConf,
36+
KubernetesDriverSpecificConf(
37+
None,
38+
"app-name",
39+
"main",
40+
Seq.empty),
41+
"resource",
42+
"app-id",
43+
None,
44+
Map.empty,
45+
Map.empty,
46+
Map.empty,
47+
Map.empty)
48+
}
49+
50+
test("Resolve to default local dir if neither env nor configuration are set") {
51+
Mockito.doReturn(null).when(sparkConf).get("spark.local.dir")
52+
Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS")
53+
val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
54+
val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
55+
assert(configuredPod.pod.getSpec.getVolumes.size === 1)
56+
assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
57+
new VolumeBuilder()
58+
.withName(s"spark-local-dir-1")
59+
.withNewEmptyDir()
60+
.endEmptyDir()
61+
.build())
62+
assert(configuredPod.container.getVolumeMounts.size === 1)
63+
assert(configuredPod.container.getVolumeMounts.get(0) ===
64+
new VolumeMountBuilder()
65+
.withName(s"spark-local-dir-1")
66+
.withMountPath(defaultLocalDir)
67+
.build())
68+
assert(configuredPod.container.getEnv.size === 1)
69+
assert(configuredPod.container.getEnv.get(0) ===
70+
new EnvVarBuilder()
71+
.withName("SPARK_LOCAL_DIRS")
72+
.withValue(defaultLocalDir)
73+
.build())
74+
}
75+
76+
test("Use configured local dirs split on comma if provided.") {
77+
Mockito.doReturn("/var/data/my-local-dir-1,/var/data/my-local-dir-2")
78+
.when(sparkConf).getenv("SPARK_LOCAL_DIRS")
79+
val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
80+
val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
81+
assert(configuredPod.pod.getSpec.getVolumes.size === 2)
82+
assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
83+
new VolumeBuilder()
84+
.withName(s"spark-local-dir-1")
85+
.withNewEmptyDir()
86+
.endEmptyDir()
87+
.build())
88+
assert(configuredPod.pod.getSpec.getVolumes.get(1) ===
89+
new VolumeBuilder()
90+
.withName(s"spark-local-dir-2")
91+
.withNewEmptyDir()
92+
.endEmptyDir()
93+
.build())
94+
assert(configuredPod.container.getVolumeMounts.size === 2)
95+
assert(configuredPod.container.getVolumeMounts.get(0) ===
96+
new VolumeMountBuilder()
97+
.withName(s"spark-local-dir-1")
98+
.withMountPath("/var/data/my-local-dir-1")
99+
.build())
100+
assert(configuredPod.container.getVolumeMounts.get(1) ===
101+
new VolumeMountBuilder()
102+
.withName(s"spark-local-dir-2")
103+
.withMountPath("/var/data/my-local-dir-2")
104+
.build())
105+
assert(configuredPod.container.getEnv.size === 1)
106+
assert(configuredPod.container.getEnv.get(0) ===
107+
new EnvVarBuilder()
108+
.withName("SPARK_LOCAL_DIRS")
109+
.withValue("/var/data/my-local-dir-1,/var/data/my-local-dir-2")
110+
.build())
111+
}
112+
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@ import com.google.common.io.Files
2323

2424
import org.apache.spark.{SparkConf, SparkFunSuite}
2525
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf}
26-
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, KubernetesFeaturesTestUtils, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
26+
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
2727
import org.apache.spark.util.Utils
2828

2929
class KubernetesDriverBuilderSuite extends SparkFunSuite {
3030

3131
private val BASIC_STEP_TYPE = "basic"
3232
private val CREDENTIALS_STEP_TYPE = "credentials"
3333
private val SERVICE_STEP_TYPE = "service"
34+
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
3435
private val SECRETS_STEP_TYPE = "mount-secrets"
3536
private val MOUNT_LOCAL_FILES_STEP_TYPE = "mount-local-files"
3637

@@ -43,6 +44,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
4344
private val serviceStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
4445
SERVICE_STEP_TYPE, classOf[DriverServiceFeatureStep])
4546

47+
private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
48+
LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
49+
4650
private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
4751
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
4852

@@ -55,6 +59,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
5559
_ => credentialsStep,
5660
_ => serviceStep,
5761
_ => secretsStep,
62+
_ => localDirsStep,
5863
_ => mountLocalFilesStep)
5964

6065
test("Apply fundamental steps all the time.") {
@@ -77,7 +82,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
7782
Map.empty,
7883
BASIC_STEP_TYPE,
7984
CREDENTIALS_STEP_TYPE,
80-
SERVICE_STEP_TYPE)
85+
SERVICE_STEP_TYPE,
86+
LOCAL_DIRS_STEP_TYPE)
8187
}
8288

8389
test("Apply secrets step if secrets are present.") {
@@ -101,6 +107,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
101107
BASIC_STEP_TYPE,
102108
CREDENTIALS_STEP_TYPE,
103109
SERVICE_STEP_TYPE,
110+
LOCAL_DIRS_STEP_TYPE,
104111
SECRETS_STEP_TYPE)
105112
}
106113

@@ -134,6 +141,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
134141
BASIC_STEP_TYPE,
135142
CREDENTIALS_STEP_TYPE,
136143
SERVICE_STEP_TYPE,
144+
LOCAL_DIRS_STEP_TYPE,
137145
MOUNT_LOCAL_FILES_STEP_TYPE)
138146
}
139147

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,28 @@ import io.fabric8.kubernetes.api.model.PodBuilder
2020

2121
import org.apache.spark.{SparkConf, SparkFunSuite}
2222
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
23-
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, KubernetesFeaturesTestUtils, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
23+
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
2424

2525
class KubernetesExecutorBuilderSuite extends SparkFunSuite {
2626
private val BASIC_STEP_TYPE = "basic"
2727
private val SECRETS_STEP_TYPE = "mount-secrets"
2828
private val MOUNT_LOCAL_FILES_STEP_TYPE = "mount-local-files"
29+
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
2930

3031
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
3132
BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
3233
private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
3334
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
3435
private val mountLocalFilesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
3536
MOUNT_LOCAL_FILES_STEP_TYPE, classOf[MountLocalFilesFeatureStep])
37+
private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
38+
LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
3639

3740
private val builderUnderTest = new KubernetesExecutorBuilder(
3841
_ => basicFeatureStep,
3942
_ => mountSecretsStep,
40-
_ => mountLocalFilesStep)
43+
_ => mountLocalFilesStep,
44+
_ => localDirsStep)
4145

4246
test("Basic steps are consistently applied.") {
4347
val conf = KubernetesConf(
@@ -51,7 +55,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
5155
Map.empty,
5256
Map.empty,
5357
Map.empty)
54-
validateStepTypesApplied(builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE)
58+
validateStepTypesApplied(
59+
builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
5560
}
5661

5762
test("Apply secrets step if secrets are present.") {
@@ -69,6 +74,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
6974
validateStepTypesApplied(
7075
builderUnderTest.buildFromFeatures(conf),
7176
BASIC_STEP_TYPE,
77+
LOCAL_DIRS_STEP_TYPE,
7278
SECRETS_STEP_TYPE)
7379
}
7480

@@ -87,6 +93,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
8793
validateStepTypesApplied(
8894
builderUnderTest.buildFromFeatures(conf),
8995
BASIC_STEP_TYPE,
96+
LOCAL_DIRS_STEP_TYPE,
9097
MOUNT_LOCAL_FILES_STEP_TYPE)
9198
}
9299

0 commit comments

Comments
 (0)