diff --git a/assembly/pom.xml b/assembly/pom.xml
index 3c097bf1a23ab..cc726825c91df 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -150,6 +150,9 @@
kubernetes
+
org.apache.spark
diff --git a/pom.xml b/pom.xml
index 6063e03bb74fe..6b06738eea530 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2619,6 +2619,17 @@
+
+    <profile>
+      <id>kubernetes-hdfs-extra</id>
+      <modules>
+        <module>resource-managers/kubernetes/token-refresh-server</module>
+      </modules>
+    </profile>
+
     <profile>
       <id>kubernetes-integration-tests</id>
diff --git a/resource-managers/kubernetes/README.md b/resource-managers/kubernetes/README.md
index 31b721d193362..3a9b30decf0b6 100644
--- a/resource-managers/kubernetes/README.md
+++ b/resource-managers/kubernetes/README.md
@@ -28,11 +28,14 @@ building Spark normally. For example, to build Spark against Hadoop 2.7 and Kube
dev/make-distribution.sh --tgz -Phadoop-2.7 -Pkubernetes
-# Kubernetes Code Modules
+# Kubernetes Modules
Below is a list of the submodules for this cluster manager and what they do.
* `core`: Implementation of the Kubernetes cluster manager support.
+* `token-refresh-server`: Extra Kubernetes service that refreshes Hadoop
+ tokens for long-running Spark jobs that access secure data sources like
+ Kerberized HDFS.
* `integration-tests`: Integration tests for the project.
* `docker-minimal-bundle`: Base Dockerfiles for the driver and the executors. The Dockerfiles are used for integration
tests as well as being provided in packaged distributions of Spark.
@@ -40,6 +43,19 @@ Below is a list of the submodules for this cluster manager and what they do.
* `integration-tests-spark-jobs-helpers`: Dependencies for the spark jobs used in integration tests. These dependencies
are separated out to facilitate testing the shipping of jars to drivers running on Kubernetes clusters.
+# Building Extra Submodules for Spark with Kubernetes
+
+There are non-core extra submodules, such as `token-refresh-server`. To build
+them, pass the corresponding `kubernetes-*-extra` profile, such as
+`kubernetes-hdfs-extra`, when invoking Maven. For example, to build the
+`token-refresh-server` submodule:
+
+ build/mvn package -Pkubernetes-hdfs-extra \
+ -pl resource-managers/kubernetes/token-refresh-server -am
+
+Some of these submodules are helper Kubernetes services that need not be part
+of the Spark distribution. The distribution build script therefore does not
+include artifacts from these submodules.
+
# Running the Kubernetes Integration Tests
Note that the integration test framework is currently being heavily revised and is subject to change.
@@ -64,7 +80,7 @@ build/mvn integration-test \
-pl resource-managers/kubernetes/integration-tests -am
```
-# Running against an arbitrary cluster
+## Running against an arbitrary cluster
In order to run against any cluster, use the following:
```sh
@@ -74,7 +90,7 @@ build/mvn integration-test \
-DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https:// -Dspark.docker.test.driverImage= -Dspark.docker.test.executorImage="
```
-# Preserve the Minikube VM
+## Preserve the Minikube VM
The integration tests make use of [Minikube](https://github.com/kubernetes/minikube), which fires up a virtual machine
and sets up a single-node Kubernetes cluster within it. By default the VM is destroyed after the tests finish.
diff --git a/resource-managers/kubernetes/token-refresh-server/README.md b/resource-managers/kubernetes/token-refresh-server/README.md
new file mode 100644
index 0000000000000..37d58109f8c62
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/README.md
@@ -0,0 +1,68 @@
+---
+layout: global
+title: Hadoop Token Refresh Server on Kubernetes
+---
+
+Spark on Kubernetes may use Kerberized Hadoop data sources such as secure HDFS or Kafka. If a job
+runs for days or weeks, the lifetime of its Hadoop delegation tokens, which expire every
+24 hours, must be extended. The Hadoop Token Refresh Server is a Kubernetes microservice that
+renews token lifetimes and puts the replacement tokens in place.
+
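Later in this change, `TokenRefreshService` schedules each renewal at roughly the 90% mark of a token's remaining lifetime. A minimal sketch of that calculation (the class and method names here are illustrative, not the actual API):

```java
// Sketch of the 90%-mark renewal-time rule used by the refresh service.
// Times are epoch millis; the class and method names are illustrative.
public class RenewTimeSketch {

    static long renewTime(long expireTime, long now) {
        long durationTillExpire = Math.max(0L, expireTime - now);
        // Renew when 90% of the remaining token lifetime has elapsed.
        return Math.max(0L, expireTime - durationTillExpire / 10);
    }

    public static void main(String[] args) {
        long dayMillis = 24L * 60 * 60 * 1000;
        // A token obtained now that lasts 24 hours gets renewed ~2.4 hours early.
        System.out.println(renewTime(dayMillis, 0L));  // 77760000 (21.6 hours)
    }
}
```

Renewing well before the 24-hour deadline leaves the server several retry windows if a renewal attempt fails.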
+# Building the Refresh Server
+
+To build the refresh server jar, simply run Maven. For example:
+
+ mvn clean package
+
+The `target` directory will then contain a tarball that includes the project jar file as well as
+third-party dependency jars. The tarball name ends with `-assembly.tar.gz`. For example:
+
+    target/token-refresh-server-kubernetes_2.11-2.2.0-k8s-0.5.0-SNAPSHOT-assembly.tar.gz
+
+# Running the Refresh Server
+
+To run the server, follow the steps below.
+
+1. Build and push the docker image:
+
+       docker build -t hadoop-token-refresh-server:latest \
+         -f src/main/docker/Dockerfile .
+       docker tag hadoop-token-refresh-server:latest YOUR-REPO:YOUR-TAG
+       docker push YOUR-REPO:YOUR-TAG
+
+2. Edit the main application config file, `src/main/conf/application.conf`,
+   and create a `configmap` from it:
+
+ kubectl create configmap hadoop-token-refresh-server-application-conf \
+ --from-file=src/main/conf/application.conf
+
+3. Create another k8s `configmap` containing Hadoop config files. These files should enable
+   Kerberos and secure Hadoop, and include the addresses of the Hadoop servers that issue
+   delegation tokens, such as the HDFS namenode:
+
+ kubectl create configmap hadoop-token-refresh-server-hadoop-config \
+ --from-file=/usr/local/hadoop/conf/core-site.xml
+
+4. Create yet another k8s `configmap` containing Kerberos config files. These should include
+   the Kerberos server address and the correct realm name for Kerberos principals:
+
+ kubectl create configmap hadoop-token-refresh-server-kerberos-config \
+ --from-file=/etc/krb5.conf
+
+5. Create a k8s `secret` containing the Kerberos keytab file. The keytab should hold
+   the credentials of the system-user Kerberos principal that the refresh server uses to
+   extend Hadoop delegation tokens. See
+   `hadoop-token-refresh-server.kerberosPrincipal` in `application.conf`.
+
+ kubectl create secret generic hadoop-token-refresh-server-kerberos-keytab \
+ --from-file /mnt/secrets/krb5.keytab
+
+6. Optionally, create a k8s `service account` and `clusterrolebinding` for
+   the service pod to use. The service account should have the `edit` capability for
+   job `secret`s that contain the Hadoop delegation tokens.
+
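The account and binding in step 6 can also be created from a manifest. A minimal sketch, using the built-in `edit` cluster role (all names below are placeholders, not values this project prescribes):

```yaml
# Illustrative manifest for step 6; names and namespace are placeholders.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: hadoop-token-refresh-server-sa
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: hadoop-token-refresh-server-edit
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
- kind: ServiceAccount
  name: hadoop-token-refresh-server-sa
  namespace: default
```
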
+7. Finally, edit the k8s `deployment` config file and launch the service pod
+   using the deployment. The config file should reference the right docker image tag
+   and the correct k8s `service account` name:
+
+ kubectl create -f src/main/conf/kubernetes-hadoop-token-refresh-server.yaml
diff --git a/resource-managers/kubernetes/token-refresh-server/pom.xml b/resource-managers/kubernetes/token-refresh-server/pom.xml
new file mode 100644
index 0000000000000..2da889ed46419
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.2.0-k8s-0.5.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>token-refresh-server-kubernetes_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Hadoop Token Refresh Server on Kubernetes</name>
+
+  <properties>
+    <akka.actor.version>2.5.4</akka.actor.version>
+    <commons-logging.version>1.2</commons-logging.version>
+    <kubernetes.client.version>2.2.13</kubernetes.client.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-actor_${scala.binary.version}</artifactId>
+      <version>${akka.actor.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+      <version>${kubernetes.client.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>${commons-logging.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-testkit_${scala.binary.version}</artifactId>
+      <version>${akka.actor.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assembly/assembly.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/resource-managers/kubernetes/token-refresh-server/src/main/assembly/assembly.xml b/resource-managers/kubernetes/token-refresh-server/src/main/assembly/assembly.xml
new file mode 100644
index 0000000000000..2bbc040bc5fa9
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/main/assembly/assembly.xml
@@ -0,0 +1,33 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
+  <id>assembly</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <unpack>false</unpack>
+      <scope>compile</scope>
+    </dependencySet>
+    <dependencySet>
+      <unpack>false</unpack>
+      <scope>provided</scope>
+    </dependencySet>
+  </dependencySets>
+</assembly>
diff --git a/resource-managers/kubernetes/token-refresh-server/src/main/conf/application.conf b/resource-managers/kubernetes/token-refresh-server/src/main/conf/application.conf
new file mode 100644
index 0000000000000..76f47ef73334e
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/main/conf/application.conf
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Main application config file for the Hadoop token refresh server. Override the values below
+# as needed.
+hadoop-token-refresh-server {
+
+ # Kerberos principal that the refresh server should use as its login user. This principal should
+ # match the keytab file used for the refresh server.
+ # For a token to be renewed for the next 24 hours by the refresh server, the token should
+ # designate this refresh server principal as the renewer. To allow a brand new token to be
+ # obtained by the refresh server, the HDFS namenode configuration should specify this refresh
+ # server principal as the special proxy for the job users. See
+ # https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/Superusers.html#Configurations
+ # for details.
+ kerberosPrincipal = "MY-REFRESH-SERVER-KERBEROS-PRINCIPAL"
+
+ # Set this to true if the refresh server should scan secrets across all namespaces. Set it to
+ # false and specify namespaceToScan if the refresh server should scan secrets only from
+ # the specific namespace.
+ scanAllNamespaces = true
+
+ # Effective only if scanAllNamespaces is false. A specific namespace that the refresh server
+ # should scan secrets from.
+ namespaceToScan = "default"
+}
diff --git a/resource-managers/kubernetes/token-refresh-server/src/main/conf/kubernetes-hadoop-token-refresh-server.yaml b/resource-managers/kubernetes/token-refresh-server/src/main/conf/kubernetes-hadoop-token-refresh-server.yaml
new file mode 100644
index 0000000000000..ef2ef009e6615
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/main/conf/kubernetes-hadoop-token-refresh-server.yaml
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+apiVersion: apps/v1beta1
+kind: Deployment
+metadata:
+ name: hadoop-token-refresh-server
+spec:
+ replicas: 1
+ template:
+ metadata:
+ labels:
+ hadoop-token-refresh-server-instance: default
+ spec:
+ serviceAccountName: YOUR-SERVICE-ACCOUNT
+ volumes:
+ - name: application-conf
+ configMap:
+ name: hadoop-token-refresh-server-application-conf
+ - name: kerberos-config
+ configMap:
+ name: hadoop-token-refresh-server-kerberos-config
+ - name: hadoop-config
+ configMap:
+ name: hadoop-token-refresh-server-hadoop-config
+ - name: kerberos-keytab
+ secret:
+ secretName: hadoop-token-refresh-server-kerberos-keytab
+ containers:
+ - name: hadoop-token-refresh-server
+ image: YOUR-REPO:YOUR-TAG
+ env:
+ - name: APPLICATION_CONF_DIR
+ value: /etc/token-refresh-server/conf
+ - name: HADOOP_CONF_DIR
+ value: /etc/hadoop/conf
+ - name: TOKEN_REFRESH_SERVER_ARGS
+ value: --verbose
+ resources:
+ requests:
+ cpu: 100m
+ memory: 512Mi
+ limits:
+ cpu: 100m
+ memory: 512Mi
+ volumeMounts:
+ - name: application-conf
+ mountPath: '/etc/token-refresh-server/conf'
+ readOnly: true
+ - name: hadoop-config
+ mountPath: '/etc/hadoop/conf'
+ readOnly: true
+ - name: kerberos-config
+ mountPath: '/etc/krb5.conf'
+ subPath: krb5.conf
+ readOnly: true
+ - name: kerberos-keytab
+ mountPath: '/mnt/secrets/krb5.keytab'
+ subPath: krb5.keytab
+ readOnly: true
diff --git a/resource-managers/kubernetes/token-refresh-server/src/main/docker/Dockerfile b/resource-managers/kubernetes/token-refresh-server/src/main/docker/Dockerfile
new file mode 100644
index 0000000000000..a89acaf3aec1d
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/main/docker/Dockerfile
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-alpine
+
+
+RUN apk upgrade --no-cache && \
+ apk add --no-cache bash tini && \
+ rm /bin/sh && \
+ ln -sv /bin/bash /bin/sh && \
+ chgrp root /etc/passwd && \
+ chmod ug+rw /etc/passwd && \
+ mkdir -p /opt/token-refresh-server && \
+ mkdir -p /opt/token-refresh-server/jars && \
+ mkdir -p /opt/token-refresh-server/work-dir
+
+ADD target/token-refresh-server-kubernetes_2.11-2.2.0-k8s-0.5.0-SNAPSHOT-assembly.tar.gz \
+ /opt/token-refresh-server/jars
+WORKDIR /opt/token-refresh-server/work-dir
+
+# The docker build command should be invoked from the top level directory of
+# the token-refresh-server project. E.g.:
+# docker build -t hadoop-token-refresh-server:latest \
+# -f src/main/docker/Dockerfile .
+
+CMD /sbin/tini -s -- /usr/bin/java \
+ -cp $APPLICATION_CONF_DIR:$HADOOP_CONF_DIR:'/opt/token-refresh-server/jars/*' \
+ org.apache.spark.security.kubernetes.TokenRefreshServer \
+ $TOKEN_REFRESH_SERVER_ARGS
diff --git a/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/Logging.scala b/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/Logging.scala
new file mode 100644
index 0000000000000..83feb88fc33e8
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/Logging.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security.kubernetes
+
+import org.apache.log4j.{Logger, LogManager, Priority}
+
+trait Logging {
+
+ private val log: Logger = LogManager.getLogger(this.getClass)
+
+ protected def logDebug(msg: => String) = if (log.isDebugEnabled) log.debug(msg)
+
+ protected def logInfo(msg: => String) = if (log.isInfoEnabled) log.info(msg)
+
+ protected def logWarning(msg: => String) = if (log.isEnabledFor(Priority.WARN)) log.warn(msg)
+
+ protected def logWarning(msg: => String, throwable: Throwable) =
+ if (log.isEnabledFor(Priority.WARN)) log.warn(msg, throwable)
+
+ protected def logError(msg: => String) = if (log.isEnabledFor(Priority.ERROR)) log.error(msg)
+}
diff --git a/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/SecretFinder.scala b/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/SecretFinder.scala
new file mode 100644
index 0000000000000..de7e134cadc76
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/SecretFinder.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security.kubernetes
+
+import java.lang
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import akka.actor.{ActorRef, Scheduler}
+import io.fabric8.kubernetes.api.model.{Secret, SecretList}
+import io.fabric8.kubernetes.client._
+import io.fabric8.kubernetes.client.Watcher.Action
+import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable
+
+import org.apache.spark.security.kubernetes.constants._
+
+
+private trait SecretSelection {
+
+ def selectSecrets(kubernetesClient: KubernetesClient, settings: Settings):
+ FilterWatchListDeletable[Secret, SecretList, lang.Boolean, Watch, Watcher[Secret]] = {
+ val selector = kubernetesClient.secrets()
+ val namespacedSelector = if (settings.shouldScanAllNamespaces) {
+ selector.inAnyNamespace()
+ } else {
+ selector.inNamespace(settings.namespaceToScan)
+ }
+ namespacedSelector
+ .withLabel(SECRET_LABEL_KEY_REFRESH_HADOOP_TOKENS, SECRET_LABEL_VALUE_REFRESH_HADOOP_TOKENS)
+ }
+}
+
+private class SecretFinder(refreshService: ActorRef,
+ scheduler: Scheduler,
+ kubernetesClient: KubernetesClient,
+ settings: Settings) extends SecretSelection {
+
+ private val cancellable = scheduler.schedule(
+ Duration(SECRET_SCANNER_INITIAL_DELAY_MILLIS, TimeUnit.MILLISECONDS),
+ Duration(SECRET_SCANNER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS),
+ new SecretScanner(refreshService, kubernetesClient, settings))
+ private val watched = selectSecrets(kubernetesClient, settings)
+ .watch(new SecretWatcher(refreshService))
+
+ def stop(): Unit = {
+ cancellable.cancel()
+ watched.close()
+ }
+}
+
+private class SecretScanner(refreshService: ActorRef,
+ kubernetesClient: KubernetesClient,
+ settings: Settings) extends Runnable with SecretSelection with Logging {
+
+ override def run(): Unit = {
+ val secrets = selectSecrets(kubernetesClient, settings).list.getItems.asScala.toList
+ logInfo(s"Scanned ${secrets.map(_.getMetadata.getSelfLink).mkString(", ")}")
+ refreshService ! UpdateSecretsToTrack(secrets)
+ }
+}
+
+private class SecretWatcher(refreshService: ActorRef) extends Watcher[Secret] with Logging {
+
+ override def eventReceived(action: Action, secret: Secret): Unit = {
+ action match {
+ case Action.ADDED =>
+ logInfo(s"Found ${secret.getMetadata.getSelfLink} added")
+ refreshService ! StartRefresh(secret)
+ case Action.DELETED =>
+ logInfo(s"Found ${secret.getMetadata.getSelfLink} deleted")
+ refreshService ! StopRefresh(secret)
+ case _ => // Do nothing
+ }
+ }
+
+ override def onClose(cause: KubernetesClientException): Unit = {
+ // Do nothing
+ }
+}
+
+private object SecretFinder {
+
+ def apply(refreshService: ActorRef, scheduler: Scheduler, client: KubernetesClient,
+ settings: Settings): SecretFinder =
+ new SecretFinder(refreshService, scheduler, client, settings)
+}
diff --git a/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/TokenRefreshServer.scala b/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/TokenRefreshServer.scala
new file mode 100644
index 0000000000000..02731c90dea7f
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/TokenRefreshServer.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security.kubernetes
+
+import scala.annotation.tailrec
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorRef, ActorSystem, Scheduler}
+import com.typesafe.config.{Config, ConfigFactory}
+import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}
+import org.apache.log4j.{Level, Logger}
+
+private class Server(injector: Injector) {
+
+ private val actorSystem = injector.newActorSystem()
+ private val kubernetesClient = injector.newKubernetesClient()
+ private val settings = injector.newSettings()
+ private var secretFinder : Option[SecretFinder] = None
+
+ def start(): Unit = {
+ val refreshService = injector.newTokenRefreshService(actorSystem, kubernetesClient, settings)
+ secretFinder = Some(injector.newSecretFinder(refreshService, kubernetesClient,
+ actorSystem.scheduler, settings))
+ }
+
+ def join() : Unit = {
+ // scalastyle:off awaitready
+ Await.ready(actorSystem.whenTerminated, Duration.Inf)
+ // scalastyle:on awaitready
+ }
+
+ def stop(): Unit = {
+ actorSystem.terminate()
+ secretFinder.foreach(_.stop())
+ kubernetesClient.close()
+ }
+}
+
+private class Injector {
+
+ def newActorSystem(): ActorSystem = ActorSystem("TokenRefreshServer")
+
+ def newKubernetesClient(): KubernetesClient = new DefaultKubernetesClient()
+
+ def newSettings(): Settings = new Settings()
+
+ def newTokenRefreshService(actorSystem: ActorSystem, client: KubernetesClient,
+ settings: Settings): ActorRef =
+ TokenRefreshService(actorSystem, client, settings)
+
+ def newSecretFinder(refreshService: ActorRef, client: KubernetesClient, scheduler: Scheduler,
+ settings: Settings): SecretFinder =
+ SecretFinder(refreshService, scheduler, client, settings)
+}
+
+private class Settings(config: Config = ConfigFactory.load) {
+
+ private val configKeyPrefix = "hadoop-token-refresh-server"
+
+ val refreshServerKerberosPrincipal : String = config.getString(
+ s"$configKeyPrefix.kerberosPrincipal")
+
+ val shouldScanAllNamespaces : Boolean = config.getBoolean(s"$configKeyPrefix.scanAllNamespaces")
+
+ val namespaceToScan : String = config.getString(s"$configKeyPrefix.namespaceToScan")
+}
+
+private class CommandLine(args: List[String]) {
+
+ var logLevel: Level = Level.WARN
+
+ parse(args)
+
+ @tailrec
+ private def parse(args: List[String]): Unit = args match {
+ case ("--verbose" | "-v") :: tail =>
+ logLevel = Level.INFO
+ parse(tail)
+ case ("--debug" | "-d") :: tail =>
+ logLevel = Level.DEBUG
+ parse(tail)
+ case unknown if unknown.nonEmpty =>
+ usage()
+ throw new IllegalArgumentException(s"Got an unknown argument: $unknown")
+ case _ =>
+ }
+
+ private def usage(): Unit = {
+ // scalastyle:off println
+ println("Usage: TokenRefreshServer [--verbose | -v] [--debug | -d]")
+ // scalastyle:on println
+ }
+}
+
+private class Launcher(parsedArgs: CommandLine, server: Server) {
+
+ def launch(): Unit = {
+ Logger.getRootLogger.setLevel(parsedArgs.logLevel)
+ try {
+ server.start()
+ server.join()
+ } finally {
+ server.stop()
+ }
+ }
+}
+
+/*
+ * TODO: Support REST endpoint for checking status of tokens.
+ */
+object TokenRefreshServer {
+
+ def main(args: Array[String]): Unit = {
+ val parsedArgs = new CommandLine(args.toList)
+ val server = new Server(new Injector)
+ val launcher = new Launcher(parsedArgs, server)
+ launcher.launch()
+ }
+}
diff --git a/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/TokenRefreshService.scala b/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/TokenRefreshService.scala
new file mode 100644
index 0000000000000..01d168675a681
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/TokenRefreshService.scala
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security.kubernetes
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.TimeUnit
+import java.util.regex.Pattern
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Cancellable, Props, Scheduler}
+import io.fabric8.kubernetes.api.model.{ObjectMeta, Secret}
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.apache.commons.codec.binary.Base64
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+
+import org.apache.spark.security.kubernetes.constants._
+
+
+private class TokenRefreshService(kubernetesClient: KubernetesClient, scheduler: Scheduler,
+ ugi: UgiUtil,
+ settings: Settings,
+ clock: Clock) extends Actor with Logging {
+
+ private val secretUidToTaskHandle = mutable.HashMap[String, Cancellable]()
+ private val recentlyAddedSecretUids = mutable.HashSet[String]()
+ private val extraCancellableByClass = mutable.HashMap[Class[_], Cancellable]()
+ private val hadoopConf = new Configuration
+
+ ugi.loginUserFromKeytab(settings.refreshServerKerberosPrincipal,
+ REFRESH_SERVER_KERBEROS_KEYTAB_PATH)
+
+ override def preStart(): Unit = {
+ super.preStart()
+ val duration = Duration(REFRESH_SERVER_KERBEROS_RELOGIN_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
+ extraCancellableByClass.put(Relogin.getClass,
+ scheduler.schedule(duration, duration, self, Relogin))
+ }
+
+ override def receive: PartialFunction[Any, Unit] = {
+ case Relogin =>
+ launchReloginTask()
+ case StartRefresh(secret) =>
+ startRefreshTask(secret)
+ case StopRefresh(secret) =>
+ removeRefreshTask(secret)
+ case UpdateSecretsToTrack(secrets) =>
+ updateSecretsToTrack(secrets)
+ case renew: Renew =>
+ scheduleRenewTask(renew)
+ }
+
+ override def postStop(): Unit = {
+ super.postStop()
+ secretUidToTaskHandle.values.foreach(_.cancel())
+ extraCancellableByClass.values.foreach(_.cancel())
+ }
+
+ private def launchReloginTask() = {
+ val task = new ReloginTask
+ extraCancellableByClass.remove(task.getClass).foreach(_.cancel) // Cancel in case of hanging
+ extraCancellableByClass.put(task.getClass,
+ scheduler.scheduleOnce(Duration(0L, TimeUnit.MILLISECONDS), task))
+ }
+
+ private def startRefreshTask(secret: Secret) = {
+ recentlyAddedSecretUids.add(getSecretUid(secret.getMetadata))
+ addRefreshTask(secret)
+ }
+
+ private def addRefreshTask(secret: Secret) = {
+ val secretUid = getSecretUid(secret.getMetadata)
+ secretUidToTaskHandle.remove(secretUid).foreach(_.cancel) // Cancel in case of hanging
+ secretUidToTaskHandle.put(secretUid, {
+ val task = new StarterTask(secret, hadoopConf, self, clock)
+ val cancellable = scheduler.scheduleOnce(
+ Duration(STARTER_TASK_INITIAL_DELAY_MILLIS, TimeUnit.MILLISECONDS),
+ task)
+ logInfo(s"Started refresh of tokens in $secretUid of ${secret.getMetadata.getSelfLink}" +
+ s" with $cancellable")
+ cancellable
+ })
+ }
+
+ private def removeRefreshTask(secret: Secret): Unit =
+ removeRefreshTask(getSecretUid(secret.getMetadata))
+
+ private def removeRefreshTask(secretUid: String): Unit = {
+ secretUidToTaskHandle.remove(secretUid).foreach(cancellable => {
+ logInfo(s"Canceling refresh of tokens in $secretUid")
+ cancellable.cancel()
+ })
+ }
+
+ private def updateSecretsToTrack(currentSecrets: List[Secret]): Unit = {
+ val secretsByUids = currentSecrets.map(secret => (getSecretUid(secret.getMetadata), secret))
+ .toMap
+ val currentUids = secretsByUids.keySet
+ val priorUids = secretUidToTaskHandle.keys.toSet
+ val uidsToAdd = currentUids -- priorUids
+ uidsToAdd.foreach(uid => addRefreshTask(secretsByUids(uid)))
+ val uidsToRemove = priorUids -- currentUids -- recentlyAddedSecretUids
+ uidsToRemove.foreach(uid => removeRefreshTask(uid))
+ recentlyAddedSecretUids.clear()
+ }
+
+ private def scheduleRenewTask(renew: Renew) = {
+ val secretUid = getSecretUid(renew.secretMeta)
+ val priorTask = secretUidToTaskHandle.remove(secretUid)
+ if (priorTask.nonEmpty) {
+ priorTask.get.cancel() // Cancel in case of hanging.
+ val numConsecutiveErrors = renew.numConsecutiveErrors
+ if (numConsecutiveErrors < RENEW_TASK_MAX_CONSECUTIVE_ERRORS) {
+ val durationTillExpire = math.max(0L, renew.expireTime - clock.nowInMillis())
+ val renewTime = math.max(0L, renew.expireTime - durationTillExpire / 10) // 90% mark.
+ val durationTillRenew = math.max(0L, renewTime - clock.nowInMillis())
+ val task = new RenewTask(renew, hadoopConf, self, kubernetesClient, clock)
+ logInfo(s"Scheduling refresh of tokens in $secretUid of " +
+ s"${renew.secretMeta.getSelfLink} at now + $durationTillRenew millis.")
+ val cancellable = scheduler.scheduleOnce(
+ Duration(durationTillRenew, TimeUnit.MILLISECONDS), task)
+ secretUidToTaskHandle.put(secretUid, cancellable)
+ } else {
+ logWarning(s"Got too many errors for secret $secretUid of" +
+ s" ${renew.secretMeta.getSelfLink}. Abandoning.")
+ }
+ } else {
+      logWarning(s"Could not find a StarterTask entry for renewal of secret $secretUid of" +
+        s" ${renew.secretMeta.getSelfLink}. Maybe the secret got deleted.")
+ }
+ }
+
+  private def getSecretUid(meta: ObjectMeta) = meta.getUid
+
+ // Exposed for testing
+ private[kubernetes] def numExtraCancellables() = extraCancellableByClass.size
+
+ // Exposed for testing
+ private[kubernetes] def hasExtraCancellable(key: Class[_], expected: Cancellable): Boolean = {
+ val value = extraCancellableByClass.get(key)
+ value.nonEmpty && expected == value.get
+ }
+
+ // Exposed for testing
+ private[kubernetes] def numPendingSecretTasks() = secretUidToTaskHandle.size
+
+ // Exposed for testing
+ private[kubernetes] def hasSecretTaskCancellable(secretUid: String, expected: Cancellable)
+ : Boolean = {
+ val value = secretUidToTaskHandle.get(secretUid)
+ value.nonEmpty && expected == value.get
+ }
+}
+
+private class UgiUtil {
+
+ def loginUserFromKeytab(kerberosPrincipal: String, kerberosKeytabPath: String): Unit =
+ UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabPath)
+
+ def getLoginUser: UserGroupInformation = UserGroupInformation.getLoginUser
+
+ def createProxyUser(user: String, realUser: UserGroupInformation): UserGroupInformation =
+ UserGroupInformation.createProxyUser(user, realUser)
+}
+
+private class FileSystemUtil {
+
+ def getFileSystem(hadoopConf: Configuration): FileSystem = FileSystem.get(hadoopConf)
+
+ def renewToken(token: Token[_ <: TokenIdentifier], hadoopConf: Configuration): Long =
+ token.renew(hadoopConf)
+}
+
+private class ReloginTask(ugi: UgiUtil) extends Runnable {
+
+ def this() = this(new UgiUtil)
+
+ override def run(): Unit = {
+ ugi.getLoginUser.checkTGTAndReloginFromKeytab()
+ }
+}
+
+private class StarterTask(secret: Secret,
+ hadoopConf: Configuration,
+ refreshService: ActorRef,
+ clock: Clock) extends Runnable with Logging {
+
+ private var hasError = false
+
+ override def run(): Unit = {
+ val tokensToExpireTimes = readTokensFromSecret()
+ val tokenKeys = tokensToExpireTimes.keys.map(
+ token => token.getKind.toString + "@" + token.getService.toString).mkString(", ")
+ val nextExpireTime = if (tokensToExpireTimes.nonEmpty) {
+ val minExpireTime = tokensToExpireTimes.values.min
+      logInfo(s"Read Hadoop tokens: $tokenKeys with min expire time $minExpireTime")
+ minExpireTime
+ } else {
+ logWarning(s"Got an empty token list with secret ${secret.getMetadata.getUid} of" +
+ s" ${secret.getMetadata.getSelfLink}")
+ hasError = true
+ getRetryTime
+ }
+ val numConsecutiveErrors = if (hasError) 1 else 0
+ refreshService ! Renew(nextExpireTime, tokensToExpireTimes, secret.getMetadata,
+ numConsecutiveErrors)
+ }
+
+ private def readTokensFromSecret(): Map[Token[_ <: TokenIdentifier], Long] = {
+    val dataItems = secret.getData.asScala.filterKeys(
+      _.startsWith(SECRET_DATA_ITEM_KEY_PREFIX_HADOOP_TOKENS)).toSeq
+    val latestDataItem = if (dataItems.nonEmpty) Some(dataItems.max) else None
+ latestDataItem.map {
+ case (key, data) =>
+ val matcher = TokenRefreshService.hadoopTokenPattern.matcher(key)
+ val matches = matcher.matches()
+ logInfo(s"Matching secret data $key, result $matches")
+ val createTime = matcher.group(1).toLong
+ val duration = matcher.group(2).toLong
+ val expireTime = createTime + duration
+ val credentials = new Credentials
+ TokensSerializer.deserialize(credentials, Base64.decodeBase64(data))
+ credentials.getAllTokens.asScala.toList.map {
+ (_, expireTime)
+ }
+ }.toList.flatten.toMap
+ }
+
+ private def getRetryTime = clock.nowInMillis() + RENEW_TASK_RETRY_TIME_MILLIS
+}
+
+private class RenewTask(renew: Renew,
+ hadoopConf: Configuration,
+ refreshService: ActorRef,
+ kubernetesClient: KubernetesClient,
+ clock: Clock,
+ ugi: UgiUtil,
+ fsUtil: FileSystemUtil) extends Runnable with Logging {
+
+ def this(renew: Renew, hadoopConf: Configuration, refreshService: ActorRef,
+ client: KubernetesClient, clock: Clock) =
+ this(renew, hadoopConf, refreshService, client, clock, ugi = new UgiUtil,
+ fsUtil = new FileSystemUtil)
+
+ private var hasError = false
+
+ override def run(): Unit = {
+ val deadline = renew.expireTime + RENEW_TASK_DEADLINE_LOOK_AHEAD_MILLIS
+ val nowMillis = clock.nowInMillis()
+    val newExpireTimeByToken: Map[Token[_ <: TokenIdentifier], Long] =
+ renew.tokensToExpireTimes.map {
+ case (token, expireTime) =>
+ val (maybeNewToken, maybeNewExpireTime) = refresh(token, expireTime, deadline, nowMillis)
+ (maybeNewToken, maybeNewExpireTime)
+ }
+ .toMap
+ if (newExpireTimeByToken.nonEmpty) {
+ val newTokens = newExpireTimeByToken.keySet -- renew.tokensToExpireTimes.keySet
+ if (newTokens.nonEmpty) {
+ writeTokensToSecret(newExpireTimeByToken, nowMillis)
+ }
+ val nextExpireTime = newExpireTimeByToken.values.min
+ logInfo(s"Renewed tokens $newExpireTimeByToken. Next expire time $nextExpireTime")
+ val numConsecutiveErrors = if (hasError) renew.numConsecutiveErrors + 1 else 0
+ refreshService ! Renew(nextExpireTime, newExpireTimeByToken, renew.secretMeta,
+ numConsecutiveErrors)
+ } else {
+ logWarning(s"Got an empty token list with secret ${renew.secretMeta.getUid} of" +
+ s" ${renew.secretMeta.getSelfLink}")
+ }
+ }
+
+ private def refresh(token: Token[_ <: TokenIdentifier], expireTime: Long, deadline: Long,
+ nowMillis: Long) = {
+ val maybeNewToken = maybeObtainNewToken(token, expireTime, nowMillis)
+ val maybeNewExpireTime = maybeGetNewExpireTime(maybeNewToken, expireTime, deadline, nowMillis)
+ (maybeNewToken, maybeNewExpireTime)
+ }
+
+  private def maybeObtainNewToken(token: Token[_ <: TokenIdentifier], expireTime: Long,
+      nowMillis: Long) = {
+    val maybeNewToken = if (token.getKind.equals(DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) {
+      // The token can be cast to AbstractDelegationTokenIdentifier below only if the token kind
+      // is HDFS_DELEGATION_KIND, according to the YARN resource manager code. See if this can be
+      // generalized beyond HDFS tokens.
+      val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+      val maxDate = identifier.getMaxDate
+      if (maxDate - expireTime < RENEW_TASK_REMAINING_TIME_BEFORE_NEW_TOKEN_MILLIS ||
+          maxDate <= nowMillis) {
+        logDebug(s"Obtaining a new token with maxDate $maxDate," +
+          s" expireTime $expireTime, now $nowMillis")
+ val newToken = obtainNewToken(token, identifier)
+ logInfo(s"Obtained token $newToken")
+ newToken
+ } else {
+ token
+ }
+ } else {
+ token
+ }
+ maybeNewToken
+ }
+
+ private def maybeGetNewExpireTime(token: Token[_ <: TokenIdentifier], expireTime: Long,
+ deadline: Long,
+ nowMillis: Long) = {
+ if (expireTime <= deadline || expireTime <= nowMillis) {
+ try {
+ logDebug(s"Renewing token $token with current expire time $expireTime," +
+ s" deadline $deadline, now $nowMillis")
+ val newExpireTime = fsUtil.renewToken(token, hadoopConf)
+ logDebug(s"Renewed token $token. Next expire time $newExpireTime")
+ newExpireTime
+ } catch {
+ case t: Throwable =>
+ logWarning(t.getMessage, t)
+ hasError = true
+
+ getRetryTime
+ }
+ } else {
+ expireTime
+ }
+ }
+
+ private def obtainNewToken(token: Token[_ <: TokenIdentifier],
+ identifier: AbstractDelegationTokenIdentifier) = {
+ val owner = identifier.getOwner
+ val realUser = identifier.getRealUser
+ val user = if (realUser == null || realUser.toString.isEmpty || realUser.equals(owner)) {
+ owner.toString
+ } else {
+ realUser.toString
+ }
+ val credentials = new Credentials
+ val proxyUgi = ugi.createProxyUser(user, ugi.getLoginUser)
+ val newToken = proxyUgi.doAs(new PrivilegedExceptionAction[Token[_ <: TokenIdentifier]] {
+
+      override def run(): Token[_ <: TokenIdentifier] = {
+ val fs = fsUtil.getFileSystem(hadoopConf)
+ val tokens = fs.addDelegationTokens(ugi.getLoginUser.getUserName,
+ credentials)
+ tokens(0)
+ }
+ })
+ newToken
+ }
+
+ private def writeTokensToSecret(tokensToExpireTimes: Map[Token[_ <: TokenIdentifier], Long],
+ nowMillis: Long): Unit = {
+ val durationUntilExpire = tokensToExpireTimes.values.min - nowMillis
+ val key = s"$SECRET_DATA_ITEM_KEY_PREFIX_HADOOP_TOKENS$nowMillis-$durationUntilExpire"
+ val value = TokensSerializer.serializeBase64(tokensToExpireTimes.keys)
+ val secretMeta = renew.secretMeta
+ val editor = kubernetesClient.secrets
+ .inNamespace(secretMeta.getNamespace)
+ .withName(secretMeta.getName)
+ .edit()
+ editor.addToData(key, value)
+ val dataItemKeys = editor.getData.keySet().asScala.filter(
+ _.startsWith(SECRET_DATA_ITEM_KEY_PREFIX_HADOOP_TOKENS)).toSeq.sorted
+    // Remove old data items, keeping only the latest two. A K8s secret can hold at most 1 MB of
+    // data, so old data items must be pruned. We keep the latest two items to avoid race
+    // conditions where newly launching executors may still access the previous token.
+ dataItemKeys.dropRight(2).foreach(editor.removeFromData)
+ editor.done
+ logInfo(s"Wrote new tokens $tokensToExpireTimes to a data item $key in" +
+ s" secret ${secretMeta.getUid} of ${secretMeta.getSelfLink}")
+ }
+
+ private def getRetryTime = clock.nowInMillis() + RENEW_TASK_RETRY_TIME_MILLIS
+}
+
+private object TokensSerializer {
+
+ def serializeBase64(tokens: Iterable[Token[_ <: TokenIdentifier]]): String = {
+ val credentials = new Credentials()
+ tokens.foreach(token => credentials.addToken(token.getService, token))
+ val serialized = serializeCredentials(credentials)
+ Base64.encodeBase64String(serialized)
+ }
+
+ private def serializeCredentials(credentials: Credentials): Array[Byte] = {
+ val byteStream = new ByteArrayOutputStream
+ val dataStream = new DataOutputStream(byteStream)
+ credentials.writeTokenStorageToStream(dataStream)
+ dataStream.flush()
+ byteStream.toByteArray
+ }
+
+ def deserialize(credentials: Credentials, data: Array[Byte]): Unit = {
+ val byteStream = new ByteArrayInputStream(data)
+ val dataStream = new DataInputStream(byteStream)
+ credentials.readTokenStorageStream(dataStream)
+ }
+}
+
+private class Clock {
+
+  def nowInMillis(): Long = System.currentTimeMillis()
+}
+
+private sealed trait Command
+private case object Relogin extends Command
+private case class UpdateSecretsToTrack(secrets: List[Secret]) extends Command
+private case class StartRefresh(secret: Secret) extends Command
+private case class Renew(expireTime: Long,
+ tokensToExpireTimes: Map[Token[_ <: TokenIdentifier], Long],
+ secretMeta: ObjectMeta,
+ numConsecutiveErrors: Int) extends Command
+private case class StopRefresh(secret: Secret) extends Command
+
+private object TokenRefreshService {
+
+  val hadoopTokenPattern: Pattern = Pattern.compile(SECRET_DATA_ITEM_KEY_REGEX_HADOOP_TOKENS)
+
+ def apply(system: ActorSystem, kubernetesClient: KubernetesClient,
+      settings: Settings): ActorRef = {
+ system.actorOf(Props(classOf[TokenRefreshService], kubernetesClient, system.scheduler,
+ new UgiUtil, settings, new Clock))
+ }
+}
diff --git a/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/constants.scala b/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/constants.scala
new file mode 100644
index 0000000000000..5a0ba70af192a
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/main/scala/org/apache/spark/security/kubernetes/constants.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security.kubernetes
+
+package object constants {
+
+ val REFRESH_SERVER_KERBEROS_KEYTAB_PATH = "/mnt/secrets/krb5.keytab"
+ val REFRESH_SERVER_KERBEROS_RELOGIN_INTERVAL_MILLIS = 60 * 60 * 1000L
+
+ val SECRET_LABEL_KEY_REFRESH_HADOOP_TOKENS = "refresh-hadoop-tokens"
+ val SECRET_LABEL_VALUE_REFRESH_HADOOP_TOKENS = "yes"
+ val SECRET_DATA_ITEM_KEY_PREFIX_HADOOP_TOKENS = "hadoop-tokens-"
+ val SECRET_DATA_ITEM_KEY_REGEX_HADOOP_TOKENS = "hadoop-tokens-(\\d+)-(\\d+)"
+
+ val STARTER_TASK_INITIAL_DELAY_MILLIS = 0L
+
+ val RENEW_TASK_SCHEDULE_AHEAD_MILLIS = 10000L
+ val RENEW_TASK_RETRY_TIME_MILLIS = 10000L
+ val RENEW_TASK_MAX_CONSECUTIVE_ERRORS = 3
+ val RENEW_TASK_DEADLINE_LOOK_AHEAD_MILLIS = 10000L
+ val RENEW_TASK_REMAINING_TIME_BEFORE_NEW_TOKEN_MILLIS = 3 * 60 * 60 * 1000L
+
+ val SECRET_SCANNER_INITIAL_DELAY_MILLIS = 10 * 1000L
+ val SECRET_SCANNER_INTERVAL_MILLIS = 60 * 60 * 1000L
+}
diff --git a/resource-managers/kubernetes/token-refresh-server/src/test/resources/log4j.properties b/resource-managers/kubernetes/token-refresh-server/src/test/resources/log4j.properties
new file mode 100644
index 0000000000000..ad95fadb7c0c0
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from a few verbose libraries.
+log4j.logger.com.sun.jersey=WARN
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.mortbay=WARN
+log4j.logger.org.spark_project.jetty=WARN
diff --git a/resource-managers/kubernetes/token-refresh-server/src/test/scala/org/apache/spark/security/kubernetes/RenewTaskSuite.scala b/resource-managers/kubernetes/token-refresh-server/src/test/scala/org/apache/spark/security/kubernetes/RenewTaskSuite.scala
new file mode 100644
index 0000000000000..88cbfe8dba1e5
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/test/scala/org/apache/spark/security/kubernetes/RenewTaskSuite.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security.kubernetes
+
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.JavaConverters._
+import akka.actor.ActorSystem
+import akka.testkit.{TestKit, TestProbe}
+import io.fabric8.kubernetes.api.model.{DoneableSecret, Secret, SecretBuilder, SecretList}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{MixedOperation, NonNamespaceOperation, Resource}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.mockito._
+import org.mockito.Matchers.{any, anyString}
+import org.mockito.Mockito.{doReturn, verify, when}
+import org.scalatest.{BeforeAndAfter, FunSuiteLike}
+
+class RenewTaskSuite extends TestKit(ActorSystem("test")) with FunSuiteLike with BeforeAndAfter {
+
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var ugi: UgiUtil = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var fsUtil: FileSystemUtil = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var hdfs: DistributedFileSystem = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var loginUser: UserGroupInformation = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var proxyUser: UserGroupInformation = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var hadoopConf: Configuration = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var kubernetesClient: KubernetesClient = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var clock: Clock = _
+ private val tokenRefreshServiceProbe = TestProbe()
+ private val tokenRefreshService = tokenRefreshServiceProbe.ref
+ private val createTime1 = 1500100000000L // 2017/07/14-23:26:40
+ private val duration1 = 86400000L // one day in millis.
+ private val token1ExpireTime = createTime1 + duration1
+ private val maxDate1 = createTime1 + 7 * 86400000L // 7 days
+ private val token1 = buildHdfsToken(owner="john", renewer="refresh-server", realUser="john",
+ password="token1-password", service="196.0.0.1:8020", createTime1, maxDate1)
+ private val createTime2 = 1500200000000L // 2017/07/16-03:13:20
+ private val duration2 = 86400000L // one day in millis.
+ private val token2ExpireTime = createTime2 + duration2
+ // maxDate2 below is just over the expire time. RenewTask will get a brand new token.
+ private val maxDate2 = token2ExpireTime + 60 * 60 * 1000 // One hour after the expire time.
+ private val token2 = buildHdfsToken(owner="john", renewer="refresh-server", realUser="john",
+ password="token2-password", service="196.0.0.1:8020", createTime2, maxDate2)
+ private val secret1 = new SecretBuilder()
+ .withNewMetadata()
+ .withUid("uid-0101")
+ .withNamespace("namespace1")
+ .withName("secret1")
+ .endMetadata()
+ .withData(Map(
+ s"hadoop-tokens-$createTime1-$duration1" ->
+ TokensSerializer.serializeBase64(List(token1))
+ ).asJava)
+ .build()
+ private val secret2 = new SecretBuilder()
+ .withNewMetadata()
+ .withUid("uid-0202")
+ .withNamespace("namespace2")
+ .withName("secret2")
+ .endMetadata()
+ .withData(Map(
+ s"hadoop-tokens-$createTime1-$duration1" ->
+ TokensSerializer.serializeBase64(List(token1)),
+ s"hadoop-tokens-$createTime2-$duration2" ->
+ TokensSerializer.serializeBase64(List(token2))
+ ).asJava)
+ .build()
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var secretsOp: MixedOperation[Secret, SecretList, DoneableSecret,
+ Resource[Secret, DoneableSecret]] = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var secretsOpInNamespace: NonNamespaceOperation[Secret, SecretList, DoneableSecret,
+ Resource[Secret, DoneableSecret]] = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var secretOpWithName: Resource[Secret, DoneableSecret] = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var secretEditor: DoneableSecret = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ when(ugi.getLoginUser).thenReturn(loginUser)
+ when(loginUser.getUserName).thenReturn("refresh-server")
+ when(ugi.createProxyUser(anyString, Matchers.eq(loginUser))).thenReturn(proxyUser)
+ }
+
+ test("ReloginTask logins on Kerberos again") {
+ val task = new ReloginTask(ugi)
+ task.run()
+
+ verify(loginUser).checkTGTAndReloginFromKeytab()
+ }
+
+ test("StarterTask reads a secret and schedules a renew task") {
+ val task = new StarterTask(secret1, hadoopConf, tokenRefreshService, clock)
+ task.run()
+
+ val renewCommand = Renew(expireTime = token1ExpireTime,
+ Map(token1 -> token1ExpireTime),
+ secret1.getMetadata,
+ numConsecutiveErrors = 0)
+ tokenRefreshServiceProbe.expectMsg(renewCommand)
+ }
+
+  test("RenewTask just renews an existing token if maxDate is far away") {
+ // maxDate1 of token1 is far away. So the RenewTask will only renew the existing token.
+    val nowMillis = token1ExpireTime - 60 * 60 * 1000 // One hour before token1 expire time.
+ when(clock.nowInMillis()).thenReturn(nowMillis)
+ val newExpireTime = nowMillis + duration1
+ when(fsUtil.renewToken(token1, hadoopConf)).thenReturn(newExpireTime)
+ val renewCommand = Renew(expireTime = token1ExpireTime,
+ Map(token1 -> token1ExpireTime),
+ secret1.getMetadata,
+ numConsecutiveErrors = 0)
+ val task = new RenewTask(renewCommand, hadoopConf, tokenRefreshService, kubernetesClient, clock,
+ ugi, fsUtil)
+ task.run()
+
+ val newRenewCommand = Renew(expireTime = newExpireTime,
+ Map(token1 -> newExpireTime),
+ secret1.getMetadata,
+ numConsecutiveErrors = 0)
+ tokenRefreshServiceProbe.expectMsg(newRenewCommand) // Sent a new Renew command to the service.
+ }
+
+  test("RenewTask obtains a new token and writes it back to the secret") {
+ // maxDate2 of token2 is just over the expire time. So the RenewTask will get a brand new token.
+ val nowMillis = token2ExpireTime - 60 * 60 * 1000 // One hour before token2 expire time.
+ when(clock.nowInMillis()).thenReturn(nowMillis)
+ val duration3 = 86400000L // one day in millis.
+ val maxDate3 = nowMillis + 7 * 86400000L // 7 days
+ val token3ExpireTime = nowMillis + duration3
+ val token3 = buildHdfsToken(owner="john", renewer="refresh-server", realUser="john",
+ password="token3-password", service="196.0.0.1:8020", nowMillis, maxDate3)
+ doReturn(token3).when(proxyUser)
+ .doAs(any(classOf[PrivilegedExceptionAction[Token[_ <: TokenIdentifier]]]))
+ when(fsUtil.renewToken(token3, hadoopConf)).thenReturn(token3ExpireTime)
+ when(kubernetesClient.secrets()).thenReturn(secretsOp)
+ when(secretsOp.inNamespace("namespace2")).thenReturn(secretsOpInNamespace)
+ when(secretsOpInNamespace.withName("secret2")).thenReturn(secretOpWithName)
+ when(secretOpWithName.edit()).thenReturn(secretEditor)
+ when(secretEditor.getData).thenReturn(Map(
+ s"hadoop-tokens-$createTime1-$duration1" -> TokensSerializer.serializeBase64(List(token1)),
+ s"hadoop-tokens-$createTime2-$duration2" -> TokensSerializer.serializeBase64(List(token2)),
+ s"hadoop-tokens-$nowMillis-$duration3" -> TokensSerializer.serializeBase64(List(token3))
+ ).asJava)
+ when(secretEditor.removeFromData(anyString())).thenReturn(secretEditor)
+ val renewCommand = Renew(expireTime = token2ExpireTime,
+ Map(token2 -> token2ExpireTime),
+ secret2.getMetadata,
+ numConsecutiveErrors = 0)
+ val task = new RenewTask(renewCommand, hadoopConf, tokenRefreshService, kubernetesClient, clock,
+ ugi, fsUtil)
+ task.run()
+
+ verify(secretEditor)
+ .addToData(s"hadoop-tokens-$nowMillis-$duration3", // Added the new token to the secret.
+ TokensSerializer.serializeBase64(List(token3)))
+ verify(secretEditor)
+ .removeFromData(s"hadoop-tokens-$createTime1-$duration1") // Removed the oldest token.
+ val newRenewCommand = Renew(expireTime = token3ExpireTime,
+ Map(token3 -> token3ExpireTime),
+ secret2.getMetadata,
+ numConsecutiveErrors = 0)
+ tokenRefreshServiceProbe.expectMsg(newRenewCommand) // Sent a new Renew command to the service.
+
+ val actionCaptor: ArgumentCaptor[PrivilegedExceptionAction[Token[_ <: TokenIdentifier]]] =
+ ArgumentCaptor.forClass(classOf[PrivilegedExceptionAction[Token[_ <: TokenIdentifier]]])
+ verify(proxyUser).doAs(actionCaptor.capture())
+ val action = actionCaptor.getValue
+ when(fsUtil.getFileSystem(hadoopConf))
+ .thenReturn(hdfs)
+ doReturn(Array(token3)).when(hdfs)
+ .addDelegationTokens(Matchers.eq("refresh-server"), any(classOf[Credentials]))
+ assert(action.run() == token3)
+ }
+
+ private def buildHdfsToken(owner: String, renewer: String, realUser: String, password: String,
+ service: String,
+ issueDate: Long,
+      maxDate: Long): Token[_ <: TokenIdentifier] = {
+ val hdfsTokenIdentifier = new DelegationTokenIdentifier(new Text(owner), new Text(renewer),
+ new Text(realUser))
+ hdfsTokenIdentifier.setIssueDate(issueDate)
+ hdfsTokenIdentifier.setMaxDate(maxDate)
+ new Token[DelegationTokenIdentifier](hdfsTokenIdentifier.getBytes, password.getBytes,
+ DelegationTokenIdentifier.HDFS_DELEGATION_KIND, new Text(service))
+ }
+}
diff --git a/resource-managers/kubernetes/token-refresh-server/src/test/scala/org/apache/spark/security/kubernetes/SecretFinderSuite.scala b/resource-managers/kubernetes/token-refresh-server/src/test/scala/org/apache/spark/security/kubernetes/SecretFinderSuite.scala
new file mode 100644
index 0000000000000..dcbb4ade4e6d8
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/test/scala/org/apache/spark/security/kubernetes/SecretFinderSuite.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security.kubernetes
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.{Duration, FiniteDuration}
+
+import akka.actor.{ActorSystem, Cancellable, Scheduler}
+import akka.testkit.{TestKit, TestProbe}
+import com.typesafe.config.ConfigFactory
+import io.fabric8.kubernetes.api.model.{DoneableSecret, Secret, SecretList}
+import io.fabric8.kubernetes.client.Watcher.Action
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
+import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, FilterWatchListMultiDeletable, MixedOperation, Resource}
+import org.mockito._
+import org.mockito.Matchers.{any, anyString}
+import org.mockito.Mockito.{doReturn, never, verify, when}
+import org.scalatest.{BeforeAndAfter, FunSuiteLike}
+
+import org.apache.spark.security.kubernetes.constants._
+
+
+class SecretFinderSuite extends TestKit(ActorSystem("test")) with FunSuiteLike with BeforeAndAfter {
+
+ private val configKeyPrefix = "hadoop-token-refresh-server"
+ private val configMap1 = Map(s"$configKeyPrefix.kerberosPrincipal" -> "my-principal",
+ s"$configKeyPrefix.scanAllNamespaces" -> true,
+ s"$configKeyPrefix.namespaceToScan" -> "my-namespace")
+ private val configMap2 = configMap1.updated(s"$configKeyPrefix.scanAllNamespaces", false)
+ private val tokenRefreshServiceProbe = TestProbe()
+ private val tokenRefreshService = tokenRefreshServiceProbe.ref
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var kubernetesClient: KubernetesClient = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var secrets: MixedOperation[Secret, SecretList, DoneableSecret,
+ Resource[Secret, DoneableSecret]] = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var secretsInAnynamespace: FilterWatchListMultiDeletable[Secret, SecretList, Boolean,
+ Watch, Watcher[Secret]] = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var secretsInSpecifiedNamespace: FilterWatchListMultiDeletable[Secret, SecretList,
+ Boolean, Watch, Watcher[Secret]] = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var secretsWithLabel: FilterWatchListDeletable[Secret, SecretList, Boolean,
+ Watch, Watcher[Secret]] = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var secretList: SecretList = _
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private var secret1: Secret = _
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private var secret2: Secret = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var watch: Watch = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var scheduler: Scheduler = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var cancellable: Cancellable = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ when(scheduler.schedule(any(classOf[FiniteDuration]), any(classOf[FiniteDuration]),
+ any(classOf[Runnable]))(any(classOf[ExecutionContext])))
+ .thenReturn(cancellable)
+ when(kubernetesClient.secrets).thenReturn(secrets)
+ doReturn(secretsInAnynamespace).when(secrets).inAnyNamespace()
+ doReturn(secretsInSpecifiedNamespace).when(secrets).inNamespace("my-namespace")
+ doReturn(secretsWithLabel).when(secretsInAnynamespace).withLabel(
+ SECRET_LABEL_KEY_REFRESH_HADOOP_TOKENS, SECRET_LABEL_VALUE_REFRESH_HADOOP_TOKENS)
+ doReturn(secretsWithLabel).when(secretsInSpecifiedNamespace).withLabel(
+ SECRET_LABEL_KEY_REFRESH_HADOOP_TOKENS, SECRET_LABEL_VALUE_REFRESH_HADOOP_TOKENS)
+ when(secretsWithLabel.watch(any(classOf[Watcher[Secret]]))).thenReturn(watch)
+ }
+
+ test("Secret finder sets up the scanner and watcher, by default for all namespaces") {
+ val config = ConfigFactory.parseMap(configMap1.asJava)
+ val settings = new Settings(config)
+ val secretFinder = SecretFinder(tokenRefreshService, scheduler, kubernetesClient, settings)
+
+ verifyScannerScheduled()
+ verifyWatcherLaunched()
+ verify(secrets).inAnyNamespace()
+ verify(secrets, never).inNamespace(anyString)
+ }
+
+ test("Secret finder sets up the scanner and watcher for a specified namespace") {
+ val config = ConfigFactory.parseMap(configMap2.asJava)
+ val settings = new Settings(config)
+ val secretFinder = SecretFinder(tokenRefreshService, scheduler, kubernetesClient, settings)
+
+ verifyScannerScheduled()
+ verifyWatcherLaunched()
+ verify(secrets).inNamespace("my-namespace")
+ verify(secrets, never).inAnyNamespace()
+ }
+
+ test("Stopping the secret finder cancels the scanner and watcher") {
+ val config = ConfigFactory.parseMap(configMap1.asJava)
+ val settings = new Settings(config)
+ val secretFinder = SecretFinder(tokenRefreshService, scheduler, kubernetesClient, settings)
+ val scanner = captureScanner()
+ secretFinder.stop()
+
+ verify(cancellable).cancel()
+ verify(watch).close()
+ }
+
+  test("Scanner sends the refresh service secrets to track") {
+ when(secretsWithLabel.list()).thenReturn(secretList)
+ val secrets = List(secret1, secret2)
+ when(secretList.getItems).thenReturn(secrets.asJava)
+ val config = ConfigFactory.parseMap(configMap1.asJava)
+ val settings = new Settings(config)
+ val scanner = new SecretScanner(tokenRefreshService, kubernetesClient, settings)
+ scanner.run()
+
+ tokenRefreshServiceProbe.expectMsg(UpdateSecretsToTrack(secrets))
+ }
+
+ test("Watcher sends the refresh service new or deleted secret") {
+ val config = ConfigFactory.parseMap(configMap1.asJava)
+ val settings = new Settings(config)
+ val watcher = new SecretWatcher(tokenRefreshService)
+
+ watcher.eventReceived(Action.ADDED, secret1)
+ tokenRefreshServiceProbe.expectMsg(StartRefresh(secret1))
+
+ watcher.eventReceived(Action.DELETED, secret2)
+ tokenRefreshServiceProbe.expectMsg(StopRefresh(secret2))
+
+ watcher.eventReceived(Action.MODIFIED, secret1) // Ignored.
+ watcher.eventReceived(Action.ERROR, secret1) // Ignored.
+ }
+
+ private def verifyScannerScheduled() = {
+ val scanner = captureScanner()
+ assert(scanner.getClass == classOf[SecretScanner])
+ }
+
+ private def captureScanner() = {
+ val scannerCaptor: ArgumentCaptor[Runnable] = ArgumentCaptor.forClass(classOf[Runnable])
+ verify(scheduler).schedule(
+ Matchers.eq(Duration(SECRET_SCANNER_INITIAL_DELAY_MILLIS, TimeUnit.MILLISECONDS)),
+ Matchers.eq(Duration(SECRET_SCANNER_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)),
+ scannerCaptor.capture())(
+ any(classOf[ExecutionContext])
+ )
+ scannerCaptor.getValue
+ }
+
+ private def verifyWatcherLaunched() = {
+ val watcher = captureWatcher()
+ assert(watcher.getClass == classOf[SecretWatcher])
+ }
+
+ private def captureWatcher() = {
+ val watcherCaptor: ArgumentCaptor[Watcher[Secret]] = ArgumentCaptor.forClass(
+ classOf[Watcher[Secret]])
+ verify(secretsWithLabel).watch(watcherCaptor.capture())
+ watcherCaptor.getValue
+ }
+}
diff --git a/resource-managers/kubernetes/token-refresh-server/src/test/scala/org/apache/spark/security/kubernetes/TokenRefreshServerSuite.scala b/resource-managers/kubernetes/token-refresh-server/src/test/scala/org/apache/spark/security/kubernetes/TokenRefreshServerSuite.scala
new file mode 100644
index 0000000000000..7807bab7d8bd2
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/test/scala/org/apache/spark/security/kubernetes/TokenRefreshServerSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security.kubernetes
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Future
+
+import akka.actor.{ActorRef, ActorSystem, Scheduler}
+import com.typesafe.config.ConfigFactory
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.apache.log4j.Level
+import org.mockito.{Answers, Mock, MockitoAnnotations}
+import org.mockito.Mockito.{doReturn, never, verify, when}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+
+class TokenRefreshServerSuite extends FunSuite with BeforeAndAfter {
+
+ private val configKeyPrefix = "hadoop-token-refresh-server"
+ private val config = ConfigFactory.parseMap(
+ Map(s"$configKeyPrefix.kerberosPrincipal" -> "my-principal",
+ s"$configKeyPrefix.scanAllNamespaces" -> true,
+ s"$configKeyPrefix.namespaceToScan" -> "my-namespace").asJava)
+ private val settings = new Settings(config)
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var injector: Injector = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var actorSystem: ActorSystem = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var scheduler: Scheduler = _
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private var actorSystemAwaitFuture: Future[Unit] = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var kubernetesClient: KubernetesClient = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var tokenRefreshServiceActorRef: ActorRef = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var secretFinder: SecretFinder = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var mockServer: Server = _
+ private var server: Server = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ when(injector.newActorSystem()).thenReturn(actorSystem)
+ when(actorSystem.scheduler).thenReturn(scheduler)
+ when(injector.newKubernetesClient()).thenReturn(kubernetesClient)
+ when(injector.newSettings()).thenReturn(settings)
+ when(injector.newTokenRefreshService(actorSystem, kubernetesClient, settings))
+ .thenReturn(tokenRefreshServiceActorRef)
+ when(injector.newSecretFinder(tokenRefreshServiceActorRef, kubernetesClient, scheduler,
+ settings))
+ .thenReturn(secretFinder)
+ doReturn(actorSystemAwaitFuture).when(actorSystem).whenTerminated
+ server = new Server(injector)
+ }
+
+ test("The token refresh server starts the refresh service actor and secret finder") {
+ server.start()
+ verify(injector).newTokenRefreshService(actorSystem, kubernetesClient, settings)
+ verify(injector).newSecretFinder(tokenRefreshServiceActorRef, kubernetesClient, scheduler,
+ settings)
+ verify(actorSystem, never()).whenTerminated
+ }
+
+ test("The token refresh server waits until the refresh service finishes") {
+ server.start()
+ server.join()
+ verify(actorSystem).whenTerminated
+ }
+
+ test("The token refresh server stops the refresh service and secret finder") {
+ server.start()
+ server.stop()
+ verify(actorSystem).terminate()
+ verify(kubernetesClient).close()
+ verify(secretFinder).stop()
+ }
+
+ test("The command line parses properly") {
+ var parsedArgs = new CommandLine(List())
+ assert(parsedArgs.logLevel == Level.WARN)
+
+ parsedArgs = new CommandLine(List("-v"))
+ assert(parsedArgs.logLevel == Level.INFO)
+ parsedArgs = new CommandLine(List("--verbose"))
+ assert(parsedArgs.logLevel == Level.INFO)
+
+ parsedArgs = new CommandLine(List("-d"))
+ assert(parsedArgs.logLevel == Level.DEBUG)
+ parsedArgs = new CommandLine(List("--debug"))
+ assert(parsedArgs.logLevel == Level.DEBUG)
+ }
+
+ test("Unknown command line arguments throw an exception") {
+ intercept[IllegalArgumentException] {
+ new CommandLine(List(""))
+ }
+ intercept[IllegalArgumentException] {
+ new CommandLine(List("-f"))
+ }
+ intercept[IllegalArgumentException] {
+ new CommandLine(List("--unknown"))
+ }
+ }
+
+ test("The server launches properly") {
+ val launcher = new Launcher(new CommandLine(List()), mockServer)
+ launcher.launch()
+
+ verify(mockServer).start()
+ verify(mockServer).join()
+ verify(mockServer).stop()
+ }
+
+ test("The server stops properly upon error") {
+ when(mockServer.stop()).thenThrow(new RuntimeException)
+ val launcher = new Launcher(new CommandLine(List()), mockServer)
+ intercept[RuntimeException] {
+ launcher.launch()
+ }
+
+ verify(mockServer).start()
+ verify(mockServer).join()
+ verify(mockServer).stop()
+ }
+}
diff --git a/resource-managers/kubernetes/token-refresh-server/src/test/scala/org/apache/spark/security/kubernetes/TokenRefreshServiceSuite.scala b/resource-managers/kubernetes/token-refresh-server/src/test/scala/org/apache/spark/security/kubernetes/TokenRefreshServiceSuite.scala
new file mode 100644
index 0000000000000..f8c09218e93f9
--- /dev/null
+++ b/resource-managers/kubernetes/token-refresh-server/src/test/scala/org/apache/spark/security/kubernetes/TokenRefreshServiceSuite.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security.kubernetes
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.{Duration, FiniteDuration}
+
+import akka.actor.{ActorRef, ActorSystem, Cancellable, Props, Scheduler}
+import akka.testkit.{TestActorRef, TestKit}
+import com.typesafe.config.ConfigFactory
+import io.fabric8.kubernetes.api.model.SecretBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.mockito._
+import org.mockito.Matchers.{any, same}
+import org.mockito.Mockito.{verify, verifyNoMoreInteractions, when}
+import org.scalatest.{BeforeAndAfter, FunSuiteLike}
+
+import org.apache.spark.security.kubernetes.constants._
+
+
+class TokenRefreshServiceSuite extends TestKit(ActorSystem("test"))
+ with FunSuiteLike with BeforeAndAfter {
+
+ private val configKeyPrefix = "hadoop-token-refresh-server"
+ private val configMap = Map(s"$configKeyPrefix.kerberosPrincipal" -> "my-principal",
+ s"$configKeyPrefix.scanAllNamespaces" -> true,
+ s"$configKeyPrefix.namespaceToScan" -> "my-namespace")
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var kubernetesClient: KubernetesClient = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var scheduler: Scheduler = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var ugi: UgiUtil = _
+ private val config = ConfigFactory.parseMap(configMap.asJava)
+ private val settings = new Settings(config)
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var clock: Clock = _
+ private val nowMillis = 1500000000L
+ private var actorRef: TestActorRef[TokenRefreshService] = _
+ private val reloginInterval = Duration(REFRESH_SERVER_KERBEROS_RELOGIN_INTERVAL_MILLIS,
+ TimeUnit.MILLISECONDS)
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var reloginCommandCancellable: Cancellable = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var reloginTaskCancellable: Cancellable = _
+ private val starterTaskDelay = Duration(STARTER_TASK_INITIAL_DELAY_MILLIS, TimeUnit.MILLISECONDS)
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var starterTaskCancellable: Cancellable = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var starterTask1Cancellable: Cancellable = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var starterTask2Cancellable: Cancellable = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var starterTask3Cancellable: Cancellable = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var renewTaskCancellable: Cancellable = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var token1: Token[_ <: TokenIdentifier] = _
+ @Mock(answer = Answers.RETURNS_SMART_NULLS)
+ private var token2: Token[_ <: TokenIdentifier] = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ when(scheduler.schedule(Matchers.eq(reloginInterval), Matchers.eq(reloginInterval),
+ any(classOf[ActorRef]),
+ Matchers.eq(Relogin))(
+ any(classOf[ExecutionContext]),
+ any(classOf[ActorRef])))
+ .thenReturn(reloginCommandCancellable)
+ when(clock.nowInMillis()).thenReturn(nowMillis)
+ actorRef = TestActorRef(Props(classOf[TokenRefreshService], kubernetesClient, scheduler,
+ ugi, settings, clock))
+ }
+
+ test("The token refresh service actor starts properly") {
+ verify(ugi).loginUserFromKeytab("my-principal", REFRESH_SERVER_KERBEROS_KEYTAB_PATH)
+ verify(scheduler).schedule(Matchers.eq(reloginInterval), Matchers.eq(reloginInterval),
+ same(actorRef),
+ Matchers.eq(Relogin))(
+ any(classOf[ExecutionContext]),
+ same(actorRef))
+ val actor: TokenRefreshService = actorRef.underlyingActor
+ assert(actor.numExtraCancellables() == 1)
+ assert(actor.hasExtraCancellable(Relogin.getClass, reloginCommandCancellable))
+ assert(actor.numPendingSecretTasks() == 0)
+ verifyNoMoreInteractions(scheduler)
+ }
+
+ test("The Relogin command launches a ReloginTask") {
+ when(scheduler.scheduleOnce(any(classOf[FiniteDuration]),
+ any(classOf[Runnable]))(
+ any(classOf[ExecutionContext])))
+ .thenReturn(reloginTaskCancellable)
+ actorRef ! Relogin
+
+ val taskCaptor: ArgumentCaptor[Runnable] = ArgumentCaptor.forClass(classOf[Runnable])
+ verify(scheduler).scheduleOnce(Matchers.eq(Duration(0, TimeUnit.MILLISECONDS)),
+ taskCaptor.capture())(
+ any(classOf[ExecutionContext]))
+ val task = taskCaptor.getValue
+ assert(task.getClass == classOf[ReloginTask])
+ val actor: TokenRefreshService = actorRef.underlyingActor
+ assert(actor.numExtraCancellables() == 2) // Relogin and ReloginTask
+ assert(actor.hasExtraCancellable(classOf[ReloginTask], reloginTaskCancellable))
+ assert(actor.numPendingSecretTasks() == 0)
+ }
+
+ test("The StartRefresh command launches a StarterTask") {
+ when(scheduler.scheduleOnce(any(classOf[FiniteDuration]),
+ any(classOf[Runnable]))(
+ any(classOf[ExecutionContext])))
+ .thenReturn(starterTaskCancellable)
+ val secret = new SecretBuilder()
+ .withNewMetadata()
+ .withUid("uid-0101")
+ .endMetadata()
+ .build()
+ actorRef ! StartRefresh(secret)
+
+ val taskCaptor: ArgumentCaptor[Runnable] = ArgumentCaptor.forClass(classOf[Runnable])
+ verify(scheduler).scheduleOnce(Matchers.eq(starterTaskDelay),
+ taskCaptor.capture())(
+ any(classOf[ExecutionContext]))
+ val task = taskCaptor.getValue
+ assert(task.getClass == classOf[StarterTask])
+ val actor: TokenRefreshService = actorRef.underlyingActor
+ assert(actor.numExtraCancellables() == 1) // Relogin
+ assert(actor.numPendingSecretTasks() == 1)
+ assert(actor.hasSecretTaskCancellable("uid-0101", starterTaskCancellable))
+ }
+
+ test("The Renew command launches a RenewTask") {
+ when(scheduler.scheduleOnce(any(classOf[FiniteDuration]),
+ any(classOf[Runnable]))(
+ any(classOf[ExecutionContext])))
+ .thenReturn(starterTaskCancellable, renewTaskCancellable)
+ val secret = new SecretBuilder()
+ .withNewMetadata()
+ .withUid("uid-0101")
+ .endMetadata()
+ .build()
+ actorRef ! StartRefresh(secret)
+ actorRef ! Renew(expireTime = nowMillis + 10000L, Map(), secret.getMetadata,
+ numConsecutiveErrors = 0)
+
+ val taskCaptor: ArgumentCaptor[Runnable] = ArgumentCaptor.forClass(classOf[Runnable])
+ val renewTaskDelay = Duration((10000L * 0.9).toLong, // 90% of the time remaining until expiry.
+ TimeUnit.MILLISECONDS)
+ verify(scheduler).scheduleOnce(Matchers.eq(renewTaskDelay),
+ taskCaptor.capture())(
+ any(classOf[ExecutionContext]))
+ val task = taskCaptor.getValue
+ assert(task.getClass == classOf[RenewTask])
+
+ val actor: TokenRefreshService = actorRef.underlyingActor
+ assert(actor.numExtraCancellables() == 1) // Relogin
+ assert(actor.numPendingSecretTasks() == 1)
+ assert(actor.hasSecretTaskCancellable("uid-0101", renewTaskCancellable))
+ }
+
+ test("The StopRefresh command cancels a RenewTask") {
+ when(scheduler.scheduleOnce(any(classOf[FiniteDuration]),
+ any(classOf[Runnable]))(
+ any(classOf[ExecutionContext])))
+ .thenReturn(starterTaskCancellable, renewTaskCancellable)
+ val secret = new SecretBuilder()
+ .withNewMetadata()
+ .withUid("uid-0101")
+ .endMetadata()
+ .build()
+ actorRef ! StartRefresh(secret)
+ actorRef ! Renew(expireTime = nowMillis + 10000L, Map(), secret.getMetadata,
+ numConsecutiveErrors = 0)
+ actorRef ! StopRefresh(secret)
+
+ verify(renewTaskCancellable).cancel()
+ val actor: TokenRefreshService = actorRef.underlyingActor
+ assert(actor.numExtraCancellables() == 1) // Relogin
+ assert(actor.numPendingSecretTasks() == 0)
+ }
+
+ test("The UpdateSecretsToTrack command resets the task set to track") {
+ val secret1 = new SecretBuilder()
+ .withNewMetadata()
+ .withUid("uid-0101")
+ .endMetadata()
+ .build()
+ val secret2 = new SecretBuilder()
+ .withNewMetadata()
+ .withUid("uid-0202")
+ .endMetadata()
+ .build()
+ val secret3 = new SecretBuilder()
+ .withNewMetadata()
+ .withUid("uid-0303")
+ .endMetadata()
+ .build()
+ when(scheduler.scheduleOnce(any(classOf[FiniteDuration]),
+ any(classOf[Runnable]))(
+ any(classOf[ExecutionContext])))
+ .thenReturn(starterTask1Cancellable, // for secret1
+ starterTask2Cancellable, // for secret2
+ renewTaskCancellable, // for secret2
+ starterTask3Cancellable) // for secret3
+ actorRef ! UpdateSecretsToTrack(List(secret1)) // This adds a task for secret1.
+ actorRef ! StartRefresh(secret2) // Adds the secret2 task.
+ actorRef ! Renew(expireTime = nowMillis + 10000L, Map(), secret2.getMetadata,
+ numConsecutiveErrors = 0)
+ // This removes secret1, but not the recently added secret2.
+ actorRef ! UpdateSecretsToTrack(List(secret3))
+
+ verify(starterTask1Cancellable).cancel()
+ val actor: TokenRefreshService = actorRef.underlyingActor
+ assert(actor.numExtraCancellables() == 1) // Relogin
+ assert(actor.numPendingSecretTasks() == 2)
+ assert(actor.hasSecretTaskCancellable("uid-0202", renewTaskCancellable))
+ assert(actor.hasSecretTaskCancellable("uid-0303", starterTask3Cancellable))
+ }
+}