Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 704430a

Browse files
committed
add spark.kubernetes.hadoop.conf.configmap.name conf to use exist hadoop conf configmap
Signed-off-by: forrestchen <[email protected]>
1 parent 12d590c commit 704430a

File tree

12 files changed

+72
-81
lines changed

12 files changed

+72
-81
lines changed

docs/running-on-kubernetes.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,15 @@ from the other deployment modes. See the [configuration page](configuration.html
838838
in the executor Pods. The user can specify multiple instances of this for multiple secrets.
839839
</td>
840840
</tr>
841+
<tr>
842+
<td><code>spark.kubernetes.hadoop.conf.configmap.name</code></td>
843+
<td>(none)</td>
844+
<td>
845+
If this is specified, a new ConfigMap will not be created to store the Hadoop conf files; the
846+
existing ConfigMap will be reused instead. The ConfigMap will be mounted into the driver/executor pods, and
847+
<code>HADOOP_CONF_DIR</code> will be set.
848+
</td>
849+
</tr>
841850
</table>
842851

843852

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,25 +39,18 @@ private[spark] trait HadoopConfBootstrap {
3939
}
4040

4141
private[spark] class HadoopConfBootstrapImpl(
42-
hadoopConfConfigMapName: String,
43-
hadoopConfigFiles: Seq[File]) extends HadoopConfBootstrap with Logging {
42+
hadoopConfConfigMapName: String) extends HadoopConfBootstrap with Logging {
4443

4544
override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
4645
: PodWithMainContainer = {
47-
logInfo("HADOOP_CONF_DIR defined. Mounting Hadoop specific files")
48-
val keyPaths = hadoopConfigFiles.map { file =>
49-
val fileStringPath = file.toPath.getFileName.toString
50-
new KeyToPathBuilder()
51-
.withKey(fileStringPath)
52-
.withPath(fileStringPath)
53-
.build() }
46+
logInfo("HADOOP_CONF_DIR or spark.kubernetes.hadoop.conf.configmap.name defined. " +
47+
"Mounting Hadoop specific files")
5448
val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
5549
.editSpec()
5650
.addNewVolume()
5751
.withName(HADOOP_FILE_VOLUME)
5852
.withNewConfigMap()
5953
.withName(hadoopConfConfigMapName)
60-
.withItems(keyPaths.asJava)
6154
.endConfigMap()
6255
.endVolume()
6356
.endSpec()

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,13 @@ package object config extends Logging {
539539
.stringConf
540540
.createOptional
541541

542+
private[spark] val KUBERNETES_HADOOP_CONF_CONFIGMAP_NAME =
543+
ConfigBuilder("spark.kubernetes.hadoop.conf.configmap.name")
544+
.doc("Specify the name of the ConfigMap that contains the Hadoop conf files. " +
545+
"It will be mounted into Spark pods.")
546+
.stringConf
547+
.createOptional
548+
542549
private[spark] def resolveK8sMaster(rawMasterString: String): String = {
543550
if (!rawMasterString.startsWith("k8s://")) {
544551
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ private[spark] class DriverConfigurationStepsOrchestrator(
5353
private val filesDownloadPath = submissionSparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
5454
private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
5555
private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config"
56-
private val hadoopConfigMapName = s"$kubernetesResourceNamePrefix-hadoop-config"
56+
private val hadoopConfigMapName = submissionSparkConf.get(KUBERNETES_HADOOP_CONF_CONFIGMAP_NAME)
57+
.getOrElse(s"$kubernetesResourceNamePrefix-hadoop-config")
58+
private val noNeedUploadHadoopConf = submissionSparkConf.get(
59+
KUBERNETES_HADOOP_CONF_CONFIGMAP_NAME).isDefined
5760

5861
def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
5962
val additionalMainAppJar = mainAppResource match {
@@ -123,18 +126,23 @@ private[spark] class DriverConfigurationStepsOrchestrator(
123126
None
124127
}
125128

126-
val hadoopConfigSteps =
127-
hadoopConfDir.map { conf =>
129+
val hadoopConfigSteps = if (hadoopConfDir.isDefined || noNeedUploadHadoopConf) {
128130
val hadoopStepsOrchestrator =
129131
new HadoopStepsOrchestrator(
130132
kubernetesResourceNamePrefix,
131133
namespace,
132134
hadoopConfigMapName,
133135
submissionSparkConf,
134-
conf)
136+
hadoopConfDir,
137+
noNeedUploadHadoopConf)
135138
val hadoopConfSteps = hadoopStepsOrchestrator.getHadoopSteps()
136-
Some(new HadoopConfigBootstrapStep(hadoopConfSteps, hadoopConfigMapName))}
137-
.getOrElse(Option.empty[DriverConfigurationStep])
139+
Some(new HadoopConfigBootstrapStep(
140+
hadoopConfSteps,
141+
hadoopConfigMapName,
142+
noNeedUploadHadoopConf))
143+
} else {
144+
Option.empty[DriverConfigurationStep]
145+
}
138146
val resourceStep = mainAppResource match {
139147
case PythonMainAppResource(mainPyResource) =>
140148
Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStep.scala

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit.submitsteps
1818

1919
import scala.collection.JavaConverters._
2020

21-
import io.fabric8.kubernetes.api.model.ConfigMapBuilder
21+
import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, HasMetadata}
2222

2323
import org.apache.spark.deploy.k8s.constants._
2424
import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.{HadoopConfigSpec, HadoopConfigurationStep}
@@ -31,7 +31,8 @@ import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.{HadoopConfigS
3131
*/
3232
private[spark] class HadoopConfigBootstrapStep(
3333
hadoopConfigurationSteps: Seq[HadoopConfigurationStep],
34-
hadoopConfigMapName: String )
34+
hadoopConfigMapName: String,
35+
noNeedUploadHadoopConf: Boolean = false)
3536
extends DriverConfigurationStep {
3637

3738
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
@@ -46,13 +47,16 @@ private[spark] class HadoopConfigBootstrapStep(
4647
for (nextStep <- hadoopConfigurationSteps) {
4748
currentHadoopSpec = nextStep.configureContainers(currentHadoopSpec)
4849
}
49-
val configMap =
50-
new ConfigMapBuilder()
50+
val configMap = if (noNeedUploadHadoopConf) {
51+
Option.empty[HasMetadata]
52+
} else {
53+
Some(new ConfigMapBuilder()
5154
.withNewMetadata()
52-
.withName(hadoopConfigMapName)
53-
.endMetadata()
55+
.withName(hadoopConfigMapName)
56+
.endMetadata()
5457
.addToData(currentHadoopSpec.configMapProperties.asJava)
55-
.build()
58+
.build())
59+
}
5660
val driverSparkConfWithExecutorSetup = driverSpec.driverSparkConf.clone()
5761
.set(HADOOP_CONFIG_MAP_SPARK_CONF_NAME, hadoopConfigMapName)
5862
.setAll(currentHadoopSpec.additionalDriverSparkConf)
@@ -62,7 +66,7 @@ private[spark] class HadoopConfigBootstrapStep(
6266
driverSparkConf = driverSparkConfWithExecutorSetup,
6367
otherKubernetesResources =
6468
driverSpec.otherKubernetesResources ++
65-
Seq(configMap) ++ currentHadoopSpec.dtSecret.toSeq
69+
configMap.toSeq ++ currentHadoopSpec.dtSecret.toSeq
6670
)
6771
}
6872
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ private[spark] class HadoopConfMounterStep(
3535
hadoopConfigMapName: String,
3636
hadoopConfigurationFiles: Seq[File],
3737
hadoopConfBootstrapConf: HadoopConfBootstrap,
38-
hadoopConfDir: String)
38+
hadoopConfDir: Option[String],
39+
noNeedUploadHadoopConf: Boolean = false)
3940
extends HadoopConfigurationStep {
4041

4142
override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
@@ -48,10 +49,11 @@ private[spark] class HadoopConfMounterStep(
4849
driverPod = bootstrappedPodAndMainContainer.pod,
4950
driverContainer = bootstrappedPodAndMainContainer.mainContainer,
5051
configMapProperties =
51-
hadoopConfigurationFiles.map(file =>
52+
hadoopConfigurationFiles.filter(_ => !noNeedUploadHadoopConf).map(file =>
5253
(file.toPath.getFileName.toString, Files.toString(file, Charsets.UTF_8))).toMap,
53-
additionalDriverSparkConf = hadoopConfigSpec.additionalDriverSparkConf ++
54-
Map(HADOOP_CONF_DIR_LOC -> hadoopConfDir)
54+
additionalDriverSparkConf = hadoopConfDir.filter(_ => !noNeedUploadHadoopConf)
55+
.foldLeft(hadoopConfigSpec.additionalDriverSparkConf)((sparkConf, conf) =>
56+
sparkConf ++ Map(HADOOP_CONF_DIR_LOC -> conf))
5557
)
5658
}
5759
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ private[spark] class HadoopStepsOrchestrator(
3232
namespace: String,
3333
hadoopConfigMapName: String,
3434
submissionSparkConf: SparkConf,
35-
hadoopConfDir: String) extends Logging {
35+
hadoopConfDir: Option[String],
36+
noNeedUploadHadoopConf: Boolean = false) extends Logging {
3637

3738
private val isKerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
3839
private val maybePrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL)
@@ -43,7 +44,8 @@ private[spark] class HadoopStepsOrchestrator(
4344
submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY)
4445
private val maybeRenewerPrincipal =
4546
submissionSparkConf.get(KUBERNETES_KERBEROS_RENEWER_PRINCIPAL)
46-
private val hadoopConfigurationFiles = HadoopConfUtils.getHadoopConfFiles(hadoopConfDir)
47+
private val hadoopConfigurationFiles = hadoopConfDir.map(HadoopConfUtils.getHadoopConfFiles)
48+
.getOrElse(Seq.empty[File])
4749
private val hadoopUGI = new HadoopUGIUtilImpl
4850
logInfo(s"Hadoop Conf directory: $hadoopConfDir")
4951

@@ -68,14 +70,13 @@ private[spark] class HadoopStepsOrchestrator(
6870
" you must also specify the name of the secret")
6971

7072
def getHadoopSteps(): Seq[HadoopConfigurationStep] = {
71-
val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl(
72-
hadoopConfigMapName,
73-
hadoopConfigurationFiles)
73+
val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl(hadoopConfigMapName)
7474
val hadoopConfMounterStep = new HadoopConfMounterStep(
7575
hadoopConfigMapName,
7676
hadoopConfigurationFiles,
7777
hadoopConfBootstrapImpl,
78-
hadoopConfDir)
78+
hadoopConfDir,
79+
noNeedUploadHadoopConf)
7980
val maybeKerberosStep =
8081
if (isKerberosEnabled) {
8182
maybeExistingSecret.map(existingSecretName => Some(new HadoopKerberosSecretResolverStep(

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
8888
}
8989

9090
val hadoopBootStrap = maybeHadoopConfigMap.map{ hadoopConfigMap =>
91-
val hadoopConfigurations = maybeHadoopConfDir.map(
92-
conf_dir => HadoopConfUtils.getHadoopConfFiles(conf_dir)).getOrElse(Seq.empty[File])
93-
new HadoopConfBootstrapImpl(
94-
hadoopConfigMap,
95-
hadoopConfigurations)
91+
new HadoopConfBootstrapImpl(hadoopConfigMap)
9692
}
9793

9894
val kerberosBootstrap =

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,29 +34,20 @@ import org.apache.spark.util.Utils
3434
private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite with BeforeAndAfter{
3535
private val CONFIG_MAP_NAME = "config-map"
3636
private val TEMP_HADOOP_FILE = createTempFile("core-site.xml")
37-
private val HADOOP_FILES = Seq(TEMP_HADOOP_FILE)
3837
private val SPARK_USER_VALUE = "sparkUser"
3938

4039
before {
4140
MockitoAnnotations.initMocks(this)
4241
}
4342

4443
test("Test of bootstrapping hadoop_conf_dir files") {
45-
val hadoopConfStep = new HadoopConfBootstrapImpl(
46-
CONFIG_MAP_NAME,
47-
HADOOP_FILES)
48-
val expectedKeyPaths = Seq(
49-
new KeyToPathBuilder()
50-
.withKey(TEMP_HADOOP_FILE.toPath.getFileName.toString)
51-
.withPath(TEMP_HADOOP_FILE.toPath.getFileName.toString)
52-
.build())
44+
val hadoopConfStep = new HadoopConfBootstrapImpl(CONFIG_MAP_NAME)
5345
val expectedPod = new PodBuilder()
5446
.editOrNewSpec()
5547
.addNewVolume()
5648
.withName(HADOOP_FILE_VOLUME)
5749
.withNewConfigMap()
5850
.withName(CONFIG_MAP_NAME)
59-
.withItems(expectedKeyPaths.asJava)
6051
.endConfigMap()
6152
.endVolume()
6253
.endSpec()

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ private[spark] class HadoopConfMounterStepSuite extends SparkFunSuite with Befor
7373
CONFIG_MAP_NAME,
7474
HADOOP_FILES,
7575
hadoopConfBootstrap,
76-
HADOOP_CONF_DIR_VAL)
76+
Some(HADOOP_CONF_DIR_VAL))
7777
val expectedDriverSparkConf = Map(HADOOP_CONF_DIR_LOC -> HADOOP_CONF_DIR_VAL)
7878
val expectedConfigMap = Map(
7979
TEMP_HADOOP_FILE.toPath.getFileName.toString ->

0 commit comments

Comments
 (0)