This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Basic Secure HDFS Support [514] #540

Merged
@@ -573,7 +573,7 @@ object SparkSubmit extends CommandLineUtils {
}

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL) {
if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) {
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
56 changes: 55 additions & 1 deletion docs/running-on-kubernetes.md
@@ -752,6 +752,61 @@ from the other deployment modes. See the [configuration page](configuration.html)
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.enabled</code></td>
<td>false</td>
<td>
Specify whether your job requires Kerberos authentication to access HDFS. By default, we
assume that secure HDFS access is not required.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.keytab</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the location of the Kerberos keytab used to access secure HDFS. This is optional, as you
may instead log in by running <code>kinit</code> before running spark-submit, in which case the submission client
will look in your local TGT cache to resolve this.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.principal</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the Kerberos principal you wish to use to access secure HDFS. This is optional, as you
may instead log in by running <code>kinit</code> before running spark-submit, in which case the submission client
will look in your local TGT cache to resolve this.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.renewer.principal</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the principal you wish to use for renewing delegation tokens. This is optional, as
the principal defaults to the job user's principal.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the name of the secret where your existing delegation token data is stored. You must also specify the
item key <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> under which the data is stored in the secret.
This is only needed if you want to use a pre-existing secret; otherwise, a new secret will be created
automatically.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokensecret.itemkey</code></td>
<td>spark.kubernetes.kerberos.dt.label</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the data item key within the pre-specified secret under which your existing delegation token data is stored.
A default value for <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> is provided should you not include it,
but you should always include this if you are supplying a pre-existing secret containing the delegation token data.
</td>
</tr>

Member:
Curious. Is the token refresh server supposed to renew this pre-populated token as well? Or is it supposed to be renewed by the job user? We may want to comment on that.

Member Author:
The token refresh server is supposed to renew this pre-populated token. The assumption is that if you supply a pre-populated token, it will be automatically updated by either an administrator or the token refresh server. In the later PR, if you think it is needed, we should probably note this.

<tr>
<td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
@@ -791,4 +846,3 @@ from the other deployment modes. See the [configuration page](configuration.html)
Running Spark on Kubernetes is currently an experimental feature. Some restrictions on the current implementation that
should be lifted in the future include:
* Applications can only run in cluster mode.
* Only Scala and Java applications can be run.
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import java.io.File

import scala.collection.JavaConverters._
Comment:
Imports are not consistent with the rest of the project. Order should be as follows everywhere:

  1. java.io.*
  2. Empty Space
  3. Everything that isn't java.io.* or org.apache.spark.* (this includes scala.*)
  4. Empty space
  5. org.apache.spark.*

Please look over all files and fix all imports.

Member:
I was curious about the import order. According to http://spark.apache.org/contributing.html, the recommended import order is slightly different. scala.* and other 3rd parties libraries are separated by an empty space. Do we know which one is correct?

In addition, sort imports in the following order (use alphabetical order within each group):

  • java.* and javax.*
  • scala.*
  • Third-party libraries (org., com., etc)
  • Project classes (org.apache.spark.*)

An example from the same page:

import java.*
import javax.*
 
import scala.*
 
import *
 
import org.apache.spark.*

Comment:
Actually @kimoonkim and I think our code is incorrect in most places.

Member:
@mccheah Cool. Should we then follow the import order suggested in http://spark.apache.org/contributing.html going forward?

Comment:
Yes we should. We can fix the ordering as we merge upstream.


import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.internal.Logging

/**
* This is separated out from the HadoopConf steps API because this component can be reused to
* set up the Hadoop Configuration for executors as well.
*/
private[spark] trait HadoopConfBootstrap {
/**
* Bootstraps a main container with the ConfigMaps containing Hadoop config files
* mounted as volumes and an ENV variable pointing to the mounted file.
*/
def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
Comment:
Can this all fit on one line?

: PodWithMainContainer
}

private[spark] class HadoopConfBootstrapImpl(
hadoopConfConfigMapName: String,
hadoopConfigFiles: Seq[File],
hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging{
Member:
Add a space after Logging.


override def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer = {
logInfo("HADOOP_CONF_DIR defined. Mounting Hadoop specific .xml files")
Member:
Should we filter .xml files only?

val keyPaths = hadoopConfigFiles.map{ file =>
Member:
nit: empty space after map.

val fileStringPath = file.toPath.getFileName.toString
new KeyToPathBuilder()
.withKey(fileStringPath)
.withPath(fileStringPath)
.build() }
val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
.editSpec()
.addNewVolume()
.withName(HADOOP_FILE_VOLUME)
.withNewConfigMap()
.withName(hadoopConfConfigMapName)
.withItems(keyPaths.asJava)
.endConfigMap()
Member:
Wrong indention.

.endVolume()
.endSpec()
.build()
val hadoopSupportedContainer = new ContainerBuilder(
originalPodWithMainContainer.mainContainer)
.addNewVolumeMount()
.withName(HADOOP_FILE_VOLUME)
.withMountPath(HADOOP_CONF_DIR_PATH)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_CONF_DIR)
.withValue(HADOOP_CONF_DIR_PATH)
.endEnv()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(hadoopUGI.getShortName)
.endEnv()
.build()
originalPodWithMainContainer.copy(
Member:
nit: put an empty line before the returned value.

pod = hadoopSupportedPod,
mainContainer = hadoopSupportedContainer)
}
}
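As a usage illustration (a hypothetical sketch, not code from this PR), a driver configuration step might apply this bootstrap roughly as follows; the ConfigMap name, file list, and pod/container values are made-up placeholders, and PodWithMainContainer is assumed to be the case class with pod and mainContainer fields used above:

val basePod = new PodBuilder().withNewMetadata().withName("spark-driver").endMetadata().build()
val driverContainer = new ContainerBuilder().withName("spark-kubernetes-driver").build()
val hadoopConfBootstrap = new HadoopConfBootstrapImpl(
  hadoopConfConfigMapName = "hadoop-conf-files",                        // hypothetical ConfigMap name
  hadoopConfigFiles = Seq(new File("/etc/hadoop/conf/core-site.xml")),  // hypothetical HADOOP_CONF_DIR contents
  hadoopUGI = new HadoopUGIUtil)
val bootstrapped = hadoopConfBootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(pod = basePod, mainContainer = driverContainer))
// bootstrapped.pod now carries the ConfigMap-backed volume, and bootstrapped.mainContainer
// mounts it at HADOOP_CONF_DIR_PATH and exports HADOOP_CONF_DIR and SPARK_USER.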
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier


// Function of this class is merely for mocking reasons
private[spark] class HadoopUGIUtil{
Comment:
Put a trait over this and extend the trait. Then, only mock the trait.

def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser

def getShortName: String = getCurrentUser.getShortUserName
Member:
Maybe we don't need this method. There is only one caller. And the caller can easily just do hadoopUgiUtil.getCurrentUser.getShortUserName itself.

Member Author:
This is used purely for mocking

Member:
We should wrap a minimal set of methods for mocking. And we already wrap getCurrentUser, so this wrapping is unnecessary. Besides, this method name getShortName makes the caller code a bit difficult to read by masking that the short name is for the current user. The reader will question "short name of what?".


def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled

def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation =
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)

def dfsAddDelegationToken(hadoopConf: Configuration, renewer: String, creds: Credentials)
: Iterable[Token[_ <: TokenIdentifier]] =
FileSystem.get(hadoopConf).addDelegationTokens(renewer, creds)
Comment:
Just return a FileSystem; the test can mock the FileSystem object, and then call addDelegationTokens on the mock FileSystem.

Member Author:
How exactly can this be done? This has been tripping me up, as I am trying to mock this FileSystem object but with no luck (while ensuring that it passes Integration tests)

Member:
I think you can add a method to this class like:

def getFileSystem(hadoopConf: Configuration): FileSystem = FileSystem.get(hadoopConf)
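Building on that suggestion, one possible shape for the refactor (a hypothetical sketch, not part of this diff) is to have callers fetch the FileSystem first and pass it in, so a test can hand in a mocked instance:

def getFileSystem(hadoopConf: Configuration): FileSystem = FileSystem.get(hadoopConf)

// Callers obtain the FileSystem via getFileSystem (or a mock in tests) and pass it here.
def dfsAddDelegationToken(fs: FileSystem, renewer: String, creds: Credentials)
  : Iterable[Token[_ <: TokenIdentifier]] =
  fs.addDelegationTokens(renewer, creds)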


def getCurrentTime: Long = System.currentTimeMillis()
Member:
Logically, getting current time does not belong to HadoopUGI. Move to some other class?

Member Author:
Once again, used purely for mocking of the HadoopUGI...

Member:
I think mocking this system interaction makes sense. I'm just pointing out that we can have multiple utility classes for mocking, and time-related methods don't seem to belong to UGI.

Comment:
Use Clock instead.


// Functions that should be in Core with Rebase to 2.3
@deprecated("Moved to core in 2.3", "2.3")
def getTokenRenewalInterval(
Member:
Line 30 says "Function of this class is merely for mocking reasons". But it seems this function has real business logic, more than just mocking purpose. Move it to some other class?

renewedTokens: Iterable[Token[_ <: TokenIdentifier]],
hadoopConf: Configuration): Option[Long] = {
val renewIntervals = renewedTokens.filter {
_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
}.flatMap { token =>
Try {
val newExpiration = token.renew(hadoopConf)
val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
val interval = newExpiration - identifier.getIssueDate
interval
}.toOption
}
renewIntervals.reduceLeftOption(_ min _)
}

@deprecated("Moved to core in 2.3", "2.3")
def serialize(creds: Credentials): Array[Byte] = {
Member:
Ditto. It has business logic that should be tested rather than just being mocked. Move to some other class?

val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
Member:
Is writeTokenStorageToStream calling close on dataStream?

Member Author:
handled this below

Comment:
Use Utils.tryWithResource. That will close even if an exception is thrown.

creds.writeTokenStorageToStream(dataStream)
dataStream.close()
Member:
We need to make sure this is called even if creds.writeTokenStorageToStream(dataStream) throws an exception (unlikely but still worth considering). Not sure what's the best practice to do this in Scala.

Member:
+1

byteStream.toByteArray
}
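Following the Utils.tryWithResource suggestion above, a hedged sketch of what serialize could look like (deserialize could be handled analogously), assuming org.apache.spark.util.Utils is imported:

def serialize(creds: Credentials): Array[Byte] = {
  val byteStream = new ByteArrayOutputStream
  // tryWithResource closes the DataOutputStream even if writeTokenStorageToStream throws
  Utils.tryWithResource(new DataOutputStream(byteStream)) { dataStream =>
    creds.writeTokenStorageToStream(dataStream)
  }
  byteStream.toByteArray
}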

@deprecated("Moved to core in 2.3", "2.3")
def deserialize(tokenBytes: Array[Byte]): Credentials = {
Member:
Same here. Move to some other class?

val creds = new Credentials()
creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(tokenBytes)))
Member:
The DataInputStream also needs to be closed.

creds
}
}
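To show how these helpers are intended to fit together (a hypothetical sketch, not code from this PR), a submission step could obtain delegation tokens, compute their renewal interval, and serialize them for storage in the token secret; hadoopConf and renewerPrincipal are assumed to come from the submission configuration:

val ugiUtil = new HadoopUGIUtil
val creds = ugiUtil.getCurrentUser.getCredentials
val tokens = ugiUtil.dfsAddDelegationToken(hadoopConf, renewerPrincipal, creds)
val maybeRenewalInterval = ugiUtil.getTokenRenewalInterval(tokens, hadoopConf)
val tokenBytes = ugiUtil.serialize(creds)   // bytes to place under the secret's data item key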
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.internal.Logging


/**
* This is separated out from the HadoopConf steps API because this component can be reused to
* mount the DT secret for executors as well.
*/
private[spark] trait KerberosTokenConfBootstrap {
// Bootstraps a main container with the Secret mounted as volumes and an ENV variable
// pointing to the mounted file containing the DT for Secure HDFS interaction
def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
Comment:
Indentation is off here - think we want this line and the next line indented in one more.

: PodWithMainContainer
}

private[spark] class KerberosTokenConfBootstrapImpl(
secretName: String,
secretItemKey: String,
userName: String) extends KerberosTokenConfBootstrap with Logging{
Member:
Space after Logging.


Member:
Nit. Have one empty line instead of two?

override def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
Comment:
Indentation is off here, indent in one more along with the line below.

: PodWithMainContainer = {
logInfo("Mounting HDFS DT from Secret for Secure HDFS")
val secretMountedPod = new PodBuilder(originalPodWithMainContainer.pod)
.editOrNewSpec()
.addNewVolume()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withNewSecret()
.withSecretName(secretName)
.endSecret()
.endVolume()
.endSpec()
.build()
// TODO: ENV_HADOOP_TOKEN_FILE_LOCATION should point to the latest token data item key.
val secretMountedContainer = new ContainerBuilder(
originalPodWithMainContainer.mainContainer)
.addNewVolumeMount()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
.withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey")
Member:
I just realized we have an edge case that this will fail. Imagine a job ran for many weeks and the refresh server added new weekly tokens. And also imagine the dynamic allocation is enabled and new executors are launching.

Those new executors should use the latest token, not the initial token. i.e. ENV_HADOOP_TOKEN_FILE_LOCATION should point to the latest token data item key.

I don't know how we can solve this yet. And we should probably address it later in a follow-up PR that we'll write for picking up the new token. Maybe add a TODO here so we don't forget this?

.endEnv()
.addNewEnv()
.withName(ENV_SPARK_USER)
Member:
HadoopConfBootstrapImpl also sets SPARK_USER, but to hadoopUGI.getShortName. So one will override the value set by the other, right?

Member Author:
SPARK_USER could be different from Job User so yes we are overwriting it.

Comment:
Can we resolve it in one place and set it consistently everywhere? Right now the ordering and overwriting is ambiguous.

Member Author:
It's not ambiguous, as there are scenarios where the UGI could be either the Job User or taken from the TGT.

.withValue(userName)
.endEnv()
.build()
originalPodWithMainContainer.copy(
pod = secretMountedPod,
mainContainer = secretMountedContainer)
}
}
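For context, a hypothetical sketch (not part of this diff) of applying this bootstrap to an executor pod once the secret name, item key, and user name have been resolved from the configuration; the values and the basePod/executorContainer placeholders are made up:

val kerberosBootstrap = new KerberosTokenConfBootstrapImpl(
  secretName = "spark-hdfs-dt",                          // hypothetical secret name
  secretItemKey = "spark.kubernetes.kerberos.dt.label",  // hypothetical data item key
  userName = "job-user")
val withToken = kerberosBootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(pod = basePod, mainContainer = executorContainer))
// withToken.mainContainer now mounts the secret volume and points
// HADOOP_TOKEN_FILE_LOCATION at the mounted item key.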
@@ -496,6 +496,49 @@ package object config extends Logging {

private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

private[spark] val KUBERNETES_KERBEROS_SUPPORT =
ConfigBuilder("spark.kubernetes.kerberos.enabled")
.doc("Specify whether your job is a job that will require a Delegation Token to access HDFS")
.booleanConf
.createWithDefault(false)

private[spark] val KUBERNETES_KERBEROS_KEYTAB =
ConfigBuilder("spark.kubernetes.kerberos.keytab")
.doc("Specify the location of keytab" +
" for Kerberos in order to access Secure HDFS")
Member:
nit: empty space at the end of the first part. Ditto below.

.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_PRINCIPAL =
ConfigBuilder("spark.kubernetes.kerberos.principal")
.doc("Specify the principal" +
" for Kerberos in order to access Secure HDFS")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_RENEWER_PRINCIPAL =
ConfigBuilder("spark.kubernetes.kerberos.renewer.principal")
.doc("Specify the principal" +
" you wish to renew and retrieve your Kerberos values with")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_DT_SECRET_NAME =
ConfigBuilder("spark.kubernetes.kerberos.tokensecret.name")
.doc("Specify the name of the secret where " +
" your existing delegation token is stored. This removes the need" +
" for the job user to provide any keytab for launching a job")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY =
ConfigBuilder("spark.kubernetes.kerberos.tokensecret.itemkey")
.doc("Specify the item key of the data where " +
" your existing delegation token is stored. This removes the need" +
" for the job user to provide any keytab for launching a job")
.stringConf
.createOptional
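As a side note (a hedged illustration, not part of this change), a submission step would typically read these optional entries from the submission SparkConf roughly like this; the variable names are hypothetical:

val kerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
val maybeKeytab = submissionSparkConf.get(KUBERNETES_KERBEROS_KEYTAB)
val maybePrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL)
val maybeRenewerPrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_RENEWER_PRINCIPAL)
val maybeDTSecretName = submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
val maybeDTSecretItemKey = submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY)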

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")