This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 569f73c

ifilonenko authored and erikerlandson committed
Secure HDFS Support (#414)
* Initial architecture design for HDFS support
* Minor styling
* Added proper logic for mounting ConfigMaps
* styling
* modified otherKubernetesResource logic
* fixed Integration tests and modified HADOOP_CONF_DIR variable to be FILE_DIR for Volume mount
* setting HADOOP_CONF_DIR env variables
* Included integration tests for Stage 1
* Initial Kerberos support
* initial Stage 2 architecture using deprecated 2.1 methods
* Added current, BROKEN, integration test environment for review
* working hadoop cluster
* Using locks and monitors to ensure proper configs for setting up kerberized cluster in integration tests
* working Stage 2
* documentation
* Integration Stages 1, 2 and 3
* further testing work
* fixing imports
* Stage 3 Integration tests pass
* uncommented SparkDockerBuilder
* testing fix
* handled comments and increased test hardening
* Solve failing integration test problem and lower TIMEOUT time
* modify security.authorization
* Modifying HADOOP_CONF flags
* Refactored tests and included modifications to pass all tests regardless of environment
* Adding unit test and one more integration test
* completed unit tests w/o UGI mocking
* cleanup and various small fixes
* added back sparkdockerbuilder images
* address initial comments and scalastyle issues
* addresses comments from PR
* mocking hadoopUGI
* Fix executor env to include simple authn
* Fix a bug in executor env handling
* Fix a bug in how the driver sets simple authn
* handling PR comments
1 parent 5c29bf8 commit 569f73c

File tree

72 files changed: +3503 -156 lines

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion
@@ -575,7 +575,7 @@ object SparkSubmit extends CommandLineUtils {
     }

     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL) {
+    if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) {
       if (args.principal != null) {
         require(args.keytab != null, "Keytab must be specified when principal is specified")
         if (!new File(args.keytab).exists()) {

docs/running-on-kubernetes.md

Lines changed: 55 additions & 0 deletions
@@ -783,6 +783,61 @@ from the other deployment modes. See the [configuration page](configuration.html
   </td>
 </tr>
 <tr>
+  <td><code>spark.kubernetes.kerberos.enabled</code></td>
+  <td>false</td>
+  <td>
+  Specify whether your job requires Kerberos authentication to access HDFS. By default, we
+  assume that you do not require secure HDFS access.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.keytab</code></td>
+  <td>(none)</td>
+  <td>
+  Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+  the location of the Kerberos keytab used to access secure HDFS. This is optional, as you
+  may log in by running <code>kinit</code> before running spark-submit, in which case the
+  submission client will resolve your credentials from your local TGT cache.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.principal</code></td>
+  <td>(none)</td>
+  <td>
+  Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+  the Kerberos principal that you wish to use to access secure HDFS. This is optional, as you
+  may log in by running <code>kinit</code> before running spark-submit, in which case the
+  submission client will resolve your credentials from your local TGT cache.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.renewer.principal</code></td>
+  <td>(none)</td>
+  <td>
+  Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+  the principal that you wish to use to renew delegation tokens. This is optional, as we
+  set the principal to the job user's principal by default.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
+  <td>(none)</td>
+  <td>
+  Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+  the name of the secret where your existing delegation token data is stored. You must also specify the
+  item key <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> under which the data is stored in
+  the secret. This is optional, for the case where you want to use a pre-existing secret; otherwise a
+  new secret will be created automatically.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.tokensecret.itemkey</code></td>
+  <td>spark.kubernetes.kerberos.dt.label</td>
+  <td>
+  Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+  the data item key within the pre-specified secret under which your existing delegation token data is
+  stored. This defaults to <code>spark.kubernetes.kerberos.dt.label</code> should you not include it,
+  but you should always include it if you are supplying a pre-existing secret that contains the
+  delegation token data.
+  </td>
+</tr>
+<tr>
   <td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
   <td>(none)</td>
   <td>
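Taken together, a kerberized job supplies these properties at submission time. Below is a minimal, hypothetical sketch using Spark's SparkLauncher API; the master URL, principal, keytab path, and application jar are placeholders, and passing the same keys via --conf to spark-submit is equivalent.

import org.apache.spark.launcher.SparkLauncher

// Hypothetical cluster and credential values; substitute your own.
val launcher = new SparkLauncher()
  .setMaster("k8s://https://kubernetes.example.com:443")
  .setDeployMode("cluster")
  .setConf("spark.kubernetes.kerberos.enabled", "true")
  .setConf("spark.kubernetes.kerberos.principal", "hdfs-user@EXAMPLE.COM")
  .setConf("spark.kubernetes.kerberos.keytab", "/mnt/secrets/hdfs-user.keytab")
  .setAppResource("local:///opt/spark/examples/jars/spark-examples.jar")
  .setMainClass("org.apache.spark.examples.HdfsTest")

// Launch the submission as a child process and wait for it to finish.
val process = launcher.launch()
process.waitFor()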
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/HadoopConfBootstrap.scala

Lines changed: 86 additions & 0 deletions

@@ -0,0 +1,86 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.kubernetes

import java.io.File

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.Logging

/**
 * This is separated out from the HadoopConf steps API because this component can be reused to
 * set up the Hadoop Configuration for executors as well.
 */
private[spark] trait HadoopConfBootstrap {
  /**
   * Bootstraps a main container with the ConfigMaps containing Hadoop config files
   * mounted as volumes and an ENV variable pointing to the mounted file.
   */
  def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer): PodWithMainContainer
}

private[spark] class HadoopConfBootstrapImpl(
    hadoopConfConfigMapName: String,
    hadoopConfigFiles: Seq[File],
    hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging {

  override def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer): PodWithMainContainer = {
    logInfo("HADOOP_CONF_DIR defined. Mounting HDFS-specific .xml files")
    val keyPaths = hadoopConfigFiles.map(file =>
      new KeyToPathBuilder()
        .withKey(file.toPath.getFileName.toString)
        .withPath(file.toPath.getFileName.toString)
        .build()).toList
    val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
      .editSpec()
        .addNewVolume()
          .withName(HADOOP_FILE_VOLUME)
          .withNewConfigMap()
            .withName(hadoopConfConfigMapName)
            .withItems(keyPaths.asJava)
            .endConfigMap()
          .endVolume()
        .endSpec()
      .build()
    val mainContainerWithMountedHadoopConf = new ContainerBuilder(
      originalPodWithMainContainer.mainContainer)
      .addNewVolumeMount()
        .withName(HADOOP_FILE_VOLUME)
        .withMountPath(HADOOP_CONF_DIR_PATH)
        .endVolumeMount()
      .addNewEnv()
        .withName(ENV_HADOOP_CONF_DIR)
        .withValue(HADOOP_CONF_DIR_PATH)
        .endEnv()
      .addNewEnv()
        .withName(ENV_SPARK_USER)
        .withValue(hadoopUGI.getShortName)
        .endEnv()
      .build()
    originalPodWithMainContainer.copy(
      pod = hadoopSupportedPod,
      mainContainer = mainContainerWithMountedHadoopConf)
  }
}
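As a rough usage sketch (not code from this commit), the bootstrap can be applied to a driver pod as follows; the ConfigMap name, file paths, and pod/container names are illustrative placeholders.

import java.io.File
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

// Placeholder inputs: a ConfigMap name and the local Hadoop config files it carries.
val bootstrap = new HadoopConfBootstrapImpl(
  hadoopConfConfigMapName = "spark-hadoop-conf",
  hadoopConfigFiles = Seq(
    new File("/etc/hadoop/conf/core-site.xml"),
    new File("/etc/hadoop/conf/hdfs-site.xml")),
  hadoopUGI = new HadoopUGIUtil)

// A bare pod and container standing in for the driver pod under construction.
val driverPod = new PodBuilder()
  .withNewMetadata().withName("spark-driver-pod").endMetadata()
  .build()
val driverContainer = new ContainerBuilder().withName("spark-driver").build()

val bootstrapped = bootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(driverPod, driverContainer))
// bootstrapped.pod now declares the hadoop-conf volume, and bootstrapped.mainContainer
// mounts it at HADOOP_CONF_DIR_PATH and exports HADOOP_CONF_DIR and SPARK_USER.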
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/HadoopUGIUtil.scala

Lines changed: 78 additions & 0 deletions

@@ -0,0 +1,78 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.kubernetes

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

// This class exists merely so that UserGroupInformation interactions can be mocked in tests.
private[spark] class HadoopUGIUtil {
  def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser

  def getShortName: String = getCurrentUser.getShortUserName

  def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled

  def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation =
    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)

  def dfsAddDelegationToken(
      hadoopConf: Configuration,
      renewer: String,
      creds: Credentials): Iterable[Token[_ <: TokenIdentifier]] =
    FileSystem.get(hadoopConf).addDelegationTokens(renewer, creds)

  def getCurrentTime: Long = System.currentTimeMillis()

  // Functions that should be in Core with the rebase to 2.3
  @deprecated("Moved to core in 2.2", "2.2")
  def getTokenRenewalInterval(
      renewedTokens: Iterable[Token[_ <: TokenIdentifier]],
      hadoopConf: Configuration): Option[Long] = {
    val renewIntervals = renewedTokens
      .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
      .flatMap { token =>
        Try {
          val newExpiration = token.renew(hadoopConf)
          val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
          newExpiration - identifier.getIssueDate
        }.toOption
      }
    if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
  }

  @deprecated("Moved to core in 2.2", "2.2")
  def serialize(creds: Credentials): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream
    val dataStream = new DataOutputStream(byteStream)
    creds.writeTokenStorageToStream(dataStream)
    byteStream.toByteArray
  }

  @deprecated("Moved to core in 2.2", "2.2")
  def deserialize(tokenBytes: Array[Byte]): Credentials = {
    val creds = new Credentials()
    creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(tokenBytes)))
    creds
  }
}
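A hypothetical sketch of the intended flow with this utility (the principal, keytab path, and renewer are placeholders): log in from a keytab, fetch HDFS delegation tokens, compute a renewal interval, and serialize the credentials so they can be stored in a Kubernetes secret.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

val ugiUtil = new HadoopUGIUtil

// Placeholder Kerberos identities for illustration only.
val ugi = ugiUtil.loginUserFromKeytabAndReturnUGI(
  "hdfs-user@EXAMPLE.COM", "/mnt/secrets/hdfs-user.keytab")

val hadoopConf = new Configuration()  // picks up core-site.xml/hdfs-site.xml from the classpath
val creds = new Credentials()
val tokens = ugiUtil.dfsAddDelegationToken(hadoopConf, "hdfs-user@EXAMPLE.COM", creds)

val renewalInterval: Option[Long] = ugiUtil.getTokenRenewalInterval(tokens, hadoopConf)
val tokenBytes: Array[Byte] = ugiUtil.serialize(creds)  // data for a k8s Secret item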
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KerberosTokenConfBootstrap.scala

Lines changed: 76 additions & 0 deletions

@@ -0,0 +1,76 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.Logging

/**
 * This is separated out from the HadoopConf steps API because this component can be reused to
 * mount the DT secret for executors as well.
 */
private[spark] trait KerberosTokenConfBootstrap {
  // Bootstraps a main container with the Secret mounted as a volume and an ENV variable
  // pointing to the mounted file containing the DT for secure HDFS interaction.
  def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer): PodWithMainContainer
}

private[spark] class KerberosTokenConfBootstrapImpl(
    secretName: String,
    secretItemKey: String,
    userName: String) extends KerberosTokenConfBootstrap with Logging {

  override def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer): PodWithMainContainer = {
    logInfo("Mounting HDFS DT from Secret for Secure HDFS")
    val dtMountedPod = new PodBuilder(originalPodWithMainContainer.pod)
      .editOrNewSpec()
        .addNewVolume()
          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
          .withNewSecret()
            .withSecretName(secretName)
            .endSecret()
          .endVolume()
        .endSpec()
      .build()
    val mainContainerWithMountedKerberos = new ContainerBuilder(
      originalPodWithMainContainer.mainContainer)
      .addNewVolumeMount()
        .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
        .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
        .endVolumeMount()
      .addNewEnv()
        .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
        .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey")
        .endEnv()
      .addNewEnv()
        .withName(ENV_SPARK_USER)
        .withValue(userName)
        .endEnv()
      .build()
    originalPodWithMainContainer.copy(
      pod = dtMountedPod,
      mainContainer = mainContainerWithMountedKerberos)
  }
}
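For illustration, a minimal, hypothetical sketch of applying this bootstrap; the secret name, user, and pod/container names are placeholders, and the item key mirrors the documented default spark.kubernetes.kerberos.dt.label.

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

// Placeholder secret name, item key, and user for illustration only.
val kerberosBootstrap = new KerberosTokenConfBootstrapImpl(
  secretName = "spark-hdfs-dt",
  secretItemKey = "spark.kubernetes.kerberos.dt.label",
  userName = "hdfs-user")

val pod = new PodBuilder()
  .withNewMetadata().withName("spark-driver-pod").endMetadata()
  .build()
val container = new ContainerBuilder().withName("spark-driver").build()

val withToken = kerberosBootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(pod, container))
// The container now mounts the secret volume and sets HADOOP_TOKEN_FILE_LOCATION to the
// mounted token file, which Hadoop's UserGroupInformation reads at login time.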
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodWithMainContainer.scala

Lines changed: 28 additions & 0 deletions

@@ -0,0 +1,28 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{Container, Pod}

/**
 * Packages the driver pod together with its main container so that the pair
 * can be bootstrapped and modified as a unit instead of each component separately.
 */
private[spark] case class PodWithMainContainer(
  pod: Pod,
  mainContainer: Container)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 43 additions & 0 deletions
@@ -542,6 +542,49 @@ package object config extends Logging {

   private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

+  private[spark] val KUBERNETES_KERBEROS_SUPPORT =
+    ConfigBuilder("spark.kubernetes.kerberos.enabled")
+      .doc("Specify whether your job requires a delegation token to access HDFS")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val KUBERNETES_KERBEROS_KEYTAB =
+    ConfigBuilder("spark.kubernetes.kerberos.keytab")
+      .doc("Specify the location of the keytab used for Kerberos in order to access" +
+        " secure HDFS")
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_KERBEROS_PRINCIPAL =
+    ConfigBuilder("spark.kubernetes.kerberos.principal")
+      .doc("Specify the principal used for Kerberos in order to access secure HDFS")
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_KERBEROS_RENEWER_PRINCIPAL =
+    ConfigBuilder("spark.kubernetes.kerberos.renewer.principal")
+      .doc("Specify the principal you wish to renew and retrieve your Kerberos" +
+        " values with")
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_KERBEROS_DT_SECRET_NAME =
+    ConfigBuilder("spark.kubernetes.kerberos.tokensecret.name")
+      .doc("Specify the name of the secret where your existing delegation token is" +
+        " stored. This removes the need for the job user to provide any keytab for" +
+        " launching a job")
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY =
+    ConfigBuilder("spark.kubernetes.kerberos.tokensecret.itemkey")
+      .doc("Specify the item key of the data where your existing delegation token is" +
+        " stored. This removes the need for the job user to provide any keytab for" +
+        " launching a job")
+      .stringConf
+      .createOptional
+
   private[spark] def resolveK8sMaster(rawMasterString: String): String = {
     if (!rawMasterString.startsWith("k8s://")) {
       throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
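A minimal sketch of how these entries might be read in the submission client (the secret name is a placeholder, and SparkConf.get(ConfigEntry) is reachable here only because both the entries and that accessor are private[spark]):

import org.apache.spark.SparkConf

// Illustrative only: the secret name below is a placeholder, not a value from this commit.
val sparkConf = new SparkConf()
  .set("spark.kubernetes.kerberos.enabled", "true")
  .set("spark.kubernetes.kerberos.tokensecret.name", "spark-hdfs-dt")

val kerberosEnabled: Boolean = sparkConf.get(KUBERNETES_KERBEROS_SUPPORT)          // false unless set
val maybeSecretName: Option[String] = sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
val maybeKeytab: Option[String] = sparkConf.get(KUBERNETES_KERBEROS_KEYTAB)        // None unless set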
