diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 88df7324a354a..1702f7d31c42a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -335,7 +335,8 @@ private[spark] class SparkSubmit extends Logging {
val targetDir = Utils.createTempDir()
// assure a keytab is available from any place in a JVM
- if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) {
+ if (clusterManager == YARN || clusterManager == LOCAL ||
+ clusterManager == KUBERNETES || isMesosClient) {
if (args.principal != null) {
if (args.keytab != null) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 10cd8742f2b49..321f34a85acea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -70,8 +70,8 @@ private[spark] class HadoopDelegationTokenManager(
"spark.yarn.security.credentials.%s.enabled")
private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
- private val principal = sparkConf.get(PRINCIPAL).orNull
- private val keytab = sparkConf.get(KEYTAB).orNull
+ protected val principal = sparkConf.get(PRINCIPAL).orNull
+ protected val keytab = sparkConf.get(KEYTAB).orNull
require((principal == null) == (keytab == null),
"Both principal and keytab must be defined, or neither.")
@@ -81,8 +81,8 @@ private[spark] class HadoopDelegationTokenManager(
logDebug("Using the following builtin delegation token providers: " +
s"${delegationTokenProviders.keys.mkString(", ")}.")
- private var renewalExecutor: ScheduledExecutorService = _
- private val driverRef = new AtomicReference[RpcEndpointRef]()
+ protected var renewalExecutor: ScheduledExecutorService = _
+ protected val driverRef = new AtomicReference[RpcEndpointRef]()
/** Set the endpoint used to send tokens to the driver. */
def setDriverRef(ref: RpcEndpointRef): Unit = {
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 2917197a2e2ec..aa09a3573a763 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -884,7 +884,7 @@ specific to Spark on Kubernetes.
  <td>(none)</td>
  <td>
    Specify the local file that contains the driver [pod template](#pod-template). For example
-    spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml`
+    spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml
  </td>
@@ -892,7 +892,16 @@ specific to Spark on Kubernetes.
  <td>(none)</td>
  <td>
    Specify the local file that contains the executor [pod template](#pod-template). For example
-    spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml`
+    spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml
  </td>
</tr>
+<tr>
+  <td>spark.kubernetes.kerberos.tokenSecret.renewal</td>
+  <td>false</td>
+  <td>
+    Enables the driver to watch the secret specified at
+    spark.kubernetes.kerberos.tokenSecret.name for updates so that the tokens can be
+    propagated to the executors.
+  </td>
+</tr>
diff --git a/docs/security.md b/docs/security.md
index ffae683df6256..a23117f6ea9a2 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -706,22 +706,6 @@ The following options provides finer-grained control for this feature:
-## Long-Running Applications
-
-Long-running applications may run into issues if their run time exceeds the maximum delegation
-token lifetime configured in services it needs to access.
-
-Spark supports automatically creating new tokens for these applications when running in YARN mode.
-Kerberos credentials need to be provided to the Spark application via the `spark-submit` command,
-using the `--principal` and `--keytab` parameters.
-
-The provided keytab will be copied over to the machine running the Application Master via the Hadoop
-Distributed Cache. For this reason, it's strongly recommended that both YARN and HDFS be secured
-with encryption, at least.
-
-The Kerberos login will be periodically renewed using the provided credentials, and new delegation
-tokens for supported will be created.
-
## Secure Interaction with Kubernetes
When talking to Hadoop-based services behind Kerberos, it was noted that Spark needs to obtain delegation tokens
@@ -798,6 +782,50 @@ achieved by setting `spark.kubernetes.hadoop.configMapName` to a pre-existing Co
local:///opt/spark/examples/jars/spark-examples_.jar \
```
+
+## Long-Running Applications
+
+Long-running applications may run into issues if their run time exceeds the maximum delegation
+token lifetime configured in services it needs to access.
+
+Spark supports automatically creating new tokens for these applications when running in YARN, Mesos, and Kubernetes modes.
+To launch the token-renewal thread in the driver, Kerberos credentials need to be provided to the Spark
+application via the `spark-submit` command, using the `--principal` and `--keytab` parameters.
+
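+For example (a sketch; the principal and the keytab path below are placeholders for your own values):
+
+```bash
+./bin/spark-submit \
+  --principal user@EXAMPLE.COM \
+  --keytab /path/to/user.keytab \
+  ... # remaining submission arguments
+```
+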
+In YARN mode, the provided keytab will be copied over to the machine running the Application Master via the Hadoop
+Distributed Cache. For this reason, it's strongly recommended that both YARN and HDFS be secured
+with encryption, at least.
+
+The Kerberos login will be periodically renewed using the provided credentials, and new delegation
+tokens for supported services will be created.
+
+### Long-Running Kerberos in Kubernetes
+
+This section describes a feature unique to Kubernetes. If you are running an external token service
+that updates the secret containing the delegation tokens used by both the driver and the executors,
+the driver can propagate the refreshed tokens to the executors via a Watcher thread. This Watcher
+thread is launched only when the `spark.kubernetes.kerberos.tokenSecret.renewal` config is enabled,
+and it is responsible for detecting updates to the secret defined at
+`spark.kubernetes.kerberos.tokenSecret.name`.
+
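+For example, a submission that relies on an externally managed token secret might include
+(a sketch; `hadoop-token-secret` is a placeholder secret name):
+
+```bash
+./bin/spark-submit \
+  --conf spark.kubernetes.kerberos.tokenSecret.renewal=true \
+  --conf spark.kubernetes.kerberos.tokenSecret.name=hadoop-token-secret \
+  ... # remaining submission arguments
+```
+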
+The contract between an external token service and this secret is that the secret must be defined
+with the following specification:
+
+```yaml
+kind: Secret
+metadata:
+ name: YOUR_SECRET_NAME
+ namespace: YOUR_NAMESPACE
+type: Opaque
+data:
+ spark.kubernetes.dt-CREATION_TIME-RENEWAL_TIME: YOUR_TOKEN_DATA
+```
+
+where `YOUR_SECRET_NAME` is the value of `spark.kubernetes.kerberos.tokenSecret.name`, `YOUR_NAMESPACE` is the namespace
+in which the driver and executors are running, `CREATION_TIME` and `RENEWAL_TIME` are UNIX timestamps marking when the
+token was created and when it should next be renewed, respectively, and `YOUR_TOKEN_DATA` is the base64-encoded data of
+your delegation token. The driver's Watcher thread will automatically pick up data matching this naming scheme and select
+the most recent token based on the `CREATION_TIME`.
+
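+As a concrete sketch of this contract, an external renewal service could publish a fresh token with a
+command along these lines (the secret name, namespace, timestamps, and `./hadoop.dt` file below are
+hypothetical; the file is assumed to hold the raw serialized Hadoop credentials, which `kubectl`
+base64-encodes when building the secret):
+
+```bash
+kubectl create secret generic hadoop-token-secret \
+  --namespace default \
+  --from-file=spark.kubernetes.dt-1546300800-1546387200=./hadoop.dt \
+  --dry-run -o yaml | kubectl apply -f -
+```
+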
# Event Logging
If your applications are using event logging, the directory where the event logs go
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 862f1d63ed39f..b277da0fcd1ad 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -262,6 +262,15 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional
+ val KUBERNETES_KERBEROS_DT_SECRET_RENEWAL =
+ ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.renewal")
+ .doc("Enabling the driver to watch the secret specified at " +
+ "spark.kubernetes.kerberos.tokenSecret.name for updates so that the " +
+ "tokens can be propagated to the executors.")
+ .booleanConf
+ .createWithDefault(false)
+
val APP_RESOURCE_TYPE =
ConfigBuilder("spark.kubernetes.resource.type")
.doc("This sets the resource type internally")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 1c6d53c16871e..f7b1d7b6a622a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -109,6 +109,7 @@ private[spark] object Constants {
val KERBEROS_SPARK_USER_NAME =
"spark.kubernetes.kerberos.spark-user-name"
val KERBEROS_SECRET_KEY = "hadoop-tokens"
+ val SECRET_DATA_ITEM_PREFIX_TOKENS = "spark.kubernetes.dt-"
// Hadoop credentials secrets for the Spark app.
val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 066547dcbb408..26c898e44df69 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -78,7 +78,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"
def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager =
- new KubernetesHadoopDelegationTokenManager(conf, hConf)
+ new KubernetesHadoopDelegationTokenManager(conf, hConf, None)
def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
index 3e98d5811d83f..cd0747f54cc47 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
@@ -17,21 +17,105 @@
package org.apache.spark.deploy.k8s.security
+import java.io.{ByteArrayInputStream, DataInputStream}
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.Secret
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.Watcher.Action
+import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
/**
* Adds Kubernetes-specific functionality to HadoopDelegationTokenManager.
*/
private[spark] class KubernetesHadoopDelegationTokenManager(
_sparkConf: SparkConf,
- _hadoopConf: Configuration)
+ _hadoopConf: Configuration,
+ kubernetesClient: Option[KubernetesClient])
extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {
def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled
+ private val isTokenRenewalEnabled =
+ _sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_RENEWAL)
+
+ private val dtSecretName = _sparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
+
+ if (isTokenRenewalEnabled) {
+ require(dtSecretName.isDefined,
+ "Must specify the token secret which the driver must watch for updates")
+ }
+
+ private def deserialize(credentials: Credentials, data: Array[Byte]): Unit = {
+ val byteStream = new ByteArrayInputStream(data)
+ val dataStream = new DataInputStream(byteStream)
+ credentials.readTokenStorageStream(dataStream)
+ }
+
+ private var watch: Watch = _
+
+ /**
+ * As in HadoopDelegationTokenManager, this starts the token renewer.
+ * Upon start, if a principal and keytab are defined, the renewer will:
+ *
+ * - log in the configured principal, and set up a task to keep that user's ticket renewed
+ * - obtain delegation tokens from all available providers
+ * - send the tokens to the driver, if it's already registered
+ * - schedule a periodic task to update the tokens when needed.
+ *
+ * If the principal is NOT configured, a long-running app may still be serviced by enabling the
+ * KUBERNETES_KERBEROS_DT_SECRET_RENEWAL config and relying on an external service to populate
+ * a secret with valid delegation tokens that the application will then use. This works via a
+ * Secret watcher, which the driver leverages to detect updates to the secret, retrieve its
+ * contents, and send the refreshed tokens to the driver endpoint so that they can be
+ * propagated to the executors.
+ *
+ * @return The newly logged in user, or null
+ */
+ override def start(): UserGroupInformation = {
+ val driver = driverRef.get()
+ if (isTokenRenewalEnabled &&
+ kubernetesClient.isDefined && driver != null) {
+ watch = kubernetesClient.get
+ .secrets()
+ .inNamespace(_sparkConf.get(KUBERNETES_NAMESPACE))
+ .withName(dtSecretName.get)
+ .watch(new Watcher[Secret] {
+ override def onClose(cause: KubernetesClientException): Unit =
+ logInfo("Ending the watch of DT Secret")
+ override def eventReceived(action: Watcher.Action, resource: Secret): Unit = {
+ action match {
+ case Action.ADDED | Action.MODIFIED =>
+ logInfo("Secret update")
+ val dataItems = resource.getData.asScala.filterKeys(
+ _.startsWith(SECRET_DATA_ITEM_PREFIX_TOKENS)).toSeq.sorted
+ val latestToken = if (dataItems.nonEmpty) Some(dataItems.max) else None
+ latestToken.foreach {
+ case (_, data) =>
+ val credentials = new Credentials
+ deserialize(credentials, Base64.decodeBase64(data))
+ val tokens = SparkHadoopUtil.get.serialize(credentials)
+ driver.send(UpdateDelegationTokens(tokens))
+ }
+ case _ =>
+   // Other actions (e.g. DELETED or ERROR) carry no new tokens; ignore them so that the
+   // match is exhaustive and does not throw a MatchError.
+   logDebug(s"Ignoring action $action on the delegation token secret")
+ }
+ }
+ })
+ null
+ } else {
+ super.start()
+ }
+ }
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index ce10f766334ff..739766a17cb47 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -110,7 +110,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
new KubernetesClusterSchedulerBackend(
scheduler.asInstanceOf[TaskSchedulerImpl],
- sc.env.rpcEnv,
+ sc,
kubernetesClient,
requestExecutorsService,
snapshotsStore,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index fa6dc2c479bbf..e20f0dc812674 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -21,7 +21,10 @@ import java.util.concurrent.ExecutorService
import io.fabric8.kubernetes.client.KubernetesClient
import scala.concurrent.{ExecutionContext, Future}
+import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@@ -29,7 +32,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
private[spark] class KubernetesClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
- rpcEnv: RpcEnv,
+ sc: SparkContext,
kubernetesClient: KubernetesClient,
requestExecutorsService: ExecutorService,
snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -37,7 +40,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
lifecycleEventHandler: ExecutorPodsLifecycleManager,
watchEvents: ExecutorPodsWatchSnapshotSource,
pollEvents: ExecutorPodsPollingSnapshotSource)
- extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+ extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
requestExecutorsService)
@@ -123,7 +126,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
- new KubernetesDriverEndpoint(rpcEnv, properties)
+ new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
+ }
+
+ override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
+ Some(new KubernetesHadoopDelegationTokenManager(conf,
+ sc.hadoopConfiguration,
+ Some(kubernetesClient)))
}
private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 52e7a12dbaf06..92c9383a97f2f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -23,7 +23,7 @@ import org.mockito.Matchers.{eq => mockitoEq}
import org.mockito.Mockito.{never, verify, when}
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
@@ -44,6 +44,9 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
@Mock
private var rpcEnv: RpcEnv = _
+ @Mock
+ private var scEnv: SparkEnv = _
+
@Mock
private var driverEndpointRef: RpcEndpointRef = _
@@ -82,13 +85,15 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
when(taskScheduler.sc).thenReturn(sc)
when(sc.conf).thenReturn(sparkConf)
driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
+ when(sc.env).thenReturn(scEnv)
+ when(scEnv.rpcEnv).thenReturn(rpcEnv)
when(rpcEnv.setupEndpoint(
mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
.thenReturn(driverEndpointRef)
when(kubernetesClient.pods()).thenReturn(podOperations)
schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
taskScheduler,
- rpcEnv,
+ sc,
kubernetesClient,
requestExecutorsService,
eventQueue,