diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 7149616e534aa..9e3670ede81a5 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -260,7 +260,18 @@ RBAC authorization and how to configure Kubernetes service accounts for pods, pl
 
 ## Client Mode
 
-Client mode is not currently supported.
+Client mode is currently supported when the driver is launched `Out Cluster` (i.e. outside a pod).
+
+Both `spark-submit` and `spark-shell` can be used.
+
+To do so, add the following two properties to your launch configuration:
+
+```
+spark.submit.deployMode="client"
+spark.kubernetes.driver.pod.name="$HOSTNAME"
+```
+
+Client mode is currently **not** supported when the driver is launched `In Cluster` (i.e. from within a pod).
 
 ## Future Work
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 66fff267545dc..a558059761ba3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -58,4 +58,21 @@ private[spark] object KubernetesUtils {
       case _ => uri
     }
   }
+
+  def requireBothOrNeitherDefined(
+      opt1: Option[_],
+      opt2: Option[_],
+      errMessageWhenFirstIsMissing: String,
+      errMessageWhenSecondIsMissing: String): Unit = {
+    requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
+    requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
+  }
+
+  def requireSecondIfFirstIsDefined(
+      opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
+    opt1.foreach { _ =>
+      require(opt2.isDefined, errMessageWhenSecondIsMissing)
+    }
+  }
+
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index c47e78cbf19e3..3b99cf99e3a4e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -20,7 +20,7 @@ import java.io.File
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.{Config => Fabric8Client, ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
 import io.fabric8.kubernetes.client.utils.HttpClientUtils
 import okhttp3.Dispatcher
 
@@ -36,6 +36,21 @@ import org.apache.spark.util.ThreadUtils
 private[spark] object SparkKubernetesClientFactory {
 
   def createKubernetesClient(
+      master: String,
+      namespace: Option[String],
+      kubernetesAuthConfPrefix: String,
+      sparkConf: SparkConf,
+      maybeServiceAccountToken: Option[File],
+      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
+    new java.io.File(Fabric8Client.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists() match {
+      case true => createInClusterKubernetesClient(master, namespace, kubernetesAuthConfPrefix,
+        sparkConf, maybeServiceAccountToken, maybeServiceAccountCaCert)
+      case false => createOutClusterKubernetesClient(master, namespace, kubernetesAuthConfPrefix,
+        sparkConf, maybeServiceAccountToken, maybeServiceAccountCaCert)
+    }
+  }
+
+  private def createInClusterKubernetesClient(
       master: String,
       namespace: Option[String],
       kubernetesAuthConfPrefix: String,
@@ -88,6 +103,56 @@ private[spark] object SparkKubernetesClientFactory {
     new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
   }
 
+  def createOutClusterKubernetesClient(
+      master: String,
+      namespace: Option[String],
+      kubernetesAuthConfPrefix: String,
+      sparkConf: SparkConf,
+      maybeServiceAccountToken: Option[File],
+      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
+    val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
+    val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
+    val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
+      .map(new File(_))
+      .orElse(maybeServiceAccountToken)
+    val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
+    KubernetesUtils.requireNandDefined(
+      oauthTokenFile,
+      oauthTokenValue,
+      s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
+        s" value $oauthTokenConf.")
+
+    val caCertFile = sparkConf
+      .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
+      .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
+    val clientKeyFile = sparkConf
+      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
+    val clientCertFile = sparkConf
+      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
+    val dispatcher = new Dispatcher(
+      ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
+    val config = new ConfigBuilder()
+      .withApiVersion("v1")
+      .withMasterUrl(master)
+      .withWebsocketPingInterval(0)
+      .withOption(oauthTokenValue) {
+        (token, configBuilder) => configBuilder.withOauthToken(token)
+      }.withOption(caCertFile) {
+        (file, configBuilder) => configBuilder.withCaCertFile(file)
+      }.withOption(clientKeyFile) {
+        (file, configBuilder) => configBuilder.withClientKeyFile(file)
+      }.withOption(clientCertFile) {
+        (file, configBuilder) => configBuilder.withClientCertFile(file)
+      }.withOption(namespace) {
+        (ns, configBuilder) => configBuilder.withNamespace(ns)
+      }.build()
+    val baseHttpClient = HttpClientUtils.createHttpClient(config)
+    val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
+      .dispatcher(dispatcher)
+      .build()
+    new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
+  }
+
   private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
     extends AnyVal {
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index abaeff0313a79..5d51a22065275 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -152,28 +152,42 @@ private[spark] class BasicExecutorFeatureStep(
         .build()
     }.getOrElse(executorContainer)
     val driverPod = kubernetesConf.roleSpecificConf.driverPod
-    val executorPod = new PodBuilder(pod.pod)
-      .editOrNewMetadata()
-        .withName(name)
-        .withLabels(kubernetesConf.roleLabels.asJava)
-        .withAnnotations(kubernetesConf.roleAnnotations.asJava)
-        .withOwnerReferences()
-        .addNewOwnerReference()
-          .withController(true)
-          .withApiVersion(driverPod.getApiVersion)
-          .withKind(driverPod.getKind)
-          .withName(driverPod.getMetadata.getName)
-          .withUid(driverPod.getMetadata.getUid)
-          .endOwnerReference()
-        .endMetadata()
-      .editOrNewSpec()
-        .withHostname(hostname)
-        .withRestartPolicy("Never")
-        .withNodeSelector(kubernetesConf.nodeSelector().asJava)
-        .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
-        .endSpec()
-      .build()
-
+    val executorPod = (driverPod == null) match {
+      case true => new PodBuilder(pod.pod)
+        .editOrNewMetadata()
+          .withName(name)
+          .withLabels(kubernetesConf.roleLabels.asJava)
+          .withAnnotations(kubernetesConf.roleAnnotations.asJava)
+          .endMetadata()
+        .editOrNewSpec()
+          .withHostname(hostname)
+          .withRestartPolicy("Never")
+          .withNodeSelector(kubernetesConf.nodeSelector().asJava)
+          .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
+          .endSpec()
+        .build()
+      case false => new PodBuilder(pod.pod)
+        .editOrNewMetadata()
+          .withName(name)
+          .withLabels(kubernetesConf.roleLabels.asJava)
+          .withAnnotations(kubernetesConf.roleAnnotations.asJava)
+          .withOwnerReferences()
+          .addNewOwnerReference()
+            .withController(true)
+            .withApiVersion(driverPod.getApiVersion)
+            .withKind(driverPod.getKind)
+            .withName(driverPod.getMetadata.getName)
+            .withUid(driverPod.getMetadata.getUid)
+            .endOwnerReference()
+          .endMetadata()
+        .editOrNewSpec()
+          .withHostname(hostname)
+          .withRestartPolicy("Never")
+          .withNodeSelector(kubernetesConf.nodeSelector().asJava)
+          .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
+          .endSpec()
+        .build()
+    }
     SparkPod(executorPod, containerWithLimitCores)
   }
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index de2a52bc7a0b8..7891ecacacbfc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -20,27 +20,57 @@ import java.io.File
 import java.util.concurrent.TimeUnit
 
 import com.google.common.cache.CacheBuilder
-import io.fabric8.kubernetes.client.Config
-
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
+import io.fabric8.kubernetes.client.{Config, KubernetesClient}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.util.{SystemClock, ThreadUtils}
 
-private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
+trait ManagerSpecificHandlers {
+  def createKubernetesClient(sparkConf: SparkConf): KubernetesClient
+}
 
-  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
+private[spark] class KubernetesClusterManager extends ExternalClusterManager
+  with ManagerSpecificHandlers with Logging {
 
-  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
-    if (masterURL.startsWith("k8s") &&
-      sc.deployMode == "client" &&
-      !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) {
-      throw new SparkException("Client mode is currently not supported for Kubernetes.")
+  class InClusterHandlers extends ManagerSpecificHandlers {
+    override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
+      SparkKubernetesClientFactory.createKubernetesClient(
+        KUBERNETES_MASTER_INTERNAL_URL,
+        Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+        KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+        sparkConf,
+        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
+        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
+  }
+
+  class OutClusterHandlers extends ManagerSpecificHandlers {
+    override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
+      SparkKubernetesClientFactory.createKubernetesClient(
+        sparkConf.get("spark.master").replace("k8s://", ""),
+        Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+        KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+        sparkConf,
+        None,
+        None)
+  }
+
+  val modeHandler: ManagerSpecificHandlers = {
+    new java.io.File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists() match {
+      case true => new InClusterHandlers()
+      case false => new OutClusterHandlers()
     }
+  }
 
+  override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
+    modeHandler.createKubernetesClient(sparkConf)
+
+  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
+
+  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
     new TaskSchedulerImpl(sc)
   }
 
@@ -48,13 +78,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       sc: SparkContext,
      masterURL: String,
       scheduler: TaskScheduler): SchedulerBackend = {
-    val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
-      KUBERNETES_MASTER_INTERNAL_URL,
-      Some(sc.conf.get(KUBERNETES_NAMESPACE)),
-      KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
-      sc.conf,
-      Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
-      Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
+    val kubernetesClient = createKubernetesClient(sc.getConf)
 
     val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
       "kubernetes-executor-requests")
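Reviewer note: the `running-on-kubernetes.md` hunk above describes launching client mode `Out Cluster` with two extra properties. The following is a minimal sketch (not part of this patch) of what an out-cluster client-mode application could look like; the API server URL, namespace, and executor image below are placeholders and would need to match your cluster.

```scala
import org.apache.spark.sql.SparkSession

// Minimal out-cluster client-mode sketch, assuming placeholder values for the
// master URL, namespace, and executor image.
object OutClusterClientModeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      // k8s:// master URL pointing at a reachable API server (placeholder).
      .master("k8s://https://kubernetes.example.com:6443")
      .appName("out-cluster-client-mode")
      // The two properties described in the docs change above.
      .config("spark.submit.deployMode", "client")
      .config("spark.kubernetes.driver.pod.name", sys.env.getOrElse("HOSTNAME", "spark-driver"))
      // Placeholder executor image and namespace.
      .config("spark.kubernetes.container.image", "spark:latest")
      .config("spark.kubernetes.namespace", "default")
      .getOrCreate()

    // Executor pods are created by KubernetesClusterManager; because the driver is
    // not a pod here, the driverPod == null branch in BasicExecutorFeatureStep
    // skips attaching the driver-pod owner reference.
    println(spark.sparkContext.parallelize(1 to 1000).map(_ * 2).sum())
    spark.stop()
  }
}
```

With `spark-submit` or `spark-shell`, the same two properties can instead be passed on the command line via `--conf`.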