
Commit 7612bf5

committed
first stage of PR apache-spark-on-k8s#514 of just logic
1 parent b008be3 commit 7612bf5

19 files changed, +981 −22 lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion
@@ -573,7 +573,7 @@ object SparkSubmit extends CommandLineUtils {
     }

     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL) {
+    if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) {
       if (args.principal != null) {
         require(args.keytab != null, "Keytab must be specified when principal is specified")
         if (!new File(args.keytab).exists()) {
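
With this change, the standard Kerberos submission arguments are validated for Kubernetes just as they are for YARN; for instance (all values below are illustrative placeholders, not taken from this commit):

bin/spark-submit \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
  --deploy-mode cluster \
  --principal user@EXAMPLE.COM \
  --keytab /path/to/user.keytab \
  --class org.example.MyApp \
  local:///opt/spark/jars/my-app.jar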

docs/running-on-kubernetes.md

Lines changed: 55 additions & 1 deletion
@@ -752,6 +752,61 @@ from the other deployment modes. See the [configuration page](configuration.html
   </td>
 </tr>
 <tr>
+  <td><code>spark.kubernetes.kerberos.enabled</code></td>
+  <td>false</td>
+  <td>
+    Specify whether your job requires Kerberos authentication to access HDFS. By default, we
+    will assume that you will not require secure HDFS access.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.keytab</code></td>
+  <td>(none)</td>
+  <td>
+    Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+    the location of the Kerberos keytab to be used in order to access secure HDFS. This is optional, as you
+    may log in by running <code>kinit</code> before running spark-submit, and the submission client
+    will look within your local TGT cache to resolve this.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.principal</code></td>
+  <td>(none)</td>
+  <td>
+    Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+    the Kerberos principal that you wish to use to access secure HDFS. This is optional, as you
+    may log in by running <code>kinit</code> before running spark-submit, and the submission client
+    will look within your local TGT cache to resolve this.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.renewer.principal</code></td>
+  <td>(none)</td>
+  <td>
+    Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+    the principal that you wish to use to handle renewing of delegation tokens. This is optional, as
+    we will set the principal to the job user's principal by default.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
+  <td>(none)</td>
+  <td>
+    Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+    the name of the secret where your existing delegation token data is stored. You must also specify the
+    item key <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> where your data is stored on the secret.
+    This is optional in the case that you want to use a pre-existing secret; otherwise a new secret will be
+    automatically created.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.tokensecret.itemkey</code></td>
+  <td>spark.kubernetes.kerberos.dt.label</td>
+  <td>
+    Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+    the data item key within the pre-specified secret where your existing delegation token data is stored.
+    This defaults to <code>spark.kubernetes.kerberos.dt.label</code> should you not include it, but
+    you should always include it if you are providing a pre-existing secret containing the delegation token data.
+  </td>
+</tr>
+<tr>
   <td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
   <td>(none)</td>
   <td>
@@ -791,4 +846,3 @@ from the other deployment modes. See the [configuration page](configuration.html
 Running Spark on Kubernetes is currently an experimental feature. Some restrictions on the current implementation that
 should be lifted in the future include:
 * Applications can only run in cluster mode.
-* Only Scala and Java applications can be run.
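
Putting the new properties together, a secure-HDFS submission might look like the following sketch; the master URL, keytab path, principal, and application arguments are illustrative placeholders, not values from this commit:

bin/spark-submit \
  --deploy-mode cluster \
  --class org.apache.spark.examples.HdfsTest \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
  --conf spark.kubernetes.kerberos.enabled=true \
  --conf spark.kubernetes.kerberos.keytab=/path/to/user.keytab \
  --conf spark.kubernetes.kerberos.principal=user@EXAMPLE.COM \
  local:///opt/spark/examples/jars/spark-examples.jar \
  hdfs://<namenode>:8020/user/<user>/input.txt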
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import java.io.File

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.internal.Logging

/**
 * This is separated out from the HadoopConf steps API because this component can be reused to
 * set up the Hadoop configuration for executors as well.
 */
private[spark] trait HadoopConfBootstrap {
  /**
   * Bootstraps a main container with the ConfigMap containing Hadoop config files
   * mounted as a volume and an ENV variable pointing to the mounted directory.
   */
  def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer): PodWithMainContainer
}

private[spark] class HadoopConfBootstrapImpl(
    hadoopConfConfigMapName: String,
    hadoopConfigFiles: Seq[File],
    hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging {

  override def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer): PodWithMainContainer = {
    logInfo("HADOOP_CONF_DIR defined. Mounting HDFS specific .xml files")
    // Expose each Hadoop config file under its own name inside the ConfigMap volume.
    val keyPaths = hadoopConfigFiles.map(file =>
      new KeyToPathBuilder()
        .withKey(file.toPath.getFileName.toString)
        .withPath(file.toPath.getFileName.toString)
        .build())
    val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
      .editSpec()
        .addNewVolume()
          .withName(HADOOP_FILE_VOLUME)
          .withNewConfigMap()
            .withName(hadoopConfConfigMapName)
            .withItems(keyPaths.asJava)
            .endConfigMap()
          .endVolume()
        .endSpec()
      .build()
    val mainContainerWithMountedHadoopConf = new ContainerBuilder(
      originalPodWithMainContainer.mainContainer)
      .addNewVolumeMount()
        .withName(HADOOP_FILE_VOLUME)
        .withMountPath(HADOOP_CONF_DIR_PATH)
        .endVolumeMount()
      .addNewEnv()
        .withName(ENV_HADOOP_CONF_DIR)
        .withValue(HADOOP_CONF_DIR_PATH)
        .endEnv()
      .addNewEnv()
        .withName(ENV_SPARK_USER)
        .withValue(hadoopUGI.getShortName)
        .endEnv()
      .build()
    originalPodWithMainContainer.copy(
      pod = hadoopSupportedPod,
      mainContainer = mainContainerWithMountedHadoopConf)
  }
}
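
As a rough illustration of how this bootstrap is meant to be driven (a sketch, not code from this commit; the ConfigMap name and the bare pod/container are placeholders, and PodWithMainContainer is assumed to be the simple pod-plus-container pair these bootstraps pass around):

import java.io.File

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

// Placeholder pod and container standing in for the driver spec under construction.
val driverPod = new PodBuilder().withNewMetadata().withName("driver").endMetadata().build()
val driverContainer = new ContainerBuilder().withName("spark-kubernetes-driver").build()
// Gather the submission client's Hadoop config files from HADOOP_CONF_DIR.
val hadoopConfFiles: Seq[File] =
  new File(sys.env("HADOOP_CONF_DIR")).listFiles().filter(_.isFile).toSeq
val hadoopBootstrap = new HadoopConfBootstrapImpl(
  hadoopConfConfigMapName = "hadoop-conf-files",  // assumed ConfigMap name
  hadoopConfigFiles = hadoopConfFiles,
  hadoopUGI = new HadoopUGIUtil)
val bootstrapped = hadoopBootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(driverPod, driverContainer))
// bootstrapped.pod now carries the ConfigMap volume, and bootstrapped.mainContainer
// mounts it at HADOOP_CONF_DIR_PATH and exports the Hadoop conf dir env variable.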
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopUGIUtil.scala

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

// This class exists solely so that UGI interactions can be mocked in tests
private[spark] class HadoopUGIUtil {
  def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser

  def getShortName: String = getCurrentUser.getShortUserName

  def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled

  def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation =
    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)

  def dfsAddDelegationToken(
      hadoopConf: Configuration,
      renewer: String,
      creds: Credentials): Iterable[Token[_ <: TokenIdentifier]] =
    FileSystem.get(hadoopConf).addDelegationTokens(renewer, creds)

  def getCurrentTime: Long = System.currentTimeMillis()

  // Functions that should move to core with the rebase to 2.3
  @deprecated("Moved to core in 2.2", "2.2")
  def getTokenRenewalInterval(
      renewedTokens: Iterable[Token[_ <: TokenIdentifier]],
      hadoopConf: Configuration): Option[Long] = {
    val renewIntervals = renewedTokens.filter {
      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
    }.flatMap { token =>
      Try {
        val newExpiration = token.renew(hadoopConf)
        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
        newExpiration - identifier.getIssueDate
      }.toOption
    }
    renewIntervals.reduceLeftOption(_ min _)
  }

  @deprecated("Moved to core in 2.2", "2.2")
  def serialize(creds: Credentials): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream
    val dataStream = new DataOutputStream(byteStream)
    creds.writeTokenStorageToStream(dataStream)
    byteStream.toByteArray
  }

  @deprecated("Moved to core in 2.2", "2.2")
  def deserialize(tokenBytes: Array[Byte]): Credentials = {
    val creds = new Credentials()
    creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(tokenBytes)))
    creds
  }
}
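
A small sketch of how the submission client might use these helpers to obtain and round-trip delegation tokens (a Kerberized Hadoop configuration is assumed, and the "yarn" renewer is a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

val ugiUtil = new HadoopUGIUtil
val hadoopConf = new Configuration()  // assumed to point at a Kerberized HDFS
val creds = new Credentials()
// Fetch HDFS delegation tokens into `creds`.
val tokens = ugiUtil.dfsAddDelegationToken(hadoopConf, "yarn", creds)
// Estimate how often the tokens must be renewed.
val renewalInterval: Option[Long] = ugiUtil.getTokenRenewalInterval(tokens, hadoopConf)
// Serialize the credentials into bytes suitable for storage in a Kubernetes secret,
// then read them back.
val tokenBytes: Array[Byte] = ugiUtil.serialize(creds)
val restored: Credentials = ugiUtil.deserialize(tokenBytes)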
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KerberosTokenConfBootstrap.scala

Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.internal.Logging

/**
 * This is separated out from the HadoopConf steps API because this component can be reused to
 * mount the DT secret for executors as well.
 */
private[spark] trait KerberosTokenConfBootstrap {
  // Bootstraps a main container with the secret mounted as a volume and an ENV variable
  // pointing to the mounted file containing the DT for secure HDFS interaction
  def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer): PodWithMainContainer
}

private[spark] class KerberosTokenConfBootstrapImpl(
    secretName: String,
    secretItemKey: String,
    userName: String) extends KerberosTokenConfBootstrap with Logging {

  override def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer): PodWithMainContainer = {
    logInfo("Mounting HDFS DT from Secret for Secure HDFS")
    val dtMountedPod = new PodBuilder(originalPodWithMainContainer.pod)
      .editOrNewSpec()
        .addNewVolume()
          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
          .withNewSecret()
            .withSecretName(secretName)
            .endSecret()
          .endVolume()
        .endSpec()
      .build()
    val mainContainerWithMountedKerberos = new ContainerBuilder(
      originalPodWithMainContainer.mainContainer)
      .addNewVolumeMount()
        .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
        .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
        .endVolumeMount()
      .addNewEnv()
        .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
        .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey")
        .endEnv()
      .addNewEnv()
        .withName(ENV_SPARK_USER)
        .withValue(userName)
        .endEnv()
      .build()
    originalPodWithMainContainer.copy(
      pod = dtMountedPod,
      mainContainer = mainContainerWithMountedKerberos)
  }
}
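
A minimal sketch of applying this bootstrap with a pre-created token secret, reusing the placeholder pod and container from the earlier sketch; the secret name is an assumption, while the item key mirrors the documented default:

val kerberosBootstrap = new KerberosTokenConfBootstrapImpl(
  secretName = "spark-hdfs-dt",                          // assumed secret name
  secretItemKey = "spark.kubernetes.kerberos.dt.label",  // documented default item key
  userName = new HadoopUGIUtil().getShortName)
val podWithTokens = kerberosBootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(driverPod, driverContainer))
// The main container now exports an env variable pointing at the mounted token file,
// which Hadoop's UserGroupInformation picks up automatically.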

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala

Lines changed: 49 additions & 0 deletions
@@ -157,6 +157,12 @@ package object config extends Logging {
       .stringConf
       .createOptional

+  private[spark] val KUBERNETES_SHUFFLE_DIR =
+    ConfigBuilder("spark.kubernetes.shuffle.dir")
+      .doc("Path to the shared shuffle directories.")
+      .stringConf
+      .createOptional
+
   private[spark] val KUBERNETES_SHUFFLE_APISERVER_URI =
     ConfigBuilder("spark.kubernetes.shuffle.apiServer.url")
       .doc("URL to the Kubernetes API server that the shuffle service will monitor for Spark pods.")

@@ -496,6 +502,49 @@ package object config extends Logging {

   private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

+  private[spark] val KUBERNETES_KERBEROS_SUPPORT =
+    ConfigBuilder("spark.kubernetes.kerberos.enabled")
+      .doc("Specify whether your job requires a delegation token to access HDFS")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val KUBERNETES_KERBEROS_KEYTAB =
+    ConfigBuilder("spark.kubernetes.kerberos.keytab")
+      .doc("Specify the location of the keytab" +
+        " for Kerberos in order to access secure HDFS")
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_KERBEROS_PRINCIPAL =
+    ConfigBuilder("spark.kubernetes.kerberos.principal")
+      .doc("Specify the principal" +
+        " for Kerberos in order to access secure HDFS")
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_KERBEROS_RENEWER_PRINCIPAL =
+    ConfigBuilder("spark.kubernetes.kerberos.renewer.principal")
+      .doc("Specify the principal" +
+        " you wish to renew and retrieve your Kerberos values with")
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_KERBEROS_DT_SECRET_NAME =
+    ConfigBuilder("spark.kubernetes.kerberos.tokensecret.name")
+      .doc("Specify the name of the secret where" +
+        " your existing delegation token is stored. This removes the need" +
+        " for the job user to provide any keytab for launching a job")
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY =
+    ConfigBuilder("spark.kubernetes.kerberos.tokensecret.itemkey")
+      .doc("Specify the item key of the data where" +
+        " your existing delegation token is stored. This removes the need" +
+        " for the job user to provide any keytab for launching a job")
+      .stringConf
+      .createOptional
+
   private[spark] def resolveK8sMaster(rawMasterString: String): String = {
     if (!rawMasterString.startsWith("k8s://")) {
       throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")