[SPARK-25825][K8S][WIP] Enable token renewal for both --keytab and tokenSecret #22915


Closed · wants to merge 1 commit

@@ -335,7 +335,8 @@ private[spark] class SparkSubmit extends Logging {
val targetDir = Utils.createTempDir()

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) {
if (clusterManager == YARN || clusterManager == LOCAL ||
clusterManager == KUBERNETES || isMesosClient) {
if (args.principal != null) {
if (args.keytab != null) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
@@ -70,8 +70,8 @@ private[spark] class HadoopDelegationTokenManager(
"spark.yarn.security.credentials.%s.enabled")
private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

private val principal = sparkConf.get(PRINCIPAL).orNull
private val keytab = sparkConf.get(KEYTAB).orNull
protected val principal = sparkConf.get(PRINCIPAL).orNull
protected val keytab = sparkConf.get(KEYTAB).orNull

require((principal == null) == (keytab == null),
"Both principal and keytab must be defined, or neither.")
@@ -81,8 +81,8 @@
logDebug("Using the following builtin delegation token providers: " +
s"${delegationTokenProviders.keys.mkString(", ")}.")

private var renewalExecutor: ScheduledExecutorService = _
private val driverRef = new AtomicReference[RpcEndpointRef]()
protected var renewalExecutor: ScheduledExecutorService = _
protected val driverRef = new AtomicReference[RpcEndpointRef]()

/** Set the endpoint used to send tokens to the driver. */
def setDriverRef(ref: RpcEndpointRef): Unit = {
13 changes: 11 additions & 2 deletions docs/running-on-kubernetes.md
@@ -884,15 +884,24 @@ specific to Spark on Kubernetes.
<td>(none)</td>
<td>
Specify the local file that contains the driver [pod template](#pod-template). For example
<code>spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml`</code>
<code>spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml</code>
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.podTemplateFile</code></td>
<td>(none)</td>
<td>
Specify the local file that contains the executor [pod template](#pod-template). For example
<code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml`</code>
<code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml</code>
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokenSecret.renewal</code></td>
<td>false</td>
<td>
Enables the driver to watch the secret specified at
<code>spark.kubernetes.kerberos.tokenSecret.name</code> for updates so that the tokens can be
propagated to the executors. See the brief example below the table.
</td>
</tr>
</table>
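
For illustration, here is a minimal sketch of how this setting pairs with `spark.kubernetes.kerberos.tokenSecret.name`. The secret name shown is a placeholder, and the same key/value pairs can equally be passed with `--conf` on `spark-submit`:

```scala
import org.apache.spark.SparkConf

// Hypothetical values: the named secret is the one an external token service
// keeps up to date, as described in the security documentation.
val conf = new SparkConf()
  .set("spark.kubernetes.kerberos.tokenSecret.name", "spark-delegation-tokens")
  .set("spark.kubernetes.kerberos.tokenSecret.renewal", "true")
```
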
60 changes: 44 additions & 16 deletions docs/security.md
@@ -706,22 +706,6 @@ The following options provides finer-grained control for this feature:
</tr>
</table>

## Long-Running Applications

Long-running applications may run into issues if their run time exceeds the maximum delegation
token lifetime configured in services it needs to access.

Spark supports automatically creating new tokens for these applications when running in YARN mode.
Kerberos credentials need to be provided to the Spark application via the `spark-submit` command,
using the `--principal` and `--keytab` parameters.

The provided keytab will be copied over to the machine running the Application Master via the Hadoop
Distributed Cache. For this reason, it's strongly recommended that both YARN and HDFS be secured
with encryption, at least.

The Kerberos login will be periodically renewed using the provided credentials, and new delegation
tokens for supported will be created.

## Secure Interaction with Kubernetes

When talking to Hadoop-based services behind Kerberos, it was noted that Spark needs to obtain delegation tokens
@@ -798,6 +782,50 @@ achieved by setting `spark.kubernetes.hadoop.configMapName` to a pre-existing ConfigMap
local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
<HDFS_FILE_LOCATION>
```

## Long-Running Applications

Long-running applications may run into issues if their run time exceeds the maximum delegation
token lifetime configured in services it needs to access.

Spark supports automatically creating new tokens for these applications when running in YARN, Mesos, and Kubernetes modes.
To launch the renewal thread in the driver, Kerberos credentials need to be provided to the Spark application
via the `spark-submit` command, using the `--principal` and `--keytab` parameters.

In YARN mode, the provided keytab will be copied over to the machine running the Application Master via the Hadoop
Distributed Cache. For this reason, it's strongly recommended that both YARN and HDFS be secured
with encryption, at least.

The Kerberos login will be periodically renewed using the provided credentials, and new delegation
tokens for supported services will be created.

#### Long-Running Kerberos in Kubernetes

This section describes an additional feature unique to Kubernetes. If you are running an external token service
Contributor

Hey, let's make a deal.

You point me at an existing service that does this, that anyone can download and use. It doesn't even need to be open source, it just needs to have a well defined interface that we can actually write code for or document. And then you write according to that service's interface.

Then I'll stop saying that this service does not exist.

It does not matter how many times you talk about this service if that service is not defined. You need to provide something that people can use, not give them hand-wavy explanations and leave it to them to figure out how to implement this service or where to find it.

So please, until this service actually exists in some form, please refrain from mentioning it in Spark's documentation and building things based on it.

Contributor Author

The problem is that such a service can run in a variety of ways, so I thought it was a matter of defining what the resulting secret would look like. We wrote an example external service in our deprecated fork to show how such a service would function: apache-spark-on-k8s#453. In essence, using a service keytab it should acquire delegation tokens bound to the job user's principal and place the contents in the secret as a new data item. For us internally, and for other companies running their own external renewal services, the implementations may vary, but I just want to have a well-defined spec for the resulting secret, so I am experimenting with a WIP spec below.

However, it clearly seems necessary to define how such a service should function as well. Would that be sufficient? Sadly, that would still be a bit hand-wavy.

Contributor

I don't want us to support internal proprietary services. If a company has an internal proprietary service for this, they can also spend their own resources to customize Spark so it does what they need.

We (the Spark community) cannot maintain compatibility and make sure code keeps working with things we don't know exist. Which is why I've always complained when you mention this service.

> it clearly seems necessary to define how such a service should function as well. […] that would still be a bit hand-wavy.

It can't be hand-wavy. Otherwise, if someone tells us their own custom implementation of that service doesn't work, whose fault is it?

Again, we just can't write code against something that does not exist.

Contributor

One idea here is to create a command line tool in Spark to implement this functionality. Users could use that tool to do what it is you're trying to achieve here. It can be k8s-specific - lots of features start that way.

Then there is a clear protocol between the running app and this external entity, and people can even create a simple service around this tool if they'd like. And you avoid the whole "who's actually using this stuff" question.

Contributor
@skonto · Nov 9, 2018

The API should be defined first, but it would be good to have a reference implementation of this service targeting K8s that could be run out of the box. End users want the whole thing most of the time. This should be part of Spark, since this way we make sure we keep things updated all the time.
I like the server implementation in the fork, and by the way this functionality should be backported to the spark operator project as well. Actually, an operator on K8s is for managing these kinds of things, and I am wondering if it should have been part of the Spark project, like the Mesos dispatcher is. On the other hand, the unfortunate thing is that ideally resource managers should be separate projects, but the separation never happened. I am not sure how a CLI tool would help here; we need a pod service, right?

Contributor

If there is an open source implementation that you want to add to Spark, by all means, do it!

The CLI tool was a suggestion to work around the fact that a bunch of code and documentation references this thing that doesn't exist. The CLI tool would make it exist.

that updates the secret containing the delegation tokens for both the driver and executors to use, the executors can be
updated with the new tokens via a Watcher thread set up by the driver. This Watcher thread is launched only when the
`spark.kubernetes.kerberos.tokenSecret.renewal` config is enabled, and it is responsible for detecting updates to the
secret defined at `spark.kubernetes.kerberos.tokenSecret.name`.

The contract between an external token service and this secret is that the secret must be defined with the following
specification:

```yaml
kind: Secret
metadata:
  name: YOUR_SECRET_NAME
  namespace: YOUR_NAMESPACE
type: Opaque
data:
  spark.kubernetes.dt-CREATION_TIME-RENEWAL_TIME: YOUR_TOKEN_DATA
```

where `YOUR_SECRET_NAME` is the value of `spark.kubernetes.kerberos.tokenSecret.name`, `YOUR_NAMESPACE` is the namespace
in which the driver and executors are running, `CREATION_TIME` and `RENEWAL_TIME` are UNIX timestamps for when the
tokens were created and when they should next be renewed, respectively, and `YOUR_TOKEN_DATA` is the Base64-encoded
delegation token data. The driver's Watcher thread will automatically pick up data matching this specification and
select the most recent token based on `CREATION_TIME`.
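
As a rough, non-authoritative sketch of how an external token service could honor this contract (as the review discussion above points out, Spark itself ships no reference implementation of such a service), the following uses the fabric8 Kubernetes client and Hadoop `Credentials` APIs. The `ExternalTokenServiceSketch` object and `publishTokens` method are hypothetical names, and obtaining the renewed tokens themselves is deployment-specific:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

import io.fabric8.kubernetes.api.model.SecretBuilder
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.security.Credentials

object ExternalTokenServiceSketch {

  /**
   * Publishes freshly obtained delegation tokens into the secret watched by the driver.
   * How `credentials` is obtained (e.g. by logging in with a service keytab and fetching
   * tokens bound to the job user's principal) is deployment-specific and omitted here.
   */
  def publishTokens(
      client: KubernetesClient,
      namespace: String,
      secretName: String,
      credentials: Credentials,
      renewalIntervalMillis: Long): Unit = {
    // Serialize the Hadoop Credentials in the token-storage format that the driver's
    // Watcher thread deserializes on its side.
    val byteStream = new ByteArrayOutputStream()
    credentials.writeTokenStorageToStream(new DataOutputStream(byteStream))

    // Build the data-item key per the contract above:
    // spark.kubernetes.dt-CREATION_TIME-RENEWAL_TIME
    val creationTime = System.currentTimeMillis()
    val renewalTime = creationTime + renewalIntervalMillis
    val itemKey = s"spark.kubernetes.dt-$creationTime-$renewalTime"

    // Secret `data` values must be Base64-encoded; the driver decodes them before
    // deserializing the credentials.
    val secret = new SecretBuilder()
      .withNewMetadata()
        .withName(secretName)
        .withNamespace(namespace)
      .endMetadata()
      .withType("Opaque")
      .addToData(itemKey, Base64.encodeBase64String(byteStream.toByteArray))
      .build()

    // Creating or modifying the secret triggers an ADDED/MODIFIED event in the driver's
    // Watcher. Note that createOrReplace overwrites the data map, so only the newest
    // token item is retained; a service could instead merge items to keep a history.
    client.secrets().inNamespace(namespace).createOrReplace(secret)
  }
}
```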

# Event Logging

If your applications are using event logging, the directory where the event logs go
@@ -262,6 +262,15 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_KERBEROS_DT_SECRET_RENEWAL =
ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.renewal")
.doc("Enabling the driver to watch the secret specified at " +
"spark.kubernetes.kerberos.tokenSecret.name for updates so that the " +
"tokens can be propagated to the executors.")
.booleanConf
.createWithDefault(false)


val APP_RESOURCE_TYPE =
ConfigBuilder("spark.kubernetes.resource.type")
.doc("This sets the resource type internally")
@@ -109,6 +109,7 @@ private[spark] object Constants {
val KERBEROS_SPARK_USER_NAME =
"spark.kubernetes.kerberos.spark-user-name"
val KERBEROS_SECRET_KEY = "hadoop-tokens"
val SECRET_DATA_ITEM_PREFIX_TOKENS = "spark.kubernetes.dt-"

// Hadoop credentials secrets for the Spark app.
val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
@@ -78,7 +78,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"

def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager =
new KubernetesHadoopDelegationTokenManager(conf, hConf)
new KubernetesHadoopDelegationTokenManager(conf, hConf, None)

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

@@ -17,21 +17,105 @@

package org.apache.spark.deploy.k8s.security

import java.io.{ByteArrayInputStream, DataInputStream}
import java.io.File

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.Secret
import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens

/**
* Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
*/
private[spark] class KubernetesHadoopDelegationTokenManager(
_sparkConf: SparkConf,
_hadoopConf: Configuration)
_hadoopConf: Configuration,
kubernetesClient: Option[KubernetesClient])
extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {

def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled

private val isTokenRenewalEnabled =
_sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_RENEWAL)

private val dtSecretName = _sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)

if (isTokenRenewalEnabled) {
require(dtSecretName.isDefined,
"Must specify the token secret which the driver must watch for updates")
}

private def deserialize(credentials: Credentials, data: Array[Byte]): Unit = {
val byteStream = new ByteArrayInputStream(data)
val dataStream = new DataInputStream(byteStream)
credentials.readTokenStorageStream(dataStream)
}

private var watch: Watch = _

/**
* As in HadoopDelegationTokenManager this starts the token renewer.
* Upon start, if a principal and keytab are defined, the renewer will:
*
* - log in the configured principal, and set up a task to keep that user's ticket renewed
* - obtain delegation tokens from all available providers
* - send the tokens to the driver, if it's already registered
* - schedule a periodic task to update the tokens when needed.
*
* In the case that the principal is NOT configured, one may still serve a long-running
* app by enabling the KUBERNETES_KERBEROS_DT_SECRET_RENEWAL config and relying on an
* external service to populate a secret with valid delegation tokens that the application
* will then use. This is done via a Secret watcher, which the driver leverages to detect
* updates to the secret so that it may retrieve the secret's contents and send them to
* the executors before their tokens expire.
*
* @return The newly logged in user, or null
*/
override def start(): UserGroupInformation = {
val driver = driverRef.get()
if (isTokenRenewalEnabled &&
kubernetesClient.isDefined && driver != null) {
watch = kubernetesClient.get
.secrets()
.inNamespace(_sparkConf.get(KUBERNETES_NAMESPACE))
.withName(dtSecretName.get)
.watch(new Watcher[Secret] {
override def onClose(cause: KubernetesClientException): Unit =
logInfo("Ending the watch of DT Secret")
override def eventReceived(action: Watcher.Action, resource: Secret): Unit = {
action match {
case Action.ADDED | Action.MODIFIED =>
logInfo("Secret update")
val dataItems = resource.getData.asScala.filterKeys(
_.startsWith(SECRET_DATA_ITEM_PREFIX_TOKENS)).toSeq.sorted
val latestToken = if (dataItems.nonEmpty) Some(dataItems.max) else None
latestToken.foreach {
case (_, data) =>
val credentials = new Credentials
deserialize(credentials, Base64.decodeBase64(data))
val tokens = SparkHadoopUtil.get.serialize(credentials)
driver.send(UpdateDelegationTokens(tokens))
}
case _ => // other actions (e.g. DELETED, ERROR) are ignored rather than throwing a MatchError
}
}
})
null
} else {
super.start()
}
}
}
@@ -110,7 +110,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit

new KubernetesClusterSchedulerBackend(
scheduler.asInstanceOf[TaskSchedulerImpl],
sc.env.rpcEnv,
sc,
kubernetesClient,
requestExecutorsService,
snapshotsStore,
@@ -21,23 +21,26 @@ import java.util.concurrent.ExecutorService
import io.fabric8.kubernetes.client.KubernetesClient
import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
import org.apache.spark.util.{ThreadUtils, Utils}

private[spark] class KubernetesClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
rpcEnv: RpcEnv,
sc: SparkContext,
kubernetesClient: KubernetesClient,
requestExecutorsService: ExecutorService,
snapshotsStore: ExecutorPodsSnapshotsStore,
podAllocator: ExecutorPodsAllocator,
lifecycleEventHandler: ExecutorPodsLifecycleManager,
watchEvents: ExecutorPodsWatchSnapshotSource,
pollEvents: ExecutorPodsPollingSnapshotSource)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
requestExecutorsService)
Expand Down Expand Up @@ -123,7 +126,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
}

override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new KubernetesDriverEndpoint(rpcEnv, properties)
new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
}

override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
Some(new KubernetesHadoopDelegationTokenManager(conf,
sc.hadoopConfiguration,
Some(kubernetesClient)))
}

private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
@@ -23,7 +23,7 @@ import org.mockito.Matchers.{eq => mockitoEq}
import org.mockito.Mockito.{never, verify, when}
import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
@@ -44,6 +44,9 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
@Mock
private var rpcEnv: RpcEnv = _

@Mock
private var scEnv: SparkEnv = _

@Mock
private var driverEndpointRef: RpcEndpointRef = _

@@ -82,13 +85,15 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
when(taskScheduler.sc).thenReturn(sc)
when(sc.conf).thenReturn(sparkConf)
driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
when(sc.env).thenReturn(scEnv)
when(scEnv.rpcEnv).thenReturn(rpcEnv)
when(rpcEnv.setupEndpoint(
mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
.thenReturn(driverEndpointRef)
when(kubernetesClient.pods()).thenReturn(podOperations)
schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
taskScheduler,
rpcEnv,
sc,
kubernetesClient,
requestExecutorsService,
eventQueue,