Skip to content

Commit d7f54dd

Browse files
committed with message: "initial Stage 2 architecture using deprecated 2.1 methods"
1 parent 06df962 commit d7f54dd

17 files changed

+313
-100
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ object SparkSubmit extends CommandLineUtils {
575575
}
576576

577577
// assure a keytab is available from any place in a JVM
578-
if (clusterManager == YARN || clusterManager == LOCAL) {
578+
if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) {
579579
if (args.principal != null) {
580580
require(args.keytab != null, "Keytab must be specified when principal is specified")
581581
if (!new File(args.keytab).exists()) {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/HadoopConfBootstrap.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package org.apache.spark.deploy.kubernetes
1818

1919
import java.io.File
2020

21+
import scala.collection.JavaConverters._
22+
2123
import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}
2224

2325
import org.apache.spark.deploy.kubernetes.constants._
@@ -40,16 +42,17 @@ private[spark] trait HadoopConfBootstrap {
4042

4143
private[spark] class HadoopConfBootstrapImpl(
4244
hadoopConfConfigMapName: String,
43-
hadoopConfigFiles: Array[File]) extends HadoopConfBootstrap with Logging{
45+
hadoopConfigFiles: Seq[File]) extends HadoopConfBootstrap with Logging{
4446

4547
override def bootstrapMainContainerAndVolumes(
4648
originalPodWithMainContainer: PodWithMainContainer)
4749
: PodWithMainContainer = {
48-
import scala.collection.JavaConverters._
4950
logInfo("HADOOP_CONF_DIR defined. Mounting HDFS specific .xml files")
5051
val keyPaths = hadoopConfigFiles.map(file =>
51-
new KeyToPathBuilder().withKey(file.toPath.getFileName.toString)
52-
.withPath(file.toPath.getFileName.toString).build()).toList
52+
new KeyToPathBuilder()
53+
.withKey(file.toPath.getFileName.toString)
54+
.withPath(file.toPath.getFileName.toString)
55+
.build()).toList
5356
val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
5457
.editSpec()
5558
.addNewVolume()
@@ -72,9 +75,8 @@ private[spark] class HadoopConfBootstrapImpl(
7275
.withValue(HADOOP_FILE_DIR)
7376
.endEnv()
7477
.build()
75-
PodWithMainContainer(
76-
hadoopSupportedPod,
77-
mainContainerWithMountedHadoopConf
78-
)
78+
originalPodWithMainContainer.copy(
79+
pod = hadoopSupportedPod,
80+
mainContainer = mainContainerWithMountedHadoopConf)
7981
}
8082
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes
18+
19+
import io.fabric8.kubernetes.api.model.ContainerBuilder
20+
21+
import org.apache.spark.deploy.kubernetes.constants._
22+
23+
private[spark] trait KerberosConfBootstrap {
24+
def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
25+
: PodWithMainContainer
26+
}
27+
28+
private[spark] class KerberosConfBootstrapImpl(
29+
delegationTokenLabelName: String) extends KerberosConfBootstrap{
30+
override def bootstrapMainContainerAndVolumes(
31+
originalPodWithMainContainer: PodWithMainContainer)
32+
: PodWithMainContainer = {
33+
val mainContainerWithMountedHadoopConf = new ContainerBuilder(
34+
originalPodWithMainContainer.mainContainer)
35+
.addNewEnv()
36+
.withName(ENV_KERBEROS_SECRET_LABEL)
37+
.withValue(delegationTokenLabelName)
38+
.endEnv()
39+
.build()
40+
originalPodWithMainContainer.copy(mainContainer = mainContainerWithMountedHadoopConf)
41+
}
42+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -512,8 +512,7 @@ package object config extends Logging {
512512
private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
513513
private[spark] val KUBERNETES_KERBEROS_SUPPORT =
514514
ConfigBuilder("spark.kubernetes.kerberos")
515-
.doc("Specify whether your job is a job " +
516-
"that will require a Delegation Token to access HDFS")
515+
.doc("Specify whether your job is a job that will require a Delegation Token to access HDFS")
517516
.booleanConf
518517
.createWithDefault(false)
519518

@@ -531,6 +530,14 @@ package object config extends Logging {
531530
.stringConf
532531
.createOptional
533532

533+
private[spark] val KUBERNETES_KERBEROS_DT_SECRET =
534+
ConfigBuilder("spark.kubernetes.kerberos.tokensecret")
535+
.doc("Specify the label of the secret where " +
536+
" your existing delegation token is stored. This removes the need" +
537+
" for the job user to provide any keytab for launching a job")
538+
.stringConf
539+
.createOptional
540+
534541
private[spark] def resolveK8sMaster(rawMasterString: String): String = {
535542
if (!rawMasterString.startsWith("k8s://")) {
536543
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ package object constants {
105105
private[spark] val HADOOP_CONF_DIR_LOC = "spark.kubernetes.hadoop.conf.dir"
106106
private[spark] val HADOOP_CONFIG_MAP_SPARK_CONF_NAME =
107107
"spark.kubernetes.hadoop.executor.hadoopconfigmapname"
108+
private[spark] val HADOOP_KERBEROS_SECRET_NAME =
109+
"spark.kubernetes.kerberos.dt"
110+
private[spark] val KERBEROS_SPARK_CONF_NAME =
111+
"spark.kubernetes.kerberos.secretlabelname"
112+
private[spark] val KERBEROS_SECRET_LABEL_PREFIX =
113+
"hadoop-tokens"
114+
private[spark] val ENV_KERBEROS_SECRET_LABEL =
115+
"KERBEROS_SECRET_LABEL"
108116

109117
// Miscellaneous
110118
private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private[spark] class Client(
149149
}
150150

151151
private[spark] object Client {
152-
def run(sparkConf: SparkConf,
152+
def run(sparkConf: SparkConf,
153153
clientArguments: ClientArguments,
154154
hadoopConfDir: Option[String]): Unit = {
155155
val namespace = sparkConf.get(KUBERNETES_NAMESPACE)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,14 +108,13 @@ private[spark] class DriverConfigurationStepsOrchestrator(
108108
val hadoopConfigurations = hadoopConfDir.map(conf => getHadoopConfFiles(conf))
109109
.getOrElse(Array.empty[File])
110110
val hadoopConfigSteps =
111-
if (hadoopConfigurations.isEmpty) {
111+
if (hadoopConfDir.isEmpty) {
112112
Option.empty[DriverConfigurationStep]
113113
} else {
114114
val hadoopStepsOrchestrator = new HadoopStepsOrchestrator(
115115
namespace,
116116
hadoopConfigMapName,
117117
submissionSparkConf,
118-
hadoopConfigurations,
119118
hadoopConfDir)
120119
val hadoopConfSteps =
121120
hadoopStepsOrchestrator.getHadoopSteps()
@@ -164,13 +163,4 @@ private[spark] class DriverConfigurationStepsOrchestrator(
164163
hadoopConfigSteps.toSeq ++
165164
pythonStep.toSeq
166165
}
167-
private def getHadoopConfFiles(path: String) : Array[File] = {
168-
def isFile(file: File) = if (file.isFile) Some(file) else None
169-
val dir = new File(path)
170-
if (dir.isDirectory) {
171-
dir.listFiles.flatMap { file => isFile(file) }
172-
} else {
173-
Array.empty[File]
174-
}
175-
}
176166
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/HadoopConfigBootstrapStep.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
*/
1717
package org.apache.spark.deploy.kubernetes.submit.submitsteps
1818

19-
import java.io.StringWriter
20-
import java.util.Properties
19+
import scala.collection.JavaConverters._
2120

22-
import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, HasMetadata}
21+
import io.fabric8.kubernetes.api.model.ConfigMapBuilder
2322

2423
import org.apache.spark.deploy.kubernetes.constants._
2524
import org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps.{HadoopConfigSpec, HadoopConfigurationStep}
2625

26+
2727
/**
2828
* Configures the driverSpec that bootstraps dependencies into the driver pod.
2929
*/
@@ -33,12 +33,12 @@ private[spark] class HadoopConfigBootstrapStep(
3333
extends DriverConfigurationStep {
3434

3535
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
36-
import scala.collection.JavaConverters._
3736
var currentHadoopSpec = HadoopConfigSpec(
3837
driverPod = driverSpec.driverPod,
3938
driverContainer = driverSpec.driverContainer,
4039
configMapProperties = Map.empty[String, String],
41-
additionalDriverSparkConf = Map.empty[String, String])
40+
additionalDriverSparkConf = Map.empty[String, String],
41+
dtSecret = None)
4242
for (nextStep <- hadoopConfigurationSteps) {
4343
currentHadoopSpec = nextStep.configureContainers(currentHadoopSpec)
4444
}
@@ -58,7 +58,7 @@ private[spark] class HadoopConfigBootstrapStep(
5858
driverSparkConf = executorSparkConf,
5959
otherKubernetesResources =
6060
driverSpec.otherKubernetesResources ++
61-
Seq(configMap)
61+
Seq(configMap) ++ currentHadoopSpec.dtSecret.toSeq
6262
)
6363
}
6464
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
18+
19+
private[spark] case class HDFSDelegationToken(bytes: Array[Byte], renewal: Long)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.deploy.kubernetes.constants._
2828
*/
2929
private[spark] class HadoopConfMounterStep(
3030
hadoopConfigMapName: String,
31-
hadoopConfigurationFiles: Array[File],
31+
hadoopConfigurationFiles: Seq[File],
3232
hadoopConfBootstrapConf: HadoopConfBootstrap,
3333
hadoopConfDir: Option[String])
3434
extends HadoopConfigurationStep {

0 commit comments

Comments (0)