Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit eba1cb2

Browse files
committed
Initial architecture design for HDFS support
1 parent 436482e commit eba1cb2

File tree

11 files changed

+343
-5
lines changed

11 files changed

+343
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes
18+
19+
import java.io.File
20+
21+
import org.apache.spark.deploy.kubernetes.constants._
22+
import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, ContainerBuilder, KeyToPathBuilder, PodBuilder}
23+
24+
import collection.JavaConverters._
25+
26+
27+
/**
 * This is separated out from the HadoopConf steps API because this component can be reused to
 * set up the hadoop-conf for executors as well.
 */
private[spark] trait HadoopConfBootstrap {
  /**
   * Bootstraps a main container with the ConfigMaps mounted as volumes and an ENV variable
   * pointing to the mounted file.
   *
   * @param originalPodWithMainContainer the pod and its main container before any Hadoop
   *                                     configuration has been applied
   * @return the same pod/container pair, augmented with the Hadoop conf volume, the
   *         corresponding volume mount, and the HADOOP_CONF_DIR environment variable
   */
  def bootstrapMainContainerAndVolumes(
    originalPodWithMainContainer: PodWithMainContainer)
    : PodWithMainContainer
}
40+
41+
/**
 * Bootstraps a pod and its main container with the Hadoop configuration files: the files are
 * projected from a ConfigMap into a volume mounted at HADOOP_FILE_DIR, and HADOOP_CONF_DIR is
 * exported so Hadoop clients in the container can locate them.
 *
 * @param hadoopConfConfigMapName name of the ConfigMap holding the Hadoop conf file contents
 * @param hadoopConfigFiles the local Hadoop configuration files to project into the pod
 */
private[spark] class HadoopConfBootstrapImpl(
    hadoopConfConfigMapName: String,
    hadoopConfigFiles: Array[File]) extends HadoopConfBootstrap {

  override def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer)
      : PodWithMainContainer = {
    // ConfigMap keys must match [-._a-zA-Z0-9]+ (no '/') and KeyToPath.path must be a
    // relative path, so use the bare file name rather than the (absolute) file path.
    val keyPaths = hadoopConfigFiles.map { file =>
      val fileName = file.getName
      new KeyToPathBuilder()
        .withKey(fileName)
        .withPath(fileName)
        .build()
    }
    val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
      .editSpec()
        .addNewVolume()
          .withName(HADOOP_FILE_VOLUME)
          .withNewConfigMap()
            .withName(hadoopConfConfigMapName)
            .addAllToItems(keyPaths.toList.asJavaCollection)
            .endConfigMap()
          .endVolume()
        .endSpec()
      .build()
    val mainContainerWithMountedHadoopConf = new ContainerBuilder(
        originalPodWithMainContainer.mainContainer)
      .addNewVolumeMount()
        .withName(HADOOP_FILE_VOLUME)
        .withMountPath(HADOOP_FILE_DIR)
        .endVolumeMount()
      .addNewEnv()
        .withName(HADOOP_CONF_DIR)
        // The volume is mounted directly at HADOOP_FILE_DIR, so the projected files live
        // immediately under it — HADOOP_CONF_DIR must point there, not at a subdirectory.
        .withValue(HADOOP_FILE_DIR)
        .endEnv()
      .build()
    PodWithMainContainer(
      hadoopSupportedPod,
      mainContainerWithMountedHadoopConf)
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes
18+
19+
import io.fabric8.kubernetes.api.model.{Container, Pod}
20+
21+
/**
 * Pairs a Kubernetes pod with the "main" container inside it, so bootstrap
 * components can modify both together and return them as a unit.
 */
private[spark] case class PodWithMainContainer(
  pod: Pod,
  mainContainer: Container)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,12 @@ package object config extends Logging {
498498
.createOptional
499499

500500
private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
501+
  // Opt-in flag for Kerberized HDFS access; defaults to false (no Kerberos handling).
  private[spark] val KUBERNETES_KERBEROS_SUPPORT =
    ConfigBuilder("spark.kubernetes.kerberos")
      .doc("Specify whether your job is a job " +
        "that will require a Delegation Token to access HDFS")
      .booleanConf
      .createWithDefault(false)
501507

502508
private[spark] def resolveK8sMaster(rawMasterString: String): String = {
503509
if (!rawMasterString.startsWith("k8s://")) {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ package object constants {
8686
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties"
8787
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_DIR = "/etc/spark-init"
8888
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties"
89+
  // Name of the pod volume that carries the Hadoop configuration files.
  private[spark] val HADOOP_FILE_VOLUME = "hadoop-properties"
  // Mount point of the Hadoop conf volume inside the container.
  private[spark] val HADOOP_FILE_DIR = "/etc/hadoop"
  // Name of the environment variable that points at the mounted Hadoop conf dir.
  private[spark] val HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
8992
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_PATH =
9093
s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
9194
private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
*/
1717
package org.apache.spark.deploy.kubernetes.submit
1818

19+
import java.io.File
20+
1921
import org.apache.spark.SparkConf
2022
import org.apache.spark.deploy.kubernetes.ConfigurationUtils
2123
import org.apache.spark.deploy.kubernetes.config._
2224
import org.apache.spark.deploy.kubernetes.constants._
23-
import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, PythonStep}
25+
import org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps.HadoopStepsOrchestrator
26+
import org.apache.spark.deploy.kubernetes.submit.submitsteps._
2427
import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
2528
import org.apache.spark.launcher.SparkLauncher
2629
import org.apache.spark.util.Utils
@@ -51,6 +54,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
5154
private val filesDownloadPath = submissionSparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
5255
private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
5356
private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config"
57+
private val hadoopConfigMapName = s"$kubernetesResourceNamePrefix-hadoop-config"
5458

5559
def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
5660
val additionalMainAppJar = mainAppResource match {
@@ -94,6 +98,22 @@ private[spark] class DriverConfigurationStepsOrchestrator(
9498
submissionSparkConf)
9599
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
96100
submissionSparkConf, kubernetesResourceNamePrefix)
101+
val hadoopConfigurations =
102+
sys.env.get("HADOOP_CONF_DIR").map{ conf => getHadoopConfFiles(conf)}
103+
.getOrElse(Array.empty[File])
104+
val hadoopConfigSteps =
105+
if (hadoopConfigurations.isEmpty) {
106+
Option.empty[DriverConfigurationStep]
107+
} else {
108+
val hadoopStepsOrchestrator = new HadoopStepsOrchestrator(
109+
namespace,
110+
kubernetesResourceNamePrefix,
111+
submissionSparkConf,
112+
hadoopConfigurations)
113+
val hadoopConfSteps =
114+
hadoopStepsOrchestrator.getHadoopSteps()
115+
Some(new HadoopConfigBootstrapStep(hadoopConfSteps))
116+
}
97117
val pythonStep = mainAppResource match {
98118
case PythonMainAppResource(mainPyResource) =>
99119
Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
@@ -133,6 +153,16 @@ private[spark] class DriverConfigurationStepsOrchestrator(
133153
kubernetesCredentialsStep,
134154
dependencyResolutionStep) ++
135155
initContainerBootstrapStep.toSeq ++
156+
hadoopConfigSteps.toSeq ++
136157
pythonStep.toSeq
137158
}
159+
private def getHadoopConfFiles(path: String) : Array[File] = {
160+
def isFile(file: File) = if (file.isFile) Some(file) else None
161+
val dir = new File(path)
162+
if (dir.isDirectory) {
163+
dir.listFiles.flatMap { file => isFile(file) }
164+
} else {
165+
Array.empty[File]
166+
}
167+
}
138168
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes.submit.submitsteps
18+
19+
import org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps.{HadoopConfigSpec, HadoopConfigurationStep}
20+
21+
/**
 * Configures the driverSpec that bootstraps dependencies into the driver pod.
 *
 * Runs every [[HadoopConfigurationStep]] in order, threading the evolving
 * (pod, container) pair through each one, then copies the result back into
 * the driver spec.
 */
private[spark] class HadoopConfigBootstrapStep(
    hadoopConfigurationSteps: Seq[HadoopConfigurationStep])
  extends DriverConfigurationStep {

  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
    val initialSpec = HadoopConfigSpec(
      driverPod = driverSpec.driverPod,
      driverContainer = driverSpec.driverContainer)
    // Immutable fold instead of a var + imperative loop.
    val finalSpec = hadoopConfigurationSteps.foldLeft(initialSpec) { (spec, step) =>
      step.configureContainers(spec)
    }
    driverSpec.copy(
      driverPod = finalSpec.driverPod,
      driverContainer = finalSpec.driverContainer)
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
18+
19+
import java.io.File
20+
21+
import io.fabric8.kubernetes.api.model._
22+
import org.apache.spark.deploy.kubernetes.{HadoopConfBootstrap, PodWithMainContainer}
23+
import org.apache.spark.deploy.kubernetes.config._
24+
import org.apache.spark.deploy.kubernetes.constants._
25+
import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils
26+
import org.apache.spark.deploy.kubernetes.submit.submitsteps.{DriverKubernetesCredentialsStep, KubernetesDriverSpec}
27+
import scala.collection.JavaConverters._
28+
29+
/**
 * Step that configures the ConfigMap + Volumes for the driver.
 *
 * Delegates to the shared [[HadoopConfBootstrap]] so the same mounting logic
 * can be reused for executors.
 */
private[spark] class HadoopConfMounterStep(
    hadoopConfigMapName: String,
    hadoopConfBootstrapConf: HadoopConfBootstrap)
  extends HadoopConfigurationStep {

  override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
    // Package up the current pod and container, run the bootstrap, and copy
    // the bootstrapped pair back into the spec.
    val podAndContainer = PodWithMainContainer(
      hadoopConfigSpec.driverPod,
      hadoopConfigSpec.driverContainer)
    val bootstrapped = hadoopConfBootstrapConf.bootstrapMainContainerAndVolumes(podAndContainer)
    hadoopConfigSpec.copy(
      driverPod = bootstrapped.pod,
      driverContainer = bootstrapped.mainContainer)
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
18+
19+
import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod}
20+
21+
/**
 * Represents a given configuration of the hadoop configuration logic, informing the
 * HadoopConfigBootstrapStep of how the driver should be configured. This includes:
 * <p>
 * - What Spark properties should be set on the driver's SparkConf for the executors
 * - The spec of the main container so that it can be modified to share volumes
 * - The spec of the driver pod EXCEPT for the addition of the given hadoop configs (e.g. volumes
 *   the hadoop logic needs)
 */
private[spark] case class HadoopConfigSpec(
  // NOTE(review): additionalDriverSparkConf is not wired up yet — intended to carry
  // Spark properties for the executors; confirm before enabling.
  // additionalDriverSparkConf: Map[String, String],
  driverPod: Pod,
  driverContainer: Container)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
18+
19+
/**
 * Represents a step in preparing the driver
 */
private[spark] trait HadoopConfigurationStep {

  /**
   * Applies this step's Hadoop-related changes to the given spec and returns
   * the updated spec; implementations must not mutate their input.
   */
  def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
18+
19+
import java.io.File
20+
21+
import org.apache.spark.SparkConf
22+
import org.apache.spark.deploy.kubernetes.HadoopConfBootstrapImpl
23+
import org.apache.spark.deploy.kubernetes.config._
24+
25+
26+
/**
 * Returns the complete ordered list of steps required to configure the hadoop configurations.
 */
private[spark] class HadoopStepsOrchestrator(
    namespace: String,
    kubernetesResourceNamePrefix: String,
    submissionSparkConf: SparkConf,
    hadoopConfigurationFiles: Array[File]) {

  private val hadoopConfigMapName = s"$kubernetesResourceNamePrefix-hadoop-config"
  private val maybeKerberosSupport = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT)

  def getHadoopSteps(): Seq[HadoopConfigurationStep] = {
    val bootstrap = new HadoopConfBootstrapImpl(
      hadoopConfigMapName,
      hadoopConfigurationFiles)
    val mounterStep = new HadoopConfMounterStep(hadoopConfigMapName, bootstrap)
    // Kerberos secret mounting is not implemented yet: both branches contribute
    // no step today, so the flag is effectively a placeholder.
    val kerberosStep: Option[HadoopConfigurationStep] =
      if (maybeKerberosSupport) {
        // TODO: Implement mounting secrets
        None
      } else {
        None
      }
    Seq(mounterStep) ++ kerberosStep
  }
}

0 commit comments

Comments
 (0)