
Commit 495f8f8

Added proper logic for mounting ConfigMaps

1 parent: d6fec87

File tree: 10 files changed (+87, -14 lines)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/HadoopConfBootstrap.scala

Lines changed: 4 additions & 4 deletions

@@ -43,17 +43,17 @@ private[spark] class HadoopConfBootstrapImpl(
   override def bootstrapMainContainerAndVolumes(
     originalPodWithMainContainer: PodWithMainContainer)
     : PodWithMainContainer = {
-    import collection.JavaConverters._
-    val fileContents = hadoopConfigFiles.map(file => (file.getPath, file.toString)).toMap
+    import scala.collection.JavaConverters._
     val keyPaths = hadoopConfigFiles.map(file =>
-      new KeyToPathBuilder().withKey(file.getPath).withPath(file.getAbsolutePath).build())
+      new KeyToPathBuilder().withKey(file.toPath.getFileName.toString)
+        .withPath(file.toPath.getFileName.toString).build()).toList
     val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
       .editSpec()
         .addNewVolume()
           .withName(HADOOP_FILE_VOLUME)
           .withNewConfigMap()
             .withName(hadoopConfConfigMapName)
-            .addAllToItems(keyPaths.toList.asJavaCollection)
+            .withItems(keyPaths.asJava)
            .endConfigMap()
          .endVolume()
        .endSpec()

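The substantive change above is that each ConfigMap item key (and its mount path) is now the bare file name rather than the full path. A minimal, standalone Scala sketch of the difference, using a hypothetical Hadoop file; Kubernetes ConfigMap keys may only contain alphanumerics, '-', '_' and '.', so a path containing '/' is not a legal key:

import java.io.File

object ConfigMapKeyExample extends App {
  // Hypothetical Hadoop configuration file on the submitting machine.
  val file = new File("/etc/hadoop/conf/core-site.xml")

  // Old behaviour: the full path, which is not a legal ConfigMap key.
  println(file.getPath)                      // /etc/hadoop/conf/core-site.xml

  // New behaviour: the bare file name, used both as the ConfigMap key and the item path.
  println(file.toPath.getFileName.toString)  // core-site.xml
}
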
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 2 additions & 0 deletions

@@ -89,6 +89,8 @@ package object constants {
   private[spark] val HADOOP_FILE_VOLUME = "hadoop-properties"
   private[spark] val HADOOP_FILE_DIR = "/etc/hadoop"
   private[spark] val HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
+  private[spark] val HADOOP_CONFIG_MAP_SPARK_CONF_NAME =
+    "spark.kubernetes.hadoop.executor.hadoopconfigmapname"
   private[spark] val INIT_CONTAINER_PROPERTIES_FILE_PATH =
     s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
   private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle"

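The new constant is the Spark conf key through which the driver tells executors which ConfigMap holds the Hadoop files. A hedged sketch of the round trip; the key string is the one added above, while the ConfigMap name is a made-up example:

import org.apache.spark.SparkConf

object HadoopConfigMapConfExample extends App {
  // Driver side: HadoopConfigBootstrapStep sets the key on a clone of the driver SparkConf.
  val conf = new SparkConf(false)
    .set("spark.kubernetes.hadoop.executor.hadoopconfigmapname", "spark-pi-1234-hadoop-config")

  // Cluster-manager side: KubernetesClusterManager reads it back as an Option.
  val maybeHadoopConfigMap: Option[String] =
    conf.getOption("spark.kubernetes.hadoop.executor.hadoopconfigmapname")

  // Some(spark-pi-1234-hadoop-config); None means executors skip the Hadoop conf mount.
  println(maybeHadoopConfigMap)
}
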
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 1 addition & 1 deletion

@@ -112,7 +112,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
         hadoopConfigurations)
       val hadoopConfSteps =
         hadoopStepsOrchestrator.getHadoopSteps()
-      Some(new HadoopConfigBootstrapStep(hadoopConfSteps))
+      Some(new HadoopConfigBootstrapStep(hadoopConfSteps, kubernetesResourceNamePrefix))
     }
     val pythonStep = mainAppResource match {
       case PythonMainAppResource(mainPyResource) =>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/HadoopConfigBootstrapStep.scala

Lines changed: 23 additions & 3 deletions

@@ -16,24 +16,44 @@
  */
 package org.apache.spark.deploy.kubernetes.submit.submitsteps
 
+import java.io.StringWriter
+import java.util.Properties
+
+import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, HasMetadata}
+import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps.{HadoopConfigSpec, HadoopConfigurationStep}
 
 /**
  * Configures the driverSpec that bootstraps dependencies into the driver pod.
  */
 private[spark] class HadoopConfigBootstrapStep(
-    hadoopConfigurationSteps: Seq[HadoopConfigurationStep])
+    hadoopConfigurationSteps: Seq[HadoopConfigurationStep], kubernetesResourceNamePrefix: String)
   extends DriverConfigurationStep {
+  private val hadoopConfigMapName = s"$kubernetesResourceNamePrefix-hadoop-config"
 
   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    import scala.collection.JavaConverters._
     var currentHadoopSpec = HadoopConfigSpec(
       driverPod = driverSpec.driverPod,
-      driverContainer = driverSpec.driverContainer)
+      driverContainer = driverSpec.driverContainer,
+      configMapProperties = Map.empty[String, String])
     for (nextStep <- hadoopConfigurationSteps) {
       currentHadoopSpec = nextStep.configureContainers(currentHadoopSpec)
     }
+    val configMap =
+      new ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(hadoopConfigMapName)
+          .endMetadata()
+        .addToData(currentHadoopSpec.configMapProperties.asJava)
+        .build()
+    val executorSparkConf = driverSpec.driverSparkConf.clone()
+      .set(HADOOP_CONFIG_MAP_SPARK_CONF_NAME, hadoopConfigMapName)
     driverSpec.copy(
       driverPod = currentHadoopSpec.driverPod,
-      driverContainer = currentHadoopSpec.driverContainer)
+      driverContainer = currentHadoopSpec.driverContainer,
+      driverSparkConf = executorSparkConf,
+      otherKubernetesResources = Seq(configMap)
+    )
   }
 }

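For reference, a minimal sketch of the ConfigMap this step now appends to otherKubernetesResources, assuming configMapProperties ended up holding two hypothetical Hadoop files and the resource prefix is spark-pi-1234:

import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder}

object HadoopConfigMapExample extends App {
  // Hypothetical (fileName -> data) pairs collected by the Hadoop config steps.
  val configMapProperties = Map(
    "core-site.xml" -> "<configuration>...</configuration>",
    "hdfs-site.xml" -> "<configuration>...</configuration>")

  val configMap: ConfigMap = new ConfigMapBuilder()
    .withNewMetadata()
      .withName("spark-pi-1234-hadoop-config") // i.e. s"$kubernetesResourceNamePrefix-hadoop-config"
      .endMetadata()
    .addToData(configMapProperties.asJava)
    .build()

  // One data entry per file name; the hadoop-properties volume exposes them
  // (per the constants, under /etc/hadoop).
  println(configMap.getData.keySet())
}
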
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala

Lines changed: 7 additions & 1 deletion

@@ -16,13 +16,16 @@
  */
 package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
 
+import java.io.File
+
 import org.apache.spark.deploy.kubernetes.{HadoopConfBootstrap, PodWithMainContainer}
 
 /**
  * Step that configures the ConfigMap + Volumes for the driver
  */
 private[spark] class HadoopConfMounterStep(
     hadoopConfigMapName: String,
+    hadoopConfigurationFiles: Array[File],
     hadoopConfBootstrapConf: HadoopConfBootstrap)
   extends HadoopConfigurationStep {

@@ -35,7 +38,10 @@ private[spark] class HadoopConfMounterStep(
       ))
     hadoopConfigSpec.copy(
       driverPod = bootstrappedPodAndMainContainer.pod,
-      driverContainer = bootstrappedPodAndMainContainer.mainContainer
+      driverContainer = bootstrappedPodAndMainContainer.mainContainer,
+      configMapProperties =
+        hadoopConfigurationFiles.map(file =>
+          (file.toPath.getFileName.toString, file.toString)).toMap
     )
   }
 }

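Note that the values placed into configMapProperties above come from file.toString, i.e. the file's path. If the ConfigMap data is ultimately meant to carry the file contents, a hedged, standalone sketch of building a (fileName -> contents) map could look like this (helper name and paths are hypothetical):

import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object HadoopConfDirToMapExample extends App {
  // Hypothetical helper: read every regular file in a directory into (fileName -> contents).
  def confDirToMap(dir: File): Map[String, String] =
    Option(dir.listFiles).getOrElse(Array.empty[File])
      .filter(_.isFile)
      .map { f =>
        f.toPath.getFileName.toString ->
          new String(Files.readAllBytes(f.toPath), StandardCharsets.UTF_8)
      }
      .toMap

  // e.g. confDirToMap(new File("/etc/hadoop/conf")).keySet -> Set(core-site.xml, hdfs-site.xml, ...)
  println(confDirToMap(new File(sys.env.getOrElse("HADOOP_CONF_DIR", "."))).keySet)
}
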
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopConfigSpec.scala

Lines changed: 4 additions & 1 deletion

@@ -26,8 +26,11 @@ import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod}
  * - The spec of the main container so that it can be modified to share volumes
  * - The spec of the driver pod EXCEPT for the addition of the given hadoop configs (e.g. volumes
  *   the hadoop logic needs)
+ * - The properties that will be stored into the config map which have (key, value)
+ *   pairs of (path, data)
  */
 private[spark] case class HadoopConfigSpec(
 //  additionalDriverSparkConf: Map[String, String],
   driverPod: Pod,
-  driverContainer: Container)
+  driverContainer: Container,
+  configMapProperties: Map[String, String])

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala

Lines changed: 1 addition & 0 deletions

@@ -40,6 +40,7 @@ private[spark] class HadoopStepsOrchestrator(
       hadoopConfigurationFiles)
     val hadoopConfMounterStep = new HadoopConfMounterStep(
       hadoopConfigMapName,
+      hadoopConfigurationFiles,
       hadoopConfBootstrapImpl)
     val maybeHadoopKerberosMountingStep =
       if (maybeKerberosSupport) {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala

Lines changed: 27 additions & 1 deletion

@@ -20,7 +20,7 @@ import java.io.File
 
 import io.fabric8.kubernetes.client.Config
 
-import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
+import org.apache.spark.deploy.kubernetes._
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.internal.Logging

@@ -41,6 +41,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val sparkConf = sc.getConf
     val maybeConfigMap = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP)
     val maybeConfigMapKey = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY)
+    val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_SPARK_CONF_NAME)
 
     val maybeExecutorInitContainerSecretName =
       sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET)

@@ -71,6 +72,17 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
         configMap,
         configMapKey)
     }
+    val hadoopBootStrap = for {
+      hadoopConfigMap <- maybeHadoopConfigMap
+    } yield {
+      val hadoopConfigurations =
+        sys.env.get("HADOOP_CONF_DIR").map{ conf => getHadoopConfFiles(conf)}
+          .getOrElse(Array.empty[File])
+      new HadoopConfBootstrapImpl(
+        hadoopConfigMap,
+        hadoopConfigurations
+      )
+    }
     if (maybeConfigMap.isEmpty) {
       logWarning("The executor's init-container config map was not specified. Executors will" +
         " therefore not attempt to fetch remote or submitted dependencies.")

@@ -79,6 +91,10 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       logWarning("The executor's init-container config map key was not specified. Executors will" +
         " therefore not attempt to fetch remote or submitted dependencies.")
     }
+    if (maybeHadoopConfigMap.isEmpty) {
+      logWarning("The executor's hadoop config map key was not specified. Executors will" +
+        " therefore not attempt to fetch hadoop configuration files.")
+    }
     val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
       KUBERNETES_MASTER_INTERNAL_URL,
       Some(sparkConf.get(KUBERNETES_NAMESPACE)),

@@ -90,11 +106,21 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       sc.taskScheduler.asInstanceOf[TaskSchedulerImpl],
       sc,
       initBootStrap,
+      hadoopBootStrap,
       executorInitContainerSecretVolumePlugin,
       kubernetesClient)
   }
 
   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
     scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
   }
+  private def getHadoopConfFiles(path: String) : Array[File] = {
+    def isFile(file: File) = if (file.isFile) Some(file) else None
+    val dir = new File(path)
+    if (dir.isDirectory) {
+      dir.listFiles.flatMap { file => isFile(file) }
+    } else {
+      Array.empty[File]
+    }
+  }
 }

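A compact sketch of the HADOOP_CONF_DIR discovery added above: only regular files directly inside the directory are collected, and a missing or non-directory path yields an empty array, so the bootstrap is still built whenever the config-map key is set (the directory path is an example):

import java.io.File

object HadoopConfDirDiscoveryExample extends App {
  // Equivalent to the hadoopConfigurations val above, tolerating an unset HADOOP_CONF_DIR.
  val hadoopConfigurations: Array[File] =
    sys.env.get("HADOOP_CONF_DIR")           // e.g. Some("/etc/hadoop/conf")
      .map(new File(_))
      .filter(_.isDirectory)
      .map(_.listFiles.filter(_.isFile))
      .getOrElse(Array.empty[File])

  println(hadoopConfigurations.map(_.getName).mkString(", "))
}
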
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 12 additions & 3 deletions

@@ -34,7 +34,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
 import org.apache.commons.io.FilenameUtils
 
 import org.apache.spark.{SparkContext, SparkEnv, SparkException}
-import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
+import org.apache.spark.deploy.kubernetes._
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.kubernetes.submit.InitContainerUtil

@@ -50,6 +50,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     val sc: SparkContext,
     executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap],
+    executorHadoopBootStrap: Option[HadoopConfBootstrap],
     executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin],
     kubernetesClient: KubernetesClient)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

@@ -428,6 +429,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
    * @return A tuple of the new executor name and the Pod data structure.
    */
   private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = {
+    import scala.collection.JavaConverters._
     val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
     val name = s"$executorPodNamePrefix-exec-$executorId"
 
@@ -582,9 +584,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
     val executorPodWithNodeAffinity = addNodeAffinityAnnotationIfUseful(
       executorPodWithInitContainer, nodeToLocalTaskCount)
-    val resolvedExecutorPod = new PodBuilder(executorPodWithNodeAffinity)
+    val (executorHadoopConfPod, executorHadoopConfContainer) =
+      executorHadoopBootStrap.map { bootstrap =>
+        val podWithMainContainer = bootstrap.bootstrapMainContainerAndVolumes(
+          PodWithMainContainer(executorPodWithNodeAffinity, initBootstrappedExecutorContainer)
+        )
+        (podWithMainContainer.pod, podWithMainContainer.mainContainer)
+      }.getOrElse(executorPodWithNodeAffinity, initBootstrappedExecutorContainer)
+    val resolvedExecutorPod = new PodBuilder(executorHadoopConfPod)
       .editSpec()
-        .addToContainers(initBootstrappedExecutorContainer)
+        .addToContainers(executorHadoopConfContainer)
       .endSpec()
       .build()
     try {

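One subtlety in the executor fallback above: Option.getOrElse takes a single default argument, so the two-argument call relies on the compiler adapting the arguments into a (Pod, Container) tuple. An explicitly tupled sketch of the same pattern, with stand-in case classes in place of the fabric8 Pod and Container:

object ExecutorBootstrapFallbackExample extends App {
  // Stand-ins for the fabric8 Pod and Container values used in the scheduler backend.
  final case class FakePod(name: String)
  final case class FakeContainer(name: String)

  val basePod = FakePod("executor-pod")
  val baseContainer = FakeContainer("executor-container")

  // None models a run without Hadoop conf; Some(...) would rewrite both pod and container.
  val maybeBootstrap: Option[(FakePod, FakeContainer) => (FakePod, FakeContainer)] = None

  val (pod, container) = maybeBootstrap
    .map(bootstrap => bootstrap(basePod, baseContainer))
    .getOrElse((basePod, baseContainer)) // explicit tuple instead of auto-tupling

  println(s"$pod / $container")
}
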
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 6 additions & 0 deletions

@@ -72,6 +72,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     kubernetesTestComponents.deleteNamespace()
   }
 
+  test("Include HADOOP_CONF for HDFS based jobs ") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
+    runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE)
+  }
+
   test("Run PySpark Job on file from SUBMITTER with --py-files") {
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)
