Skip to content

Commit 6c46b49

Browse files
authored
Merge pull request apache-spark-on-k8s#372 from palantir/patch-spark-24137
[SPARK-24137][K8S] Mount local directories as empty dir volumes.
2 parents 9c32b5b + 2df8713 commit 6c46b49

File tree

7 files changed

+226
-13
lines changed

7 files changed

+226
-13
lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -454,8 +454,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
454454
*/
455455
private[spark] def validateSettings() {
456456
if (contains("spark.local.dir")) {
457-
val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
458-
"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
457+
val msg = "Note that spark.local.dir will be overridden by the value set by " +
458+
"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS" +
459+
" in YARN)."
459460
logWarning(msg)
460461
}
461462

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.features
18+
19+
import java.nio.file.Paths
20+
import java.util.UUID
21+
22+
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
23+
24+
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
25+
26+
private[spark] class LocalDirsFeatureStep(
27+
conf: KubernetesConf[_ <: KubernetesRoleSpecificConf],
28+
defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}")
29+
extends KubernetesFeatureConfigStep {
30+
31+
// Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system
32+
// property - we want to instead default to mounting an emptydir volume that doesn't already
33+
// exist in the image.
34+
// We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already
35+
// a bit opinionated about YARN and Mesos.
36+
private val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS"))
37+
.orElse(conf.getOption("spark.local.dir"))
38+
.getOrElse(defaultLocalDir)
39+
.split(",")
40+
41+
override def configurePod(pod: SparkPod): SparkPod = {
42+
val localDirVolumes = resolvedLocalDirs
43+
.zipWithIndex
44+
.map { case (localDir, index) =>
45+
new VolumeBuilder()
46+
.withName(s"spark-local-dir-${index + 1}")
47+
.withNewEmptyDir()
48+
.endEmptyDir()
49+
.build()
50+
}
51+
val localDirVolumeMounts = localDirVolumes
52+
.zip(resolvedLocalDirs)
53+
.map { case (localDirVolume, localDirPath) =>
54+
new VolumeMountBuilder()
55+
.withName(localDirVolume.getName)
56+
.withMountPath(localDirPath)
57+
.build()
58+
}
59+
val podWithLocalDirVolumes = new PodBuilder(pod.pod)
60+
.editSpec()
61+
.addToVolumes(localDirVolumes: _*)
62+
.endSpec()
63+
.build()
64+
val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
65+
.addNewEnv()
66+
.withName("SPARK_LOCAL_DIRS")
67+
.withValue(resolvedLocalDirs.mkString(","))
68+
.endEnv()
69+
.addToVolumeMounts(localDirVolumeMounts: _*)
70+
.build()
71+
SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
72+
}
73+
74+
override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
75+
76+
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
77+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.submit
1919
import java.io.File
2020

2121
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils}
22-
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
22+
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, LocalDirsFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
2323
import org.apache.spark.util.Utils
2424

2525
private[spark] class KubernetesDriverBuilder(
@@ -33,17 +33,21 @@ private[spark] class KubernetesDriverBuilder(
3333
provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
3434
=> MountSecretsFeatureStep) =
3535
new MountSecretsFeatureStep(_),
36+
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
37+
=> LocalDirsFeatureStep) =
38+
new LocalDirsFeatureStep(_),
3639
provideMountLocalFilesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
3740
=> MountLocalFilesFeatureStep) =
38-
new MountLocalFilesFeatureStep(_)) {
41+
new MountLocalFilesFeatureStep(_)) {
3942
import KubernetesDriverBuilder._
4043

4144
def buildFromFeatures(
4245
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
4346
val baseFeatures = Seq(
4447
provideBasicStep(kubernetesConf),
4548
provideCredentialsStep(kubernetesConf),
46-
provideServiceStep(kubernetesConf))
49+
provideServiceStep(kubernetesConf),
50+
provideLocalDirsStep(kubernetesConf))
4751
val withProvideSecretsStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
4852
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
4953
} else baseFeatures

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package org.apache.spark.scheduler.cluster.k8s
1818

1919
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
20-
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
20+
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, LocalDirsFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
2121

2222
private[spark] class KubernetesExecutorBuilder(
2323
provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
@@ -27,11 +27,15 @@ private[spark] class KubernetesExecutorBuilder(
2727
new MountSecretsFeatureStep(_),
2828
provideMountLocalFilesStep:
2929
(KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountLocalFilesFeatureStep =
30-
new MountLocalFilesFeatureStep(_)) {
30+
new MountLocalFilesFeatureStep(_),
31+
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
32+
=> LocalDirsFeatureStep =
33+
new LocalDirsFeatureStep(_)) {
3134

3235
def buildFromFeatures(
3336
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
34-
val baseFeatures = Seq(provideBasicStep(kubernetesConf))
37+
val baseFeatures = Seq(
38+
provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
3539
val withProvideSecretsStep = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
3640
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
3741
} else baseFeatures
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.features
18+
19+
import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder}
20+
import org.mockito.Mockito
21+
import org.scalatest.BeforeAndAfter
22+
23+
import org.apache.spark.{SparkConf, SparkFunSuite}
24+
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
25+
26+
class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
27+
private val defaultLocalDir = "/var/data/default-local-dir"
28+
private var sparkConf: SparkConf = _
29+
private var kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf] = _
30+
31+
before {
32+
val realSparkConf = new SparkConf(false)
33+
sparkConf = Mockito.spy(realSparkConf)
34+
kubernetesConf = KubernetesConf(
35+
sparkConf,
36+
KubernetesDriverSpecificConf(
37+
None,
38+
"app-name",
39+
"main",
40+
Seq.empty),
41+
"resource",
42+
"app-id",
43+
None,
44+
Map.empty,
45+
Map.empty,
46+
Map.empty,
47+
Map.empty)
48+
}
49+
50+
test("Resolve to default local dir if neither env nor configuration are set") {
51+
Mockito.doReturn(null).when(sparkConf).get("spark.local.dir")
52+
Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS")
53+
val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
54+
val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
55+
assert(configuredPod.pod.getSpec.getVolumes.size === 1)
56+
assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
57+
new VolumeBuilder()
58+
.withName(s"spark-local-dir-1")
59+
.withNewEmptyDir()
60+
.endEmptyDir()
61+
.build())
62+
assert(configuredPod.container.getVolumeMounts.size === 1)
63+
assert(configuredPod.container.getVolumeMounts.get(0) ===
64+
new VolumeMountBuilder()
65+
.withName(s"spark-local-dir-1")
66+
.withMountPath(defaultLocalDir)
67+
.build())
68+
assert(configuredPod.container.getEnv.size === 1)
69+
assert(configuredPod.container.getEnv.get(0) ===
70+
new EnvVarBuilder()
71+
.withName("SPARK_LOCAL_DIRS")
72+
.withValue(defaultLocalDir)
73+
.build())
74+
}
75+
76+
test("Use configured local dirs split on comma if provided.") {
77+
Mockito.doReturn("/var/data/my-local-dir-1,/var/data/my-local-dir-2")
78+
.when(sparkConf).getenv("SPARK_LOCAL_DIRS")
79+
val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
80+
val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
81+
assert(configuredPod.pod.getSpec.getVolumes.size === 2)
82+
assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
83+
new VolumeBuilder()
84+
.withName(s"spark-local-dir-1")
85+
.withNewEmptyDir()
86+
.endEmptyDir()
87+
.build())
88+
assert(configuredPod.pod.getSpec.getVolumes.get(1) ===
89+
new VolumeBuilder()
90+
.withName(s"spark-local-dir-2")
91+
.withNewEmptyDir()
92+
.endEmptyDir()
93+
.build())
94+
assert(configuredPod.container.getVolumeMounts.size === 2)
95+
assert(configuredPod.container.getVolumeMounts.get(0) ===
96+
new VolumeMountBuilder()
97+
.withName(s"spark-local-dir-1")
98+
.withMountPath("/var/data/my-local-dir-1")
99+
.build())
100+
assert(configuredPod.container.getVolumeMounts.get(1) ===
101+
new VolumeMountBuilder()
102+
.withName(s"spark-local-dir-2")
103+
.withMountPath("/var/data/my-local-dir-2")
104+
.build())
105+
assert(configuredPod.container.getEnv.size === 1)
106+
assert(configuredPod.container.getEnv.get(0) ===
107+
new EnvVarBuilder()
108+
.withName("SPARK_LOCAL_DIRS")
109+
.withValue("/var/data/my-local-dir-1,/var/data/my-local-dir-2")
110+
.build())
111+
}
112+
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@ import com.google.common.io.Files
2323

2424
import org.apache.spark.{SparkConf, SparkFunSuite}
2525
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf}
26-
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, KubernetesFeaturesTestUtils, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
26+
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
2727
import org.apache.spark.util.Utils
2828

2929
class KubernetesDriverBuilderSuite extends SparkFunSuite {
3030

3131
private val BASIC_STEP_TYPE = "basic"
3232
private val CREDENTIALS_STEP_TYPE = "credentials"
3333
private val SERVICE_STEP_TYPE = "service"
34+
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
3435
private val SECRETS_STEP_TYPE = "mount-secrets"
3536
private val MOUNT_LOCAL_FILES_STEP_TYPE = "mount-local-files"
3637

@@ -43,6 +44,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
4344
private val serviceStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
4445
SERVICE_STEP_TYPE, classOf[DriverServiceFeatureStep])
4546

47+
private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
48+
LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
49+
4650
private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
4751
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
4852

@@ -55,6 +59,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
5559
_ => credentialsStep,
5660
_ => serviceStep,
5761
_ => secretsStep,
62+
_ => localDirsStep,
5863
_ => mountLocalFilesStep)
5964

6065
test("Apply fundamental steps all the time.") {
@@ -77,7 +82,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
7782
Map.empty,
7883
BASIC_STEP_TYPE,
7984
CREDENTIALS_STEP_TYPE,
80-
SERVICE_STEP_TYPE)
85+
SERVICE_STEP_TYPE,
86+
LOCAL_DIRS_STEP_TYPE)
8187
}
8288

8389
test("Apply secrets step if secrets are present.") {
@@ -101,6 +107,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
101107
BASIC_STEP_TYPE,
102108
CREDENTIALS_STEP_TYPE,
103109
SERVICE_STEP_TYPE,
110+
LOCAL_DIRS_STEP_TYPE,
104111
SECRETS_STEP_TYPE)
105112
}
106113

@@ -134,6 +141,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
134141
BASIC_STEP_TYPE,
135142
CREDENTIALS_STEP_TYPE,
136143
SERVICE_STEP_TYPE,
144+
LOCAL_DIRS_STEP_TYPE,
137145
MOUNT_LOCAL_FILES_STEP_TYPE)
138146
}
139147

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,28 @@ import io.fabric8.kubernetes.api.model.PodBuilder
2020

2121
import org.apache.spark.{SparkConf, SparkFunSuite}
2222
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
23-
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, KubernetesFeaturesTestUtils, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
23+
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountLocalFilesFeatureStep, MountSecretsFeatureStep}
2424

2525
class KubernetesExecutorBuilderSuite extends SparkFunSuite {
2626
private val BASIC_STEP_TYPE = "basic"
2727
private val SECRETS_STEP_TYPE = "mount-secrets"
2828
private val MOUNT_LOCAL_FILES_STEP_TYPE = "mount-local-files"
29+
private val LOCAL_DIRS_STEP_TYPE = "local-dirs"
2930

3031
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
3132
BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep])
3233
private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
3334
SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep])
3435
private val mountLocalFilesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
3536
MOUNT_LOCAL_FILES_STEP_TYPE, classOf[MountLocalFilesFeatureStep])
37+
private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
38+
LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep])
3639

3740
private val builderUnderTest = new KubernetesExecutorBuilder(
3841
_ => basicFeatureStep,
3942
_ => mountSecretsStep,
40-
_ => mountLocalFilesStep)
43+
_ => mountLocalFilesStep,
44+
_ => localDirsStep)
4145

4246
test("Basic steps are consistently applied.") {
4347
val conf = KubernetesConf(
@@ -51,7 +55,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
5155
Map.empty,
5256
Map.empty,
5357
Map.empty)
54-
validateStepTypesApplied(builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE)
58+
validateStepTypesApplied(
59+
builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE)
5560
}
5661

5762
test("Apply secrets step if secrets are present.") {
@@ -69,6 +74,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
6974
validateStepTypesApplied(
7075
builderUnderTest.buildFromFeatures(conf),
7176
BASIC_STEP_TYPE,
77+
LOCAL_DIRS_STEP_TYPE,
7278
SECRETS_STEP_TYPE)
7379
}
7480

@@ -87,6 +93,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
8793
validateStepTypesApplied(
8894
builderUnderTest.buildFromFeatures(conf),
8995
BASIC_STEP_TYPE,
96+
LOCAL_DIRS_STEP_TYPE,
9097
MOUNT_LOCAL_FILES_STEP_TYPE)
9198
}
9299

0 commit comments

Comments
 (0)