29 changes: 15 additions & 14 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy

import java.io._
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.net.{URI, URL}
import java.security.PrivilegedExceptionAction
import java.text.ParseException
import java.util.UUID
@@ -334,19 +334,20 @@ private[spark] class SparkSubmit extends Logging {
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
val targetDir = Utils.createTempDir()

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) {
if (args.principal != null) {
if (args.keytab != null) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sparkConf.set(KEYTAB, args.keytab)
sparkConf.set(PRINCIPAL, args.principal)
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
// Kerberos is not supported in standalone mode, and keytab support is not yet available
// in Mesos cluster mode.
if (clusterManager != STANDALONE
&& !isMesosCluster
&& args.principal != null
&& args.keytab != null) {
// If client mode, make sure the keytab is just a local path.
if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
args.keytab = new URI(args.keytab).getPath()
}

if (!Utils.isLocalUri(args.keytab)) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
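// Illustrative only, not part of the change (path is hypothetical): with
// --keytab local:///etc/security/user.keytab
//   - client mode: args.keytab is rewritten to /etc/security/user.keytab, checked
//     for existence, and used for the login below.
//   - cluster mode: the local: URI is left untouched and the existence check and
//     login are skipped; the driver resolves the keytab later inside its container.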

core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.security

import java.io.File
import java.net.URI
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
@@ -71,11 +72,13 @@ private[spark] class HadoopDelegationTokenManager(
private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

private val principal = sparkConf.get(PRINCIPAL).orNull
private val keytab = sparkConf.get(KEYTAB).orNull

// The keytab can be a local: URI for cluster mode, so translate it to a regular path. If it is
// needed later on, the code will check that it exists.
private val keytab = sparkConf.get(KEYTAB).map { uri => new URI(uri).getPath() }.orNull

require((principal == null) == (keytab == null),
"Both principal and keytab must be defined, or neither.")
require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")

private val delegationTokenProviders = loadProviders()
logDebug("Using the following builtin delegation token providers: " +
@@ -264,6 +267,7 @@ private[spark] class HadoopDelegationTokenManager(

private def doLogin(): UserGroupInformation = {
logInfo(s"Attempting to login to KDC using principal: $principal")
require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
logInfo("Successfully logged into KDC.")
ugi
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -100,6 +100,9 @@ private[spark] object Utils extends Logging {
*/
val DEFAULT_MAX_TO_STRING_FIELDS = 25

/** Scheme used for files that are locally available on worker nodes in the cluster. */
val LOCAL_SCHEME = "local"

private[spark] def maxNumToStringFields = {
if (SparkEnv.get != null) {
SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS)
@@ -2877,6 +2880,11 @@
def isClientMode(conf: SparkConf): Boolean = {
"client".equals(conf.get(SparkLauncher.DEPLOY_MODE, "client"))
}

/** Returns whether the URI is a "local:" URI. */
def isLocalUri(uri: String): Boolean = {
uri.startsWith(s"$LOCAL_SCHEME:")
}
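// Illustrative usage, not part of the change (paths are hypothetical):
//   isLocalUri("local:///etc/security/user.keytab")  => true
//   isLocalUri("/etc/security/user.keytab")          => false
//   isLocalUri("hdfs:///keytabs/user.keytab")        => false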
}

private[util] object CallerContext extends Logging {
3 changes: 2 additions & 1 deletion project/SparkBuild.scala
@@ -516,7 +516,8 @@ object KubernetesIntegrationTests {
s"-Dspark.kubernetes.test.unpackSparkDir=$sparkHome"
),
// Force packaging before building images, so that the latest code is tested.
dockerBuild := dockerBuild.dependsOn(packageBin in Compile in assembly).value
dockerBuild := dockerBuild.dependsOn(packageBin in Compile in assembly)
.dependsOn(packageBin in Compile in examples).value
)
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -87,25 +87,22 @@ private[spark] object Constants {
val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d

// Hadoop Configuration
val HADOOP_FILE_VOLUME = "hadoop-properties"
val HADOOP_CONF_VOLUME = "hadoop-properties"
val KRB_FILE_VOLUME = "krb5-file"
val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf"
val KRB_FILE_DIR_PATH = "/etc"
val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
val HADOOP_CONFIG_MAP_NAME =
"spark.kubernetes.executor.hadoopConfigMapName"
val KRB5_CONFIG_MAP_NAME =
"spark.kubernetes.executor.krb5ConfigMapName"

// Kerberos Configuration
val KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME = "delegation-tokens"
val KERBEROS_DT_SECRET_NAME =
"spark.kubernetes.kerberos.dt-secret-name"
val KERBEROS_DT_SECRET_KEY =
"spark.kubernetes.kerberos.dt-secret-key"
val KERBEROS_SPARK_USER_NAME =
"spark.kubernetes.kerberos.spark-user-name"
val KERBEROS_SECRET_KEY = "hadoop-tokens"
val KERBEROS_KEYTAB_VOLUME = "kerberos-keytab"
val KERBEROS_KEYTAB_MOUNT_POINT = "/mnt/secrets/kerberos-keytab"

// Hadoop credentials secrets for the Spark app.
val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
import org.apache.spark.internal.config.ConfigEntry
@@ -82,9 +81,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](

def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"

def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager =
new KubernetesHadoopDelegationTokenManager(conf, hConf)

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
@@ -18,7 +18,30 @@ package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

private[spark] case class SparkPod(pod: Pod, container: Container)
private[spark] case class SparkPod(pod: Pod, container: Container) {

/**
* Convenience method to apply a series of chained transformations to a pod.
*
* Use it like:
*
* original.transform { case pod =>
* // update pod and return new one
* }.transform { case pod =>
* // more changes that create a new pod
* }.transform {
* case pod if someCondition => // new pod
* }
*
* This makes it cleaner to apply multiple transformations, avoiding having to create
* a bunch of awkwardly-named local variables. Since the argument is a partial function,
* it can do matching without needing to exhaust all the possibilities. If the function
* is not applied, then the original pod will be kept.
*/
def transform(fn: PartialFunction[SparkPod, SparkPod]): SparkPod = fn.lift(this).getOrElse(this)
Contributor Author:
I added this because I started to get tired of code like the following:

val someIntermediateName = someOption.map { blah =>
   // create the updated pod
}.getOrElse(previousPodName)

// lather, rinse, repeat

To me that's hard to follow and brittle, and this pattern makes things clearer IMO.
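
For illustration, a minimal sketch of the same idea, assuming a toy Pod case class in place of the real SparkPod and fabric8 builders (all names below are made up):

case class Pod(labels: Map[String, String], container: String) {
  def transform(fn: PartialFunction[Pod, Pod]): Pod = fn.lift(this).getOrElse(this)
}

val maybeSecret: Option[String] = Some("dt-secret")

val configured = Pod(Map.empty, "driver")
  .transform { case p if maybeSecret.isDefined =>
    p.copy(labels = p.labels + ("secret" -> maybeSecret.get))
  }
  .transform { case p if sys.env.contains("HADOOP_CONF_DIR") =>
    p.copy(labels = p.labels + ("hadoop-conf" -> "mounted"))
  }
// Any guard that does not match simply passes the pod through unchanged.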

Contributor:
I would think that this change is out of the scope of this PR, but I do love the use of a PartialFunction here. Thanks for this!


}


private[spark] object SparkPod {
def initialPod(): SparkPod = {
Expand Down
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -112,6 +112,10 @@ private[spark] class BasicDriverFeatureStep(
.withContainerPort(driverUIPort)
.withProtocol("TCP")
.endPort()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(Utils.getCurrentUserName())
.endEnv()
.addAllToEnv(driverCustomEnvs.asJava)
.addNewEnv()
.withName(ENV_DRIVER_BIND_ADDRESS)
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -144,6 +144,10 @@ private[spark] class BasicExecutorFeatureStep(
.addToLimits("memory", executorMemoryQuantity)
.addToRequests("cpu", executorCpuQuantity)
.endResources()
.addNewEnv()
.withName(ENV_SPARK_USER)
Contributor:
I see that you noted that this is always done across resource managers. What is the reason for that? Just wondering, since I introduced it exclusively in the HadoopSteps.

Contributor Author:
If you don't do this, whatever OS user the container runs as becomes the identity used to talk to Hadoop services (when Kerberos is not on).

In YARN, for example, that would be the "yarn" user.

In k8s, with the current image, that would be "root".

You probably don't want that by default. We're talking about non-secured Hadoop here, so users can easily override this stuff, but by default let's at least try to identify the user correctly.
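
As a rough, hedged sketch of the mechanism described here (this approximates what Utils.getCurrentUserName() does; the real implementation may differ in detail):

import org.apache.hadoop.security.UserGroupInformation

// SPARK_USER, when set on the container, takes precedence over the OS-level
// identity that UserGroupInformation would otherwise report (e.g. "root").
def currentUserName(): String =
  Option(System.getenv("SPARK_USER"))
    .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName)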

.withValue(Utils.getCurrentUserName())
.endEnv()
.addAllToEnv(executorEnv.asJava)
.withPorts(requiredPorts.asJava)
.addToArgs("executor")
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DelegationTokenFeatureStep.scala
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, SecretBuilder}
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config._

/**
* Delegation token support for Spark apps on kubernetes.
*
* When preparing driver resources, this step will generate delegation tokens for the app if
* they're needed.
*
* When preparing pods, this step will mount the delegation token secret (either pre-defined,
* or generated by this step when preparing the driver).
*/
private[spark] class DelegationTokenFeatureStep(conf: KubernetesConf[_], isDriver: Boolean)
Contributor Author:
During the recent discussion in the sig meeting I was thinking about this guy. I think it may not be necessary to mount this secret in executors, and let the driver propagate the tokens to executors through its normal means.

e.g. later if/when adding code to monitor delegation tokens for updates, that would mean only the driver has to do it.

I'll give this a try; if it works, it might be feasible to re-merge this code with the kerberos step.

Contributor:
This is what I meant above when I said that the HadoopKerberosLogin logic could be deleted. The assumption here is that the secret should not be created as the keytab will use the HadoopDelegationTokenManager logic. The only secret that should be created would be the keytab. However, I personally thought that we should point to a secretName that is either the delegationToken or the keytab. Hence why I suggested that the secretName and secretItemKey remain.

Contributor Author:
I think you're missing one use case there. There are 3 use cases:

  • keytab: keytab is provided to driver, driver handles kerberos login, creates tokens, distributes tokens to executors
  • pre-defined token secret: secret is mounted in the driver, env variable is set, driver loads them and distributes to executors
  • default kerberos case: submission client generates delegation tokens, creates a secret for them, then this behaves like the bullet above

The third use case is actually the most common. In your reply above you're only covering the other two. My code covers all three.

I'm just saying that this code actually doesn't need to do anything on the executor side, because the driver takes care of everything when the credentials are provided.
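
As a hedged, simplified sketch of how those three cases drive this step (names loosely mirror the diff; this is not the exact implementation):

// Illustrative decision logic for the three cases above.
def tokenHandling(
    hasKeytab: Boolean,
    existingSecret: Option[String],
    kerberosEnabled: Boolean): String = {
  if (hasKeytab) {
    "keytab: the driver logs in, creates tokens, and distributes them to executors"
  } else if (existingSecret.isDefined) {
    "pre-defined secret: mounted in the driver and exposed via HADOOP_TOKEN_FILE_LOCATION"
  } else if (kerberosEnabled) {
    "default: the submission client creates tokens and stores them in a generated secret"
  } else {
    "no Kerberos: nothing to do"
  }
}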

extends KubernetesFeatureConfigStep {

private val existingSecret = conf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
private val existingItemKey = conf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY)
private val shouldCreateTokens = isDriver && !conf.sparkConf.contains(KEYTAB) &&
existingSecret.isEmpty && isSecurityEnabled

KubernetesUtils.requireBothOrNeitherDefined(
existingSecret,
existingItemKey,
"If a secret data item-key where the data of the Kerberos Delegation Token is specified" +
" you must also specify the name of the secret",
"If a secret storing a Kerberos Delegation Token is specified you must also" +
" specify the item-key where the data is stored")

private def dtSecretName: String = s"${conf.appResourceNamePrefix}-delegation-tokens"

override def configurePod(pod: SparkPod): SparkPod = {
pod.transform { case pod if shouldCreateTokens | existingSecret.isDefined =>
val secretName = existingSecret.getOrElse(dtSecretName)
val itemKey = existingItemKey.getOrElse(KERBEROS_SECRET_KEY)

val podWithTokens = new PodBuilder(pod.pod)
.editOrNewSpec()
.addNewVolume()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withNewSecret()
.withSecretName(secretName)
.endSecret()
.endVolume()
.endSpec()
.build()

val containerWithTokens = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
.withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$itemKey")
.endEnv()
.build()

SparkPod(podWithTokens, containerWithTokens)
}
}

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
if (shouldCreateTokens) {
val tokenData = createDelegationTokens()
Seq(new SecretBuilder()
.withNewMetadata()
.withName(dtSecretName)
.endMetadata()
.addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(tokenData))
.build())
} else {
Nil
}
}

// Visible for testing.
def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled()

// Visible for testing.
def createDelegationTokens(): Array[Byte] = {
val tokenManager = new HadoopDelegationTokenManager(conf.sparkConf,
SparkHadoopUtil.get.newConfiguration(conf.sparkConf))
val creds = UserGroupInformation.getCurrentUser().getCredentials()
tokenManager.obtainDelegationTokens(creds)
SparkHadoopUtil.get.serialize(creds)
}

}