
[WIP] Secure HDFS Support #391

Closed
@@ -575,7 +575,7 @@ object SparkSubmit extends CommandLineUtils {
}

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL) {
if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) {
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
47 changes: 47 additions & 0 deletions docs/running-on-kubernetes.md
@@ -768,6 +768,53 @@ from the other deployment modes. See the [configuration page](configuration.html
<code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos</code></td>
<td>false</td>
<td>
Specify whether your job requires a delegation token in order to access HDFS. By default,
secure HDFS access is assumed not to be required.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.keytab</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos</code> to true, this lets you specify
the location of the Kerberos keytab to be used to access secure HDFS. This is optional, as you
may instead log in by running <code>kinit -kt</code> before running spark-submit, in which case
the submission client will resolve your credentials from the local TGT cache.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.principal</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos</code> to true, this lets you specify
the Kerberos principal you wish to use to access secure HDFS. This is optional, as you
may instead log in by running <code>kinit -kt</code> before running spark-submit, in which case
the submission client will resolve your credentials from the local TGT cache.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos</code> to true, this lets you specify
the name of the secret where your existing delegation token data is stored. You must also specify
<code>spark.kubernetes.kerberos.tokensecret.label</code> to identify where the data is stored within the secret.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokensecret.label</code></td>
<td>spark.kubernetes.kerberos.dt.label</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos</code> to true, this lets you specify
the label within the pre-specified secret under which your existing delegation token data is stored.
This defaults to <code>spark.kubernetes.kerberos.dt.label</code> if not set, but you should always
set it explicitly when providing a pre-existing secret containing the delegation token data.
</td>
</tr>
</table>
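As an illustration (not part of this change), the same properties could be set programmatically on a
<code>SparkConf</code>; the secret name and label below are hypothetical and must match a
delegation-token secret that already exists in the cluster:

```scala
import org.apache.spark.SparkConf

// Hypothetical values for illustration only.
val conf = new SparkConf()
  .set("spark.kubernetes.kerberos", "true")
  .set("spark.kubernetes.kerberos.tokensecret.name", "hdfs-token-secret")
  .set("spark.kubernetes.kerberos.tokensecret.label", "hadoop-tokens-1")
```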


@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import java.io.File

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.Logging


/**
* This is separated out from the HadoopConf steps API because this component can be reused to
* set up the Hadoop Configuration for executors as well.
*/
private[spark] trait HadoopConfBootstrap {
/**
* Bootstraps a main container with the Hadoop ConfigMap mounted as a volume and an
* environment variable pointing to the mounted directory.
*/
def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer
}

private[spark] class HadoopConfBootstrapImpl(
hadoopConfConfigMapName: String,
hadoopConfigFiles: Seq[File]) extends HadoopConfBootstrap with Logging {

override def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer = {
logInfo("HADOOP_CONF_DIR defined. Mounting HDFS specific .xml files")
val keyPaths = hadoopConfigFiles.map(file =>
new KeyToPathBuilder()
.withKey(file.toPath.getFileName.toString)
.withPath(file.toPath.getFileName.toString)
.build()).toList
val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
.editSpec()
.addNewVolume()
.withName(HADOOP_FILE_VOLUME)
.withNewConfigMap()
.withName(hadoopConfConfigMapName)
.withItems(keyPaths.asJava)
.endConfigMap()
.endVolume()
.endSpec()
.build()
val mainContainerWithMountedHadoopConf = new ContainerBuilder(
originalPodWithMainContainer.mainContainer)
.addNewVolumeMount()
.withName(HADOOP_FILE_VOLUME)
.withMountPath(HADOOP_FILE_DIR)
.endVolumeMount()
.addNewEnv()
.withName(HADOOP_CONF_DIR)
.withValue(HADOOP_FILE_DIR)
.endEnv()
.build()
originalPodWithMainContainer.copy(
pod = hadoopSupportedPod,
mainContainer = mainContainerWithMountedHadoopConf)
}
}
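A rough usage sketch follows (the ConfigMap name, `hadoopConfDir`, `driverPod`, and `driverContainer`
below are illustrative assumptions, not part of this change):

```scala
// Sketch only: apply the bootstrap to a previously built driver pod and container.
val hadoopConfDir = new java.io.File("/etc/hadoop/conf")      // assumed local HADOOP_CONF_DIR
val bootstrap = new HadoopConfBootstrapImpl(
  hadoopConfConfigMapName = "spark-hadoop-config",            // hypothetical ConfigMap name
  hadoopConfigFiles = hadoopConfDir.listFiles().toSeq)        // conf files published via the ConfigMap

val bootstrapped = bootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(driverPod, driverContainer))
// bootstrapped.pod mounts the ConfigMap as the "hadoop-properties" volume, and
// bootstrapped.mainContainer sets HADOOP_CONF_DIR=/etc/hadoop inside the container.
```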
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.ContainerBuilder

import org.apache.spark.deploy.kubernetes.constants._
/**
* This is separated out from hadoopsteps because this component can be reused to
* set up the Kerberos logic for executors as well.
*/
private[spark] trait KerberosConfBootstrap {
/**
* Bootstraps the main container with an environment variable
* pointing to the secret data item that stores the delegation token.
*/
def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer
}
private[spark] class KerberosConfBootstrapImpl(
delegationTokenLabelName: String) extends KerberosConfBootstrap {
override def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer = {
val mainContainerWithMountedHadoopConf = new ContainerBuilder(
originalPodWithMainContainer.mainContainer)
.addNewEnv()
.withName(ENV_KERBEROS_SECRET_LABEL)
.withValue(delegationTokenLabelName)
.endEnv()
.build()
originalPodWithMainContainer.copy(mainContainer = mainContainerWithMountedHadoopConf)
}
}
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{Container, Pod}

/**
* This case class packages the driver pod together with its main container so that
* bootstraps can modify both as a single unit rather than each component separately.
*/
private[spark] case class PodWithMainContainer(
pod: Pod,
mainContainer: Container)
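A short sketch (assuming `hadoopConfBootstrap`, `kerberosConfBootstrap`, `driverPod`, and
`driverContainer` are already in scope) of how bootstraps compose through this case class:

```scala
// Each bootstrap consumes and returns a PodWithMainContainer, so the Hadoop conf
// and Kerberos bootstraps can be chained one after the other.
val withHadoopConf = hadoopConfBootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(driverPod, driverContainer))
val withKerberosEnv = kerberosConfBootstrap.bootstrapMainContainerAndVolumes(withHadoopConf)
// withKerberosEnv.pod and withKerberosEnv.mainContainer carry both modifications.
```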
@@ -499,6 +499,42 @@ package object config extends Logging {

private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

private[spark] val KUBERNETES_KERBEROS_SUPPORT =
ConfigBuilder("spark.kubernetes.kerberos")
.doc("Specify whether your job is a job that will require a Delegation Token to access HDFS")
.booleanConf
.createWithDefault(false)

private[spark] val KUBERNETES_KERBEROS_KEYTAB =
ConfigBuilder("spark.kubernetes.kerberos.keytab")
.doc("Specify the location of keytab" +
" for Kerberos in order to access Secure HDFS")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_PRINCIPAL =
ConfigBuilder("spark.kubernetes.kerberos.principal")
.doc("Specify the principal" +
" for Kerberos in order to access Secure HDFS")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_DT_SECRET_NAME =
ConfigBuilder("spark.kubernetes.kerberos.tokensecret.name")
.doc("Specify the name of the secret where " +
" your existing delegation token is stored. This removes the need" +
" for the job user to provide any keytab for launching a job")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_DT_SECRET_LABEL =
ConfigBuilder("spark.kubernetes.kerberos.tokensecret.label")
.doc("Specify the label of the data where " +
" your existing delegation token is stored. This removes the need" +
" for the job user to provide any keytab for launching a job")
.stringConf
.createWithDefault("spark.kubernetes.kerberos.dt.label")

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
@@ -91,6 +91,24 @@ package object constants {
private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle"
private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"

// Hadoop Configuration
private[spark] val HADOOP_FILE_VOLUME = "hadoop-properties"
private[spark] val HADOOP_FILE_DIR = "/etc/hadoop"
private[spark] val HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
private[spark] val HADOOP_CONF_DIR_LOC = "spark.kubernetes.hadoop.conf.dir"
private[spark] val HADOOP_CONFIG_MAP_SPARK_CONF_NAME =
"spark.kubernetes.hadoop.executor.hadoopconfigmapname"

// Kerberos Configuration
private[spark] val HADOOP_KERBEROS_SECRET_NAME =
"spark.kubernetes.kerberos.dt"
private[spark] val KERBEROS_SPARK_CONF_NAME =
"spark.kubernetes.kerberos.secretlabelname"
private[spark] val KERBEROS_SECRET_LABEL_PREFIX =
"hadoop-tokens"
private[spark] val ENV_KERBEROS_SECRET_LABEL =
"KERBEROS_SECRET_LABEL"

// Miscellaneous
private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity"
private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
@@ -149,7 +149,9 @@ private[spark] class Client(
}

private[spark] object Client {
def run(sparkConf: SparkConf, clientArguments: ClientArguments): Unit = {
def run(sparkConf: SparkConf,
clientArguments: ClientArguments,
hadoopConfDir: Option[String]): Unit = {
val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
val launchTime = System.currentTimeMillis()
@@ -168,6 +170,7 @@ private[spark] object Client {
clientArguments.mainClass,
clientArguments.driverArgs,
clientArguments.otherPyFiles,
hadoopConfDir,
sparkConf)
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
@@ -195,6 +198,9 @@
def main(args: Array[String]): Unit = {
val parsedArguments = ClientArguments.fromCommandLineArgs(args)
val sparkConf = new SparkConf()
run(sparkConf, parsedArguments)
// hadoopConfDir is passed into Client#run() to allow us to
// test this env variable within the integration test environment
val hadoopConfDir = sys.env.get("HADOOP_CONF_DIR")
run(sparkConf, parsedArguments, hadoopConfDir)
}
}
@@ -20,7 +20,8 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.ConfigurationUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, PythonStep}
import org.apache.spark.deploy.kubernetes.submit.submitsteps._
import org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps.HadoopStepsOrchestrator
import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
@@ -37,6 +38,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
mainClass: String,
appArgs: Array[String],
additionalPythonFiles: Seq[String],
hadoopConfDir: Option[String],
submissionSparkConf: SparkConf) {

// The resource name prefix is derived from the application name, making it easy to connect the
@@ -51,6 +53,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
private val filesDownloadPath = submissionSparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config"
private val hadoopConfigMapName = s"$kubernetesResourceNamePrefix-hadoop-config"

def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
val additionalMainAppJar = mainAppResource match {
@@ -94,6 +97,19 @@ private[spark] class DriverConfigurationStepsOrchestrator(
submissionSparkConf)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, kubernetesResourceNamePrefix)
val hadoopConfigSteps =
if (hadoopConfDir.isEmpty) {
Option.empty[DriverConfigurationStep]
} else {
val hadoopStepsOrchestrator = new HadoopStepsOrchestrator(
namespace,
hadoopConfigMapName,
submissionSparkConf,
hadoopConfDir)
val hadoopConfSteps =
hadoopStepsOrchestrator.getHadoopSteps()
Some(new HadoopConfigBootstrapStep(hadoopConfSteps, hadoopConfigMapName))
}
val pythonStep = mainAppResource match {
case PythonMainAppResource(mainPyResource) =>
Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath))
@@ -133,6 +149,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
kubernetesCredentialsStep,
dependencyResolutionStep) ++
initContainerBootstrapStep.toSeq ++
hadoopConfigSteps.toSeq ++
pythonStep.toSeq
}
}