This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 3f1c567
initial Stage 2 architecture using deprecated 2.1 methods
1 parent 82e073b

17 files changed (+313, -104 lines)

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Lines changed: 1 addition & 1 deletion

@@ -559,7 +559,7 @@ object SparkSubmit {
     }

     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL) {
+    if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) {
       if (args.principal != null) {
         require(args.keytab != null, "Keytab must be specified when principal is specified")
         if (!new File(args.keytab).exists()) {
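With this one-line change, a principal passed to spark-submit against a Kubernetes master is subject to the same keytab validation as YARN and local mode. A minimal sketch of the guard in isolation (illustrative only, not the actual SparkSubmit code):

import java.io.File

// Sketch: a principal supplied at submit time must be paired with a keytab
// that exists on the submitting machine.
def requireValidKeytab(principal: Option[String], keytab: Option[String]): Unit = {
  principal.foreach { _ =>
    require(keytab.isDefined, "Keytab must be specified when principal is specified")
    require(new File(keytab.get).exists(), s"Keytab file ${keytab.get} does not exist")
  }
}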

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/HadoopConfBootstrap.scala
Lines changed: 10 additions & 8 deletions

@@ -18,6 +18,8 @@ package org.apache.spark.deploy.kubernetes

 import java.io.File

+import scala.collection.JavaConverters._
+
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}

 import org.apache.spark.deploy.kubernetes.constants._

@@ -40,16 +42,17 @@ private[spark] trait HadoopConfBootstrap {

 private[spark] class HadoopConfBootstrapImpl(
     hadoopConfConfigMapName: String,
-    hadoopConfigFiles: Array[File]) extends HadoopConfBootstrap with Logging{
+    hadoopConfigFiles: Seq[File]) extends HadoopConfBootstrap with Logging{

   override def bootstrapMainContainerAndVolumes(
     originalPodWithMainContainer: PodWithMainContainer)
     : PodWithMainContainer = {
-    import scala.collection.JavaConverters._
     logInfo("HADOOP_CONF_DIR defined. Mounting HDFS specific .xml files")
     val keyPaths = hadoopConfigFiles.map(file =>
-      new KeyToPathBuilder().withKey(file.toPath.getFileName.toString)
-        .withPath(file.toPath.getFileName.toString).build()).toList
+      new KeyToPathBuilder()
+        .withKey(file.toPath.getFileName.toString)
+        .withPath(file.toPath.getFileName.toString)
+        .build()).toList
     val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
       .editSpec()
         .addNewVolume()

@@ -72,9 +75,8 @@ private[spark] class HadoopConfBootstrapImpl(
           .withValue(HADOOP_FILE_DIR)
           .endEnv()
         .build()
-    PodWithMainContainer(
-      hadoopSupportedPod,
-      mainContainerWithMountedHadoopConf
-    )
+    originalPodWithMainContainer.copy(
+      pod = hadoopSupportedPod,
+      mainContainer = mainContainerWithMountedHadoopConf)
   }
 }
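The return value now uses the case-class copy method instead of re-constructing PodWithMainContainer positionally, so any field added to the case class later is carried through unchanged. A self-contained sketch of the pattern, with simplified stand-ins for the fabric8 types:

// Stand-ins for the fabric8 Pod/Container types, for illustration only.
final case class Pod(spec: String)
final case class Container(env: Map[String, String])
final case class PodWithMainContainer(pod: Pod, mainContainer: Container)

val original = PodWithMainContainer(Pod("base"), Container(Map.empty))
// copy names only the fields that change; everything else is preserved.
val bootstrapped = original.copy(
  pod = Pod("base-with-hadoop-volume"),
  mainContainer = Container(Map("HADOOP_CONF_DIR" -> "/etc/hadoop/conf")))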
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KerberosConfBootstrap.scala (new file)
Lines changed: 42 additions & 0 deletions

@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+import io.fabric8.kubernetes.api.model.ContainerBuilder
+
+import org.apache.spark.deploy.kubernetes.constants._
+
+private[spark] trait KerberosConfBootstrap {
+  def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
+    : PodWithMainContainer
+}
+
+private[spark] class KerberosConfBootstrapImpl(
+    delegationTokenLabelName: String) extends KerberosConfBootstrap{
+  override def bootstrapMainContainerAndVolumes(
+    originalPodWithMainContainer: PodWithMainContainer)
+    : PodWithMainContainer = {
+    val mainContainerWithMountedHadoopConf = new ContainerBuilder(
+      originalPodWithMainContainer.mainContainer)
+      .addNewEnv()
+        .withName(ENV_KERBEROS_SECRET_LABEL)
+        .withValue(delegationTokenLabelName)
+        .endEnv()
+      .build()
+    originalPodWithMainContainer.copy(mainContainer = mainContainerWithMountedHadoopConf)
+  }
+}
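A hypothetical usage sketch (the label value and the surrounding wiring are assumptions, not shown in this commit): the bootstrap stamps the delegation-token secret label into the container environment, and the container image can resolve it at runtime.

// Hypothetical wiring; the label value is made up for illustration.
val kerberosBootstrap = new KerberosConfBootstrapImpl("hadoop-tokens-1234")
// podWithMainContainer would come from the preceding Hadoop conf bootstrap:
// val withTokenEnv = kerberosBootstrap.bootstrapMainContainerAndVolumes(podWithMainContainer)

// Consumer side, inside the container image: resolve the exported label.
val secretLabel = sys.env.getOrElse("KERBEROS_SECRET_LABEL", "hadoop-tokens")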

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
Lines changed: 9 additions & 2 deletions

@@ -500,8 +500,7 @@ package object config extends Logging {
   private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
   private[spark] val KUBERNETES_KERBEROS_SUPPORT =
     ConfigBuilder("spark.kubernetes.kerberos")
-      .doc("Specify whether your job is a job " +
-        "that will require a Delegation Token to access HDFS")
+      .doc("Specify whether your job is a job that will require a Delegation Token to access HDFS")
       .booleanConf
       .createWithDefault(false)

@@ -519,6 +518,14 @@
       .stringConf
       .createOptional

+  private[spark] val KUBERNETES_KERBEROS_DT_SECRET =
+    ConfigBuilder("spark.kubernetes.kerberos.tokensecret")
+      .doc("Specify the label of the secret where " +
+        " your existing delegation token is stored. This removes the need" +
+        " for the job user to provide any keytab for launching a job")
+      .stringConf
+      .createOptional
+
   private[spark] def resolveK8sMaster(rawMasterString: String): String = {
     if (!rawMasterString.startsWith("k8s://")) {
       throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
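A usage sketch for the two settings (the secret label value is a placeholder): enabling Kerberos support and pointing the submission client at a pre-created delegation-token secret, so no keytab has to be shipped with the job.

import org.apache.spark.SparkConf

// "hadoop-tokens-myjob" is a placeholder secret label.
val conf = new SparkConf()
  .set("spark.kubernetes.kerberos", "true")                             // KUBERNETES_KERBEROS_SUPPORT
  .set("spark.kubernetes.kerberos.tokensecret", "hadoop-tokens-myjob")  // KUBERNETES_KERBEROS_DT_SECRET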

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
Lines changed: 8 additions & 0 deletions

@@ -97,6 +97,14 @@ package object constants {
   private[spark] val HADOOP_CONF_DIR_LOC = "spark.kubernetes.hadoop.conf.dir"
   private[spark] val HADOOP_CONFIG_MAP_SPARK_CONF_NAME =
     "spark.kubernetes.hadoop.executor.hadoopconfigmapname"
+  private[spark] val HADOOP_KERBEROS_SECRET_NAME =
+    "spark.kubernetes.kerberos.dt"
+  private[spark] val KERBEROS_SPARK_CONF_NAME =
+    "spark.kubernetes.kerberos.secretlabelname"
+  private[spark] val KERBEROS_SECRET_LABEL_PREFIX =
+    "hadoop-tokens"
+  private[spark] val ENV_KERBEROS_SECRET_LABEL =
+    "KERBEROS_SECRET_LABEL"

   // Miscellaneous
   private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity"
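Presumably a later Stage 2 step combines KERBEROS_SECRET_LABEL_PREFIX with a unique suffix to name the secret label; a speculative sketch (the derivation is an assumption, not shown in this commit):

// Assumption, not shown in this commit: a unique label derived from the prefix.
val KERBEROS_SECRET_LABEL_PREFIX = "hadoop-tokens"
val secretLabel = s"$KERBEROS_SECRET_LABEL_PREFIX-${System.currentTimeMillis()}"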

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
Lines changed: 1 addition & 1 deletion (whitespace-only)

@@ -149,7 +149,7 @@ private[spark] class Client(
 }

 private[spark] object Client {
-  def run(sparkConf: SparkConf,
+  def run(sparkConf: SparkConf,
       clientArguments: ClientArguments,
       hadoopConfDir: Option[String]): Unit = {
     val namespace = sparkConf.get(KUBERNETES_NAMESPACE)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala
Lines changed: 1 addition & 15 deletions

@@ -16,8 +16,6 @@
  */
 package org.apache.spark.deploy.kubernetes.submit

-import java.io.File
-
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.ConfigurationUtils
 import org.apache.spark.deploy.kubernetes.config._

@@ -99,17 +97,14 @@ private[spark] class DriverConfigurationStepsOrchestrator(
       submissionSparkConf)
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
       submissionSparkConf, kubernetesResourceNamePrefix)
-    val hadoopConfigurations = hadoopConfDir.map(conf => getHadoopConfFiles(conf))
-      .getOrElse(Array.empty[File])
     val hadoopConfigSteps =
-      if (hadoopConfigurations.isEmpty) {
+      if (hadoopConfDir.isEmpty) {
        Option.empty[DriverConfigurationStep]
      } else {
        val hadoopStepsOrchestrator = new HadoopStepsOrchestrator(
          namespace,
          hadoopConfigMapName,
          submissionSparkConf,
-          hadoopConfigurations,
          hadoopConfDir)
        val hadoopConfSteps =
          hadoopStepsOrchestrator.getHadoopSteps()

@@ -157,13 +152,4 @@
       hadoopConfigSteps.toSeq ++
       pythonStep.toSeq
   }
-  private def getHadoopConfFiles(path: String) : Array[File] = {
-    def isFile(file: File) = if (file.isFile) Some(file) else None
-    val dir = new File(path)
-    if (dir.isDirectory) {
-      dir.listFiles.flatMap { file => isFile(file) }
-    } else {
-      Array.empty[File]
-    }
-  }
 }
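The deleted helper's directory walk presumably now lives with HadoopStepsOrchestrator, which receives hadoopConfDir directly (that file is not part of this excerpt, so this is an assumption). For reference, the same logic reads more simply against Seq[File]:

import java.io.File

// The removed getHadoopConfFiles helper, restated against Seq[File] to match
// the Array[File] -> Seq[File] migration elsewhere in this commit.
def getHadoopConfFiles(path: String): Seq[File] = {
  val dir = new File(path)
  if (dir.isDirectory) dir.listFiles.filter(_.isFile).toSeq else Seq.empty
}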

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/HadoopConfigBootstrapStep.scala
Lines changed: 6 additions & 6 deletions

@@ -16,14 +16,14 @@
  */
 package org.apache.spark.deploy.kubernetes.submit.submitsteps

-import java.io.StringWriter
-import java.util.Properties
+import scala.collection.JavaConverters._

-import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, HasMetadata}
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder

 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps.{HadoopConfigSpec, HadoopConfigurationStep}

+
 /**
  * Configures the driverSpec that bootstraps dependencies into the driver pod.
  */

@@ -33,12 +33,12 @@ private[spark] class HadoopConfigBootstrapStep(
   extends DriverConfigurationStep {

   override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
-    import scala.collection.JavaConverters._
     var currentHadoopSpec = HadoopConfigSpec(
       driverPod = driverSpec.driverPod,
       driverContainer = driverSpec.driverContainer,
       configMapProperties = Map.empty[String, String],
-      additionalDriverSparkConf = Map.empty[String, String])
+      additionalDriverSparkConf = Map.empty[String, String],
+      dtSecret = None)
     for (nextStep <- hadoopConfigurationSteps) {
       currentHadoopSpec = nextStep.configureContainers(currentHadoopSpec)
     }

@@ -58,7 +58,7 @@
       driverSparkConf = executorSparkConf,
       otherKubernetesResources =
         driverSpec.otherKubernetesResources ++
-          Seq(configMap)
+          Seq(configMap) ++ currentHadoopSpec.dtSecret.toSeq
     )
   }
 }
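dtSecret is carried through the spec as an Option, so appending it with Option#toSeq contributes either zero or one Kubernetes resource. A minimal illustration of the pattern:

// Option#toSeq: None contributes nothing, Some(x) contributes one element.
val configMap = "hadoop-config-map"
val dtSecret: Option[String] = Some("hadoop-token-secret")
val resources = Seq(configMap) ++ dtSecret.toSeq
// resources == Seq("hadoop-config-map", "hadoop-token-secret")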
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HDFSDelegationToken.scala (new file)
Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
+
+private[spark] case class HDFSDelegationToken(bytes: Array[Byte], renewal: Long)
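An illustrative construction (values made up): the bytes would plausibly hold a serialized token payload and renewal a timestamp in milliseconds, both assumptions, since this commit only introduces the carrier type.

// Illustrative only: the payload and renewal timestamp are placeholders.
val token = HDFSDelegationToken(bytes = Array[Byte](1, 2, 3), renewal = 86400000L)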

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala
Lines changed: 1 addition & 1 deletion

@@ -28,7 +28,7 @@ import org.apache.spark.deploy.kubernetes.constants._
  */
 private[spark] class HadoopConfMounterStep(
     hadoopConfigMapName: String,
-    hadoopConfigurationFiles: Array[File],
+    hadoopConfigurationFiles: Seq[File],
     hadoopConfBootstrapConf: HadoopConfBootstrap,
     hadoopConfDir: Option[String])
   extends HadoopConfigurationStep {
