Commit c1b7dc4

turboFei authored and czxm committed
[SPARK-43504][K8S] Mounts the hadoop config map on the executor pod
### What changes were proposed in this pull request?

In this PR, for Spark on K8s, the hadoop config map is mounted on the executor side as well. Before, it was only mounted on the driver side.

### Why are the changes needed?

Since [SPARK-25815](https://issues.apache.org/jira/browse/SPARK-25815) (apache#22911), the hadoop config map is no longer mounted on the executor side. Per the apache#22911 description:

> The main two things that don't need to happen in executors anymore are:
> 1. adding the Hadoop config to the executor pods: this is not needed
> since the Spark driver will serialize the Hadoop config and send
> it to executors when running tasks.

But in fact, the executor still needs the hadoop configuration.

![image](https://github.com/apache/spark/assets/6757692/ff6374c9-7ebd-4472-a85c-99c75a737e2a)

As shown in the picture above, the driver can resolve `hdfs://zeus`, but the executor cannot. So we still need to mount the hadoop config map on the executor side.

### Does this PR introduce _any_ user-facing change?

Yes. Users no longer need workarounds to make executors load the hadoop configuration, such as:
- including the hadoop conf in the executor image
- placing hadoop conf files under `SPARK_CONF_DIR`

### How was this patch tested?

UT.

Closes apache#41181 from turboFei/exec_hadoop_conf.

Authored-by: fwang12 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 21e7ca6 commit c1b7dc4
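
To make the failure mode concrete, here is a minimal sketch of the kind of job the description refers to: reading from an HDFS HA nameservice forces the executors, not just the driver, to resolve it. `hdfs://zeus` is the nameservice from the screenshot; the app name and input path are illustrative, not from the patch.

```scala
import org.apache.spark.sql.SparkSession

object ExecutorHdfsReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("executor-hdfs-read").getOrCreate()

    // Listing the input happens on the driver, but reading the blocks runs in
    // executor tasks. Without the hadoop config map mounted on the executor
    // pod, executors cannot resolve the "zeus" HA nameservice defined in
    // hdfs-site.xml and typically fail with an UnknownHostException, as the
    // screenshot above shows.
    val count = spark.read.textFile("hdfs://zeus/tmp/some-input").count()
    println(s"lines: $count")

    spark.stop()
  }
}
```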

File tree

4 files changed: +154 -0 lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala

Lines changed: 8 additions & 0 deletions
@@ -104,6 +104,14 @@ private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf)
     }
   }

+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    if (hasHadoopConf) {
+      Map(HADOOP_CONFIG_MAP_NAME -> existingConfMap.getOrElse(newConfigMapName))
+    } else {
+      Map.empty
+    }
+  }
+
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
     if (confDir.isDefined) {
       val fileMap = confFiles.map { file =>
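
How this new hook reaches the executor: properties returned from `getAdditionalPodSystemProperties()` are added to the driver's Spark config, which Spark propagates to executors, so the executor-side step can look the config map name back up. Below is a minimal sketch of that handoff with simplified stand-ins; the key string is a placeholder, not necessarily the literal value of `Constants.HADOOP_CONFIG_MAP_NAME`.

```scala
object HadoopConfHandoffSketch {
  // Placeholder for Constants.HADOOP_CONFIG_MAP_NAME; the real constant lives
  // in org.apache.spark.deploy.k8s.Constants.
  val HADOOP_CONFIG_MAP_NAME = "spark.kubernetes.hadoop.configMapName"

  // Driver side: what getAdditionalPodSystemProperties() contributes when a
  // Hadoop conf dir (or a pre-existing config map) was detected.
  def driverProperties(hasHadoopConf: Boolean, configMapName: String): Map[String, String] =
    if (hasHadoopConf) Map(HADOOP_CONFIG_MAP_NAME -> configMapName) else Map.empty

  // Executor side: what HadoopConfExecutorFeatureStep checks before mounting.
  def configMapToMount(executorConf: Map[String, String]): Option[String] =
    executorConf.get(HADOOP_CONFIG_MAP_NAME)

  def main(args: Array[String]): Unit = {
    val props = driverProperties(hasHadoopConf = true, "my-app-hadoop-config")
    // In Spark the properties travel via the driver's config to executors;
    // here the map is passed through directly for illustration.
    assert(configMapToMount(props).contains("my-app-hadoop-config"))
    assert(configMapToMount(Map.empty).isEmpty)
    println("handoff ok")
  }
}
```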
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+
+/**
+ * Mounts the Hadoop configuration on the executor pod.
+ */
+private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesConf)
+  extends KubernetesFeatureConfigStep {
+
+  private val hadoopConfigMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME)
+
+  override def configurePod(original: SparkPod): SparkPod = {
+    original.transform { case pod if hadoopConfigMapName.isDefined =>
+      val confVolume = new VolumeBuilder()
+        .withName(HADOOP_CONF_VOLUME)
+        .withNewConfigMap()
+          .withName(hadoopConfigMapName.get)
+          .endConfigMap()
+        .build()
+
+      val podWithConf = new PodBuilder(pod.pod)
+        .editSpec()
+          .addNewVolumeLike(confVolume)
+            .endVolume()
+          .endSpec()
+        .build()
+
+      val containerWithMount = new ContainerBuilder(pod.container)
+        .addNewVolumeMount()
+          .withName(HADOOP_CONF_VOLUME)
+          .withMountPath(HADOOP_CONF_DIR_PATH)
+          .endVolumeMount()
+        .addNewEnv()
+          .withName(ENV_HADOOP_CONF_DIR)
+          .withValue(HADOOP_CONF_DIR_PATH)
+          .endEnv()
+        .build()
+
+      SparkPod(podWithConf, containerWithMount)
+    }
+  }
+}
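
To make the fabric8 builder calls above concrete, here is a standalone sketch of the pod and container shape the step produces. The volume name, mount path, and config map name are illustrative stand-ins for `HADOOP_CONF_VOLUME`, `HADOOP_CONF_DIR_PATH`, and the config map published by the driver, not values confirmed by this patch.

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

object HadoopConfPodShapeSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for Spark's constants (illustrative values only).
    val volumeName = "hadoop-properties"
    val mountPath = "/opt/hadoop/conf"
    val configMapName = "my-app-hadoop-config"

    // The pod spec gains a config-map-backed volume...
    val pod = new PodBuilder()
      .withNewMetadata().withName("executor").endMetadata()
      .withNewSpec()
        .addNewVolume()
          .withName(volumeName)
          .withNewConfigMap().withName(configMapName).endConfigMap()
        .endVolume()
      .endSpec()
      .build()

    // ...and the executor container mounts it and points HADOOP_CONF_DIR at it,
    // so Hadoop clients on the executor pick up core-site.xml / hdfs-site.xml.
    val container = new ContainerBuilder()
      .withName("spark-kubernetes-executor")
      .addNewVolumeMount().withName(volumeName).withMountPath(mountPath).endVolumeMount()
      .addNewEnv().withName("HADOOP_CONF_DIR").withValue(mountPath).endEnv()
      .build()

    assert(pod.getSpec.getVolumes.get(0).getConfigMap.getName == configMapName)
    assert(container.getEnv.get(0).getValue == mountPath)
    println("pod shape ok")
  }
}
```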

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala

Lines changed: 1 addition & 0 deletions
@@ -71,6 +71,7 @@ private[spark] class KubernetesExecutorBuilder {
       new MountSecretsFeatureStep(conf),
       new EnvSecretsFeatureStep(conf),
       new MountVolumesFeatureStep(conf),
+      new HadoopConfExecutorFeatureStep(conf),
       new LocalDirsFeatureStep(conf)) ++ userFeatures

     val spec = KubernetesExecutorSpec(
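
The new step is appended alongside the built-in executor features, ahead of `userFeatures`. For comparison, a hedged sketch of what a user-supplied step looks like, assuming it is wired in via Spark's executor pod feature-step configuration; the class name and env var below are hypothetical.

```scala
import io.fabric8.kubernetes.api.model.ContainerBuilder

import org.apache.spark.deploy.k8s.SparkPod
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

// Hypothetical user feature step: stamps an extra env var on every executor
// container. Built-in steps like HadoopConfExecutorFeatureStep implement the
// same configurePod contract.
class ExampleEnvFeatureStep extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod = {
    val container = new ContainerBuilder(pod.container)
      .addNewEnv().withName("EXAMPLE_FLAG").withValue("on").endEnv()
      .build()
    SparkPod(pod.pod, container)
  }
}
```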
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala

Lines changed: 82 additions & 0 deletions

@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+
+import com.google.common.io.Files
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{Constants, KubernetesTestConf, SecretVolumeUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.containerHasEnvVar
+import org.apache.spark.util.{SparkConfWithEnv, Utils}
+
+class HadoopConfExecutorFeatureStepSuite extends SparkFunSuite {
+  import SecretVolumeUtils._
+
+  test("SPARK-43504: mounts the hadoop config map on the executor pod") {
+    val confDir = Utils.createTempDir()
+    val confFiles = Set("core-site.xml", "hdfs-site.xml")
+
+    confFiles.foreach { f =>
+      Files.write("some data", new File(confDir, f), UTF_8)
+    }
+
+    Seq(
+      Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath()),
+      Map.empty[String, String]).foreach { env =>
+      val hasHadoopConf = env.contains(ENV_HADOOP_CONF_DIR)
+
+      val driverSparkConf = new SparkConfWithEnv(env)
+      val executorSparkConf = new SparkConf(false)
+
+      val driverConf = KubernetesTestConf.createDriverConf(sparkConf = driverSparkConf)
+      val driverStep = new HadoopConfDriverFeatureStep(driverConf)
+
+      val additionalPodSystemProperties = driverStep.getAdditionalPodSystemProperties()
+      if (hasHadoopConf) {
+        assert(additionalPodSystemProperties.contains(Constants.HADOOP_CONFIG_MAP_NAME))
+        additionalPodSystemProperties.foreach { case (key, value) =>
+          executorSparkConf.set(key, value)
+        }
+      } else {
+        assert(additionalPodSystemProperties.isEmpty)
+      }
+
+      val executorConf = KubernetesTestConf.createExecutorConf(sparkConf = executorSparkConf)
+      val executorStep = new HadoopConfExecutorFeatureStep(executorConf)
+      val executorPod = executorStep.configurePod(SparkPod.initialPod())
+
+      checkPod(executorPod, hasHadoopConf)
+    }
+  }
+
+  private def checkPod(pod: SparkPod, hasHadoopConf: Boolean): Unit = {
+    if (hasHadoopConf) {
+      assert(podHasVolume(pod.pod, HADOOP_CONF_VOLUME))
+      assert(containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH))
+      assert(containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR))
+    } else {
+      assert(!podHasVolume(pod.pod, HADOOP_CONF_VOLUME))
+      assert(!containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH))
+      assert(!containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR))
+    }
+  }
+}
