This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Secure HDFS Support #414

Merged
Changes from all commits
43 commits
ea9e516
Initial architecture design for HDFS support
ifilonenko Jul 15, 2017
47ea307
Minor styling
ifilonenko Jul 15, 2017
60a19ca
Added proper logic for mounting ConfigMaps
ifilonenko Jul 18, 2017
1d0175a
styling
ifilonenko Jul 18, 2017
163193a
modified otherKubernetesResource logic
ifilonenko Jul 18, 2017
8381fa6
fixed Integration tests and modified HADOOP_CONF_DIR variable to be F…
ifilonenko Jul 18, 2017
d4b1a68
setting HADOOP_CONF_DIR env variables
ifilonenko Jul 18, 2017
0bba092
Included integration tests for Stage 1
ifilonenko Jul 18, 2017
06df962
Initial Kerberos support
ifilonenko Jul 19, 2017
d7f54dd
initial Stage 2 architecture using deprecated 2.1 methods
ifilonenko Jul 21, 2017
d3c5a03
Added current, BROKEN, integration test environment for review
ifilonenko Jul 26, 2017
d7441ba
working hadoop cluster
ifilonenko Jul 28, 2017
04eed68
Using locks and monitors to ensure proper configs for setting up kerb…
ifilonenko Jul 29, 2017
62354eb
working Stage 2
ifilonenko Jul 31, 2017
514ac19
documentation
ifilonenko Aug 1, 2017
3fbf88c
Integration Stages 1,2 and 3
ifilonenko Aug 2, 2017
b321436
further testing work
ifilonenko Aug 2, 2017
b6912d2
fixing imports
ifilonenko Aug 2, 2017
c6b11f8
Stage 3 Integration tests pass
ifilonenko Aug 3, 2017
1e71ca7
uncommented SparkDockerBuilder
ifilonenko Aug 4, 2017
350c8ed
testing fix
ifilonenko Aug 4, 2017
5e4051c
handled comments and increased test hardening
ifilonenko Aug 8, 2017
8338fdb
Solve failing integration test problem and lower TIMEOUT time
ifilonenko Aug 9, 2017
d6d0945
modify security.authoization
ifilonenko Aug 9, 2017
e3f14e1
Modifying HADOOP_CONF flags
ifilonenko Aug 9, 2017
61a7414
Refactored tests and included modifications to pass all tests regardl…
ifilonenko Aug 15, 2017
7a0b4e4
Adding unit test and one more integration test
ifilonenko Aug 16, 2017
8dacb19
completed unit tests w/o UGI mocking
ifilonenko Aug 16, 2017
d9b7b50
cleanup and various small fixes
ifilonenko Aug 18, 2017
d53a50f
added back sparkdockerbuilder images
ifilonenko Aug 22, 2017
499b037
merge issues
ifilonenko Aug 31, 2017
ffe7891
address initial comments and scalastyle issues
ifilonenko Aug 31, 2017
6efa379
addresses comments from PR
ifilonenko Aug 31, 2017
6052a13
mocking hadoopUGI
ifilonenko Aug 31, 2017
f9ca47d
Fix executor env to include simple authn
kimoonkim Sep 1, 2017
91e364c
Merge remote-tracking branch 'bloomberg/secure-hdfs-support4' into pr…
kimoonkim Sep 1, 2017
4fe86f0
Merge pull request #1 from kimoonkim/pr-414
ifilonenko Sep 1, 2017
d2c8649
Fix a bug in executor env handling
kimoonkim Sep 1, 2017
4780878
Merge remote-tracking branch 'bloomberg/secure-hdfs-support4' into pr…
kimoonkim Sep 1, 2017
17f2702
Merge pull request #2 from kimoonkim/pr-414
ifilonenko Sep 1, 2017
b566fa9
Fix a bug in how the driver sets simple authn
kimoonkim Sep 1, 2017
726ff64
Merge pull request #3 from kimoonkim/pr-414
ifilonenko Sep 2, 2017
2d48613
handling Pr comments
ifilonenko Sep 19, 2017
@@ -575,7 +575,7 @@ object SparkSubmit extends CommandLineUtils {
}

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL) {
if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) {
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
55 changes: 55 additions & 0 deletions docs/running-on-kubernetes.md
@@ -783,6 +783,61 @@ from the other deployment modes. See the [configuration page](configuration.html)
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.enabled</code></td>
<td>false</td>
<td>
Specify whether your job requires Kerberos authentication to access HDFS. By default, we
assume that secure HDFS access is not required.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.keytab</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the location of the Kerberos keytab to be used to access secure HDFS. This is optional, as you
may instead log in by running <code>kinit</code> before running spark-submit, in which case the
submission client will resolve your credentials from your local TGT cache.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.principal</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the Kerberos principal you wish to use to access secure HDFS. This is optional, as you
may instead log in by running <code>kinit</code> before running spark-submit, in which case the
submission client will resolve your credentials from your local TGT cache.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.renewer.principal</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the principal you wish to use to handle the renewal of delegation tokens. This is optional, as
we set the renewer to the job user's principal by default.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the name of the secret where your existing delegation token data is stored. You must also specify the
item key <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> under which the data is stored in the secret.
Review comment (Member):
Can you mention that this is optional in case you want to use pre-existing secret and a new secret will be automatically created otherwise?
This is optional if you want to use a pre-existing secret; otherwise a new secret will be created
automatically.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokensecret.itemkey</code></td>
<td>spark.kubernetes.kerberos.dt.label</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the data item key within the pre-specified secret where your existing delegation token data is stored.
This defaults to <code>spark.kubernetes.kerberos.dt.label</code> should you not include it, but
you should always specify it when supplying a pre-existing secret containing the delegation token data.
</td>
</tr>
<tr>
<td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import java.io.File

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.Logging

/**
* This is separated out from the HadoopConf steps API because this component can be reused to
* set up the Hadoop Configuration for executors as well.
*/
private[spark] trait HadoopConfBootstrap {
/**
* Bootstraps a main container with the ConfigMaps containing Hadoop config files
* mounted as volumes and an ENV variable pointing to the mounted file.
*/
def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer
}

private[spark] class HadoopConfBootstrapImpl(
hadoopConfConfigMapName: String,
hadoopConfigFiles: Seq[File],
hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging {

override def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer = {
logInfo("HADOOP_CONF_DIR defined. Mounting HDFS specific .xml files")
val keyPaths = hadoopConfigFiles.map(file =>
new KeyToPathBuilder()
.withKey(file.toPath.getFileName.toString)
.withPath(file.toPath.getFileName.toString)
Review comment (Member):
Key and path are exactly same. Is this intended? If yes, why not use a set?

Reply (Member Author):
It is intended to make my life easier when looking at the configmap and seeing the data be: core-site.xml
.build()).toList
val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
.editSpec()
.addNewVolume()
.withName(HADOOP_FILE_VOLUME)
.withNewConfigMap()
.withName(hadoopConfConfigMapName)
.withItems(keyPaths.asJava)
.endConfigMap()
.endVolume()
.endSpec()
.build()
val mainContainerWithMountedHadoopConf = new ContainerBuilder(
originalPodWithMainContainer.mainContainer)
.addNewVolumeMount()
.withName(HADOOP_FILE_VOLUME)
.withMountPath(HADOOP_CONF_DIR_PATH)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_CONF_DIR)
.withValue(HADOOP_CONF_DIR_PATH)
.endEnv()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(hadoopUGI.getShortName)
.endEnv()
.build()
originalPodWithMainContainer.copy(
pod = hadoopSupportedPod,
mainContainer = mainContainerWithMountedHadoopConf)
}
}
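For orientation, here is a minimal usage sketch (not part of this diff) showing how the bootstrap decorates a pod and its main container. The ConfigMap name, file paths, and object name below are illustrative assumptions, not names from the PR.

package org.apache.spark.deploy.kubernetes

import java.io.File

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

object HadoopConfBootstrapSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative inputs: ConfigMap name and local HADOOP_CONF_DIR files are assumptions.
    val confFiles = Seq(
      new File("/etc/hadoop/conf/core-site.xml"),
      new File("/etc/hadoop/conf/hdfs-site.xml"))
    val bootstrap = new HadoopConfBootstrapImpl("hadoop-conf-configmap", confFiles, new HadoopUGIUtil)

    val pod = new PodBuilder()
      .withNewMetadata().withName("spark-driver").endMetadata()
      .withNewSpec().endSpec()
      .build()
    val container = new ContainerBuilder().withName("spark-kubernetes-driver").build()

    // The returned pod gains the Hadoop conf volume; the container mounts it and
    // exports HADOOP_CONF_DIR and SPARK_USER.
    val bootstrapped = bootstrap.bootstrapMainContainerAndVolumes(PodWithMainContainer(pod, container))
    println(bootstrapped.mainContainer.getEnv)
  }
}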
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier


// This class exists only so that UserGroupInformation interactions can be mocked in tests
private[spark] class HadoopUGIUtil {
def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser

def getShortName: String = getCurrentUser.getShortUserName

def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled

def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation =
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)

def dfsAddDelegationToken(hadoopConf: Configuration, renewer: String, creds: Credentials)
: Iterable[Token[_ <: TokenIdentifier]] =
FileSystem.get(hadoopConf).addDelegationTokens(renewer, creds)

def getCurrentTime: Long = System.currentTimeMillis()

// Functions that should be in Core with Rebase to 2.3
@deprecated("Moved to core in 2.2", "2.2")
def getTokenRenewalInterval(
renewedTokens: Iterable[Token[_ <: TokenIdentifier]],
hadoopConf: Configuration): Option[Long] = {
val renewIntervals = renewedTokens.filter {
_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]}
.flatMap { token =>
Try {
val newExpiration = token.renew(hadoopConf)
val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
val interval = newExpiration - identifier.getIssueDate
interval
}.toOption}
if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
}

@deprecated("Moved to core in 2.2", "2.2")
def serialize(creds: Credentials): Array[Byte] = {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
creds.writeTokenStorageToStream(dataStream)
byteStream.toByteArray
}

@deprecated("Moved to core in 2.2", "2.2")
def deserialize(tokenBytes: Array[Byte]): Credentials = {
val creds = new Credentials()
creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(tokenBytes)))
creds
}
}
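A sketch, under stated assumptions and not taken from this diff, of how these helpers compose in a submission client: log in from a keytab, fetch HDFS delegation tokens while running as that UGI (using the job principal as the renewer for simplicity), compute the renewal interval, and serialize the credentials so they could be stored in a Kubernetes secret.

package org.apache.spark.deploy.kubernetes

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

object DelegationTokenFlowSketch {
  // Returns the serialized credentials plus the token renewal interval, if any.
  def fetchTokens(
      principal: String,
      keytab: String,
      hadoopConf: Configuration,
      ugiUtil: HadoopUGIUtil): (Array[Byte], Option[Long]) = {
    val ugi = ugiUtil.loginUserFromKeytabAndReturnUGI(principal, keytab)
    val creds = new Credentials()
    // Fetch HDFS delegation tokens while running as the logged-in UGI.
    val tokens = ugi.doAs(
      new PrivilegedExceptionAction[Iterable[Token[_ <: TokenIdentifier]]] {
        override def run(): Iterable[Token[_ <: TokenIdentifier]] =
          ugiUtil.dfsAddDelegationToken(hadoopConf, principal, creds)
      })
    // The serialized bytes are what would be placed into the DT secret.
    (ugiUtil.serialize(creds), ugiUtil.getTokenRenewalInterval(tokens, hadoopConf))
  }
}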
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.Logging


/**
* This is separated out from the HadoopConf steps API because this component can be reused to
 * mount the DT secret for executors as well.
*/
private[spark] trait KerberosTokenConfBootstrap {
// Bootstraps a main container with the Secret mounted as volumes and an ENV variable
// pointing to the mounted file containing the DT for Secure HDFS interaction
def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer
}

private[spark] class KerberosTokenConfBootstrapImpl(
secretName: String,
secretItemKey: String,
userName: String) extends KerberosTokenConfBootstrap with Logging {


override def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer = {
logInfo("Mounting HDFS DT from Secret for Secure HDFS")
val dtMountedPod = new PodBuilder(originalPodWithMainContainer.pod)
.editOrNewSpec()
.addNewVolume()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withNewSecret()
.withSecretName(secretName)
.endSecret()
.endVolume()
.endSpec()
.build()
val mainContainerWithMountedKerberos = new ContainerBuilder(
originalPodWithMainContainer.mainContainer)
.addNewVolumeMount()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
.withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey")
.endEnv()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(userName)
.endEnv()
.build()
originalPodWithMainContainer.copy(
pod = dtMountedPod,
mainContainer = mainContainerWithMountedKerberos)
}
}
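Again for orientation only, a minimal sketch of applying this bootstrap to an executor pod; the secret name, item key, and user name below are assumptions for illustration.

package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

object KerberosTokenBootstrapSketch {
  def main(args: Array[String]): Unit = {
    val bootstrap = new KerberosTokenConfBootstrapImpl(
      secretName = "spark-hdfs-dt",                          // assumed secret name
      secretItemKey = "spark.kubernetes.kerberos.dt.label",  // assumed item key
      userName = "job-user")                                 // assumed short user name

    val pod = new PodBuilder()
      .withNewMetadata().withName("spark-executor").endMetadata()
      .withNewSpec().endSpec()
      .build()
    val container = new ContainerBuilder().withName("executor").build()

    // The executor container now mounts the secret volume and points
    // HADOOP_TOKEN_FILE_LOCATION at the mounted delegation token data.
    val withDt = bootstrap.bootstrapMainContainerAndVolumes(PodWithMainContainer(pod, container))
    println(withDt.mainContainer.getEnv)
  }
}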
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{Container, Pod}

/**
 * This case class packages together a pod with its main container so that
 * bootstrap steps can modify both as a single unit instead of handling
 * each component separately
*/
private[spark] case class PodWithMainContainer(
pod: Pod,
mainContainer: Container)
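One motivation for this wrapper is that bootstrap components can then be chained as plain functions over a single value; a purely illustrative composition sketch (the object and type names are assumptions):

package org.apache.spark.deploy.kubernetes

object BootstrapChainingSketch {
  type BootstrapStep = PodWithMainContainer => PodWithMainContainer

  // Threads one PodWithMainContainer through an ordered list of bootstrap steps,
  // e.g. Hadoop conf mounting followed by delegation token mounting.
  def applyAll(initial: PodWithMainContainer, steps: Seq[BootstrapStep]): PodWithMainContainer =
    steps.foldLeft(initial) { (current, step) => step(current) }
}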
@@ -542,6 +542,49 @@ package object config extends Logging {

private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

private[spark] val KUBERNETES_KERBEROS_SUPPORT =
ConfigBuilder("spark.kubernetes.kerberos.enabled")
.doc("Specify whether your job is a job that will require a Delegation Token to access HDFS")
.booleanConf
.createWithDefault(false)

private[spark] val KUBERNETES_KERBEROS_KEYTAB =
ConfigBuilder("spark.kubernetes.kerberos.keytab")
.doc("Specify the location of keytab" +
" for Kerberos in order to access Secure HDFS")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_PRINCIPAL =
ConfigBuilder("spark.kubernetes.kerberos.principal")
.doc("Specify the principal" +
" for Kerberos in order to access Secure HDFS")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_RENEWER_PRINCIPAL =
ConfigBuilder("spark.kubernetes.kerberos.rewnewer.principal")
.doc("Specify the principal" +
" you wish to renew and retrieve your Kerberos values with")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_DT_SECRET_NAME =
ConfigBuilder("spark.kubernetes.kerberos.tokensecret.name")
.doc("Specify the name of the secret where " +
" your existing delegation token is stored. This removes the need" +
" for the job user to provide any keytab for launching a job")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY =
ConfigBuilder("spark.kubernetes.kerberos.tokensecret.itemkey")
.doc("Specify the item key of the data where " +
" your existing delegation token is stored. This removes the need" +
" for the job user to provide any keytab for launching a job")
.stringConf
.createOptional

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
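To close, a sketch (not from this diff; the branching simply restates the documented behavior) of how a submission step might read these entries to decide between a pre-existing token secret, a keytab login, or the submitter's local TGT cache. The object and method names are assumptions.

package org.apache.spark.deploy.kubernetes

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._

object KerberosConfigSketch {
  def describe(sparkConf: SparkConf): String = {
    if (!sparkConf.get(KUBERNETES_KERBEROS_SUPPORT)) {
      "Kerberos disabled: no delegation token handling"
    } else {
      val maybeSecret = sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
      val maybeKeytab = sparkConf.get(KUBERNETES_KERBEROS_KEYTAB)
      val maybePrincipal = sparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL)
      val itemKey = sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY)
        .getOrElse("spark.kubernetes.kerberos.dt.label")
      maybeSecret match {
        case Some(secret) =>
          s"Mount pre-existing secret $secret at item key $itemKey"
        case None if maybeKeytab.isDefined && maybePrincipal.isDefined =>
          s"Log in as ${maybePrincipal.get} with keytab ${maybeKeytab.get} and fetch new tokens"
        case None =>
          "Fall back to the submitter's local TGT cache (kinit before spark-submit)"
      }
    }
  }
}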