29 changes: 15 additions & 14 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy

import java.io._
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.net.{URI, URL}
import java.security.PrivilegedExceptionAction
import java.text.ParseException
import java.util.UUID
@@ -334,19 +334,20 @@ private[spark] class SparkSubmit extends Logging {
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
val targetDir = Utils.createTempDir()

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) {
if (args.principal != null) {
if (args.keytab != null) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sparkConf.set(KEYTAB, args.keytab)
sparkConf.set(PRINCIPAL, args.principal)
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
// Kerberos is not supported in standalone mode, and keytab support is not yet available
// in Mesos cluster mode.
if (clusterManager != STANDALONE
&& !isMesosCluster
&& args.principal != null
&& args.keytab != null) {
// If client mode, make sure the keytab is just a local path.
if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
args.keytab = new URI(args.keytab).getPath()
}

if (!Utils.isLocalUri(args.keytab)) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
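For illustration, a minimal sketch of what the new branch does, using a hypothetical keytab path: in client mode a "local:" keytab URI is reduced to a plain filesystem path, and only non-"local:" keytabs are checked for existence before the Kerberos login.

import java.net.URI

// Sketch only (hypothetical path), mirroring the SparkSubmit logic above.
val keytab = "local:/etc/security/keytabs/spark.keytab"
val resolved =
  if (keytab.startsWith("local:")) new URI(keytab).getPath()   // "/etc/security/keytabs/spark.keytab"
  else keytab
// A non-"local:" keytab would instead be validated with new File(...).exists()
// and passed to UserGroupInformation.loginUserFromKeytab.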

@@ -18,6 +18,7 @@
package org.apache.spark.deploy.security

import java.io.File
import java.net.URI
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
@@ -71,11 +72,13 @@ private[spark] class HadoopDelegationTokenManager(
private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

private val principal = sparkConf.get(PRINCIPAL).orNull
private val keytab = sparkConf.get(KEYTAB).orNull

// The keytab can be a local: URI for cluster mode, so translate it to a regular path. If it is
// needed later on, the code will check that it exists.
private val keytab = sparkConf.get(KEYTAB).map { uri => new URI(uri).getPath() }.orNull

require((principal == null) == (keytab == null),
"Both principal and keytab must be defined, or neither.")
require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")

private val delegationTokenProviders = loadProviders()
logDebug("Using the following builtin delegation token providers: " +
@@ -264,6 +267,7 @@ private[spark] class HadoopDelegationTokenManager(

private def doLogin(): UserGroupInformation = {
logInfo(s"Attempting to login to KDC using principal: $principal")
require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
logInfo("Successfully logged into KDC.")
ugi
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -92,6 +92,9 @@ private[spark] object Utils extends Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null

/** Scheme used for files that are locally available on worker nodes in the cluster. */
val LOCAL_SCHEME = "local"

/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -2829,6 +2832,11 @@ private[spark] object Utils extends Logging {
def isClientMode(conf: SparkConf): Boolean = {
"client".equals(conf.get(SparkLauncher.DEPLOY_MODE, "client"))
}

/** Returns whether the URI is a "local:" URI. */
def isLocalUri(uri: String): Boolean = {
uri.startsWith(s"$LOCAL_SCHEME:")
}
}
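For reference, a short usage sketch of the new helper with hypothetical paths; only the "local:" scheme counts, regardless of how many slashes follow it. (Utils is private[spark], so this only compiles inside an org.apache.spark package.)

import org.apache.spark.util.Utils

assert(Utils.isLocalUri("local:/etc/krb5/spark.keytab"))     // local: URI
assert(Utils.isLocalUri("local:///etc/krb5/spark.keytab"))   // local: URI
assert(!Utils.isLocalUri("hdfs:///keytabs/spark.keytab"))    // other scheme
assert(!Utils.isLocalUri("/etc/krb5/spark.keytab"))          // bare path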

private[util] object CallerContext extends Logging {
@@ -87,25 +87,22 @@ private[spark] object Constants {
val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d

// Hadoop Configuration
val HADOOP_FILE_VOLUME = "hadoop-properties"
val HADOOP_CONF_VOLUME = "hadoop-properties"
val KRB_FILE_VOLUME = "krb5-file"
val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf"
val KRB_FILE_DIR_PATH = "/etc"
val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
val HADOOP_CONFIG_MAP_NAME =
"spark.kubernetes.executor.hadoopConfigMapName"
val KRB5_CONFIG_MAP_NAME =
"spark.kubernetes.executor.krb5ConfigMapName"

// Kerberos Configuration
val KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME = "delegation-tokens"
val KERBEROS_DT_SECRET_NAME =
"spark.kubernetes.kerberos.dt-secret-name"
val KERBEROS_DT_SECRET_KEY =
"spark.kubernetes.kerberos.dt-secret-key"
val KERBEROS_SPARK_USER_NAME =
"spark.kubernetes.kerberos.spark-user-name"
val KERBEROS_SECRET_KEY = "hadoop-tokens"
val KERBEROS_KEYTAB_VOLUME = "kerberos-keytab"
val KERBEROS_KEYTAB_MOUNT_POINT = "/mnt/secrets/kerberos-keytab"

// Hadoop credentials secrets for the Spark app.
val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
@@ -42,10 +42,6 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {

def appName: String = get("spark.app.name", "spark")

def hadoopConfigMapName: String = s"$resourceNamePrefix-hadoop-config"

def krbConfigMapName: String = s"$resourceNamePrefix-krb5-file"

def namespace: String = get(KUBERNETES_NAMESPACE)

def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY)
@@ -18,7 +18,30 @@ package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

private[spark] case class SparkPod(pod: Pod, container: Container)
private[spark] case class SparkPod(pod: Pod, container: Container) {

/**
* Convenience method to apply a series of chained transformations to a pod.
*
* Use it like:
*
* original.transform { case pod =>
* // update pod and return new one
* }.transform { case pod =>
* // more changes that create a new pod
* }.transform {
* case pod if someCondition => // new pod
* }
*
* This makes it cleaner to apply multiple transformations, avoiding having to create
* a bunch of awkwardly-named local variables. Since the argument is a partial function,
* it can do matching without needing to exhaust all the possibilities. If the function
* is not applied, then the original pod will be kept.
*/
def transform(fn: PartialFunction[SparkPod, SparkPod]): SparkPod = fn.lift(this).getOrElse(this)

Contributor Author:
I added this because I started to get tired of code like the following:

val someIntermediateName = someOption.map { blah =>
   // create the updated pod
}.getOrElse(previousPodName)

// lather, rinse, repeat

To me that's hard to follow and brittle, and this pattern makes things clearer IMO.

Contributor:
I would think that this change is out of the scope of this PR, but I do love the use of a PartialFunction here. Thanks for this!


}


private[spark] object SparkPod {
def initialPod(): SparkPod = {
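A self-contained sketch of the transform pattern discussed above, using a toy case class in place of SparkPod (the real class wraps fabric8 Pod and Container objects):

case class ToyPod(labels: Map[String, String]) {
  // Same shape as SparkPod.transform: apply the partial function when it matches,
  // otherwise keep the original value.
  def transform(fn: PartialFunction[ToyPod, ToyPod]): ToyPod = fn.lift(this).getOrElse(this)
}

val enableFeature = false
val original = ToyPod(Map("app" -> "spark"))

val result = original
  .transform { case pod => pod.copy(labels = pod.labels + ("step" -> "basic")) }
  .transform { case pod if enableFeature => pod.copy(labels = pod.labels + ("feature" -> "on")) }

// The guard on the second step does not match, so the pod from the first step
// is kept: Map(app -> spark, step -> basic)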
@@ -110,6 +110,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
.withContainerPort(driverUIPort)
.withProtocol("TCP")
.endPort()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(Utils.getCurrentUserName())
.endEnv()
.addAllToEnv(driverCustomEnvs.asJava)
.addNewEnv()
.withName(ENV_DRIVER_BIND_ADDRESS)
@@ -163,6 +163,10 @@ private[spark] class BasicExecutorFeatureStep(
.addToLimits("memory", executorMemoryQuantity)
.addToRequests("cpu", executorCpuQuantity)
.endResources()
.addNewEnv()
.withName(ENV_SPARK_USER)

Contributor:
I see that you noted that this is always done across resource managers. What is the reason for that? Just wondering, since I introduced it exclusively in the HadoopSteps.

Contributor Author:
If you don't do this, the OS user in the container becomes the identity used to talk to Hadoop services (when Kerberos is not on).

In YARN, for example, that would be the "yarn" user.

In k8s, with the current image, that would be "root".

You probably don't want that by default. We're talking about non-secured Hadoop here, so users can easily override this stuff, but by default let's at least try to identify the user correctly.

.withValue(Utils.getCurrentUserName())
.endEnv()
.addAllToEnv(executorEnv.asJava)
.withPorts(requiredPorts.asJava)
.addToArgs("executor")
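To make the exchange above concrete, a hedged sketch of how the SPARK_USER environment variable feeds into user resolution. It simplifies what Utils.getCurrentUserName is assumed to do: prefer the environment variable and otherwise fall back to the OS-level Hadoop user (for example "root" inside the default image).

import org.apache.hadoop.security.UserGroupInformation

// Assumed simplification; the real Utils.getCurrentUserName may differ in detail.
def effectiveSparkUser(): String =
  Option(System.getenv("SPARK_USER"))
    .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())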
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import java.io.File
import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._

import com.google.common.io.Files
import io.fabric8.kubernetes.api.model._

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._

/**
* Mounts the Hadoop configuration - either a pre-defined config map, or a local configuration
* directory - on the driver pod.
*/
private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {

private val confDir = Option(conf.sparkConf.getenv(ENV_HADOOP_CONF_DIR))
private val existingConfMap = conf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)

KubernetesUtils.requireNandDefined(
confDir,
existingConfMap,
"Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " +
"as the creation of an additional ConfigMap, when one is already specified is extraneous")

private lazy val confFiles: Seq[File] = {
val dir = new File(confDir.get)
if (dir.isDirectory) {
dir.listFiles.filter(_.isFile).toSeq
} else {
Nil
}
}

private def newConfigMapName: String = s"${conf.resourceNamePrefix}-hadoop-config"

private def hasHadoopConf: Boolean = confDir.isDefined || existingConfMap.isDefined

override def configurePod(original: SparkPod): SparkPod = {
original.transform { case pod if hasHadoopConf =>
val confVolume = if (confDir.isDefined) {
val keyPaths = confFiles.map { file =>
new KeyToPathBuilder()
.withKey(file.getName())
.withPath(file.getName())
.build()
}
new VolumeBuilder()
.withName(HADOOP_CONF_VOLUME)
.withNewConfigMap()
.withName(newConfigMapName)
.withItems(keyPaths.asJava)
.endConfigMap()
.build()
} else {
new VolumeBuilder()
.withName(HADOOP_CONF_VOLUME)
.withNewConfigMap()
.withName(existingConfMap.get)
.endConfigMap()
.build()
}

val podWithConf = new PodBuilder(pod.pod)
.editSpec()
.addNewVolumeLike(confVolume)
.endVolume()
.endSpec()
.build()

val containerWithMount = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(HADOOP_CONF_VOLUME)
.withMountPath(HADOOP_CONF_DIR_PATH)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_CONF_DIR)
.withValue(HADOOP_CONF_DIR_PATH)
.endEnv()
.build()

SparkPod(podWithConf, containerWithMount)
}
}

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
if (confDir.isDefined) {
val fileMap = confFiles.map { file =>
(file.getName(), Files.toString(file, StandardCharsets.UTF_8))
}.toMap.asJava

Seq(new ConfigMapBuilder()
.withNewMetadata()
.withName(newConfigMapName)
.endMetadata()
.addToData(fileMap)
.build())
} else {
Nil
}
}

}
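A usage sketch for the two mutually exclusive inputs this step accepts. The ConfigMap name and API server URL are placeholders, and the configuration key shown for the existing-ConfigMap case is an assumption about what KUBERNETES_HADOOP_CONF_CONFIG_MAP resolves to.

import org.apache.spark.SparkConf

// Option 1: export HADOOP_CONF_DIR=/opt/hadoop/conf before submitting; the step
// then builds a fresh ConfigMap from the files in that directory.
val fromLocalDir = new SparkConf()
  .set("spark.master", "k8s://https://example-apiserver:6443")

// Option 2: reuse a ConfigMap that already exists in the cluster.
val fromExistingConfigMap = new SparkConf()
  .set("spark.master", "k8s://https://example-apiserver:6443")
  .set("spark.kubernetes.hadoop.configMapName", "my-hadoop-conf")

// Setting both HADOOP_CONF_DIR and the ConfigMap triggers the requireNandDefined
// check above and fails fast.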

This file was deleted.

This file was deleted.
