docs/running-on-kubernetes.md (13 changes: 12 additions & 1 deletion)

@@ -260,7 +260,18 @@ RBAC authorization and how to configure Kubernetes service accounts for pods, pl

## Client Mode

-Client mode is not currently supported.
Client mode is currently supported when the driver is launched `Out Cluster` (that is, not from within a pod).

Both `spark-submit` and `spark-shell` can be used.

To use it, add the following two properties to your launch configuration:

```
spark.submit.deployMode="client"
spark.kubernetes.driver.pod.name="$HOSTNAME"
```

Client mode is currently **not** supported when the driver is launched `In Cluster` (from within a pod).
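For example, a minimal out-cluster `spark-shell` invocation might look like the following sketch (the API server address and image name are placeholders, and `spark.kubernetes.container.image` is the usual image property rather than something introduced here):

```
$ bin/spark-shell \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
  --conf spark.submit.deployMode=client \
  --conf spark.kubernetes.driver.pod.name=$HOSTNAME \
  --conf spark.kubernetes.container.image=<spark-image>
```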

## Future Work

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

@@ -58,4 +58,21 @@ private[spark] object KubernetesUtils {
case _ => uri
}
}

  /** Requires that both options are defined, or that neither is. */
  def requireBothOrNeitherDefined(
      opt1: Option[_],
      opt2: Option[_],
      errMessageWhenFirstIsMissing: String,
      errMessageWhenSecondIsMissing: String): Unit = {
    requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
    requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
  }

  /** Requires that the second option is defined whenever the first one is. */
  def requireSecondIfFirstIsDefined(
      opt1: Option[_],
      opt2: Option[_],
      errMessageWhenSecondIsMissing: String): Unit = {
    opt1.foreach { _ =>
      require(opt2.isDefined, errMessageWhenSecondIsMissing)
    }
  }

}
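A minimal usage sketch for these helpers; the paths and messages here are hypothetical, purely for illustration:

```
// Usage sketch (from code in the org.apache.spark.deploy.k8s package,
// since KubernetesUtils is private[spark]).
val clientKeyFile = Some("/etc/auth/client.key")  // hypothetical path
val clientCertFile: Option[String] = None

// Throws IllegalArgumentException with the last message: the key file
// (opt1) is defined but the cert file (opt2) is not.
KubernetesUtils.requireBothOrNeitherDefined(
  clientKeyFile,
  clientCertFile,
  "Client key file is required when the cert file is specified.",
  "Client cert file is required when the key file is specified.")
```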
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala

@@ -20,7 +20,7 @@ import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files
-import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.{Config => Fabric8Client, ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.utils.HttpClientUtils
import okhttp3.Dispatcher

@@ -36,6 +36,21 @@ import org.apache.spark.util.ThreadUtils
private[spark] object SparkKubernetesClientFactory {

  def createKubernetesClient(
      master: String,
      namespace: Option[String],
      kubernetesAuthConfPrefix: String,
      sparkConf: SparkConf,
      maybeServiceAccountToken: Option[File],
      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
    // The service account token is only mounted inside a pod, so its
    // presence tells us whether we are running in-cluster or out-cluster.
    if (new File(Fabric8Client.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists()) {
      createInClusterKubernetesClient(master, namespace, kubernetesAuthConfPrefix,
        sparkConf, maybeServiceAccountToken, maybeServiceAccountCaCert)
    } else {
      createOutClusterKubernetesClient(master, namespace, kubernetesAuthConfPrefix,
        sparkConf, maybeServiceAccountToken, maybeServiceAccountCaCert)
    }
  }

private def createInClusterKubernetesClient(
master: String,
namespace: Option[String],
kubernetesAuthConfPrefix: String,
@@ -88,6 +103,56 @@ private[spark] object SparkKubernetesClientFactory {
new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
}

  private def createOutClusterKubernetesClient(
      master: String,
      namespace: Option[String],
      kubernetesAuthConfPrefix: String,
      sparkConf: SparkConf,
      maybeServiceAccountToken: Option[File],
      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
.map(new File(_))
.orElse(maybeServiceAccountToken)
val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
KubernetesUtils.requireNandDefined(
oauthTokenFile,
oauthTokenValue,
s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
s" value $oauthTokenConf.")

val caCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
.orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
val clientKeyFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
val clientCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
val dispatcher = new Dispatcher(
ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
val config = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)
.withOption(oauthTokenValue) {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(caCertFile) {
(file, configBuilder) => configBuilder.withCaCertFile(file)
}.withOption(clientKeyFile) {
(file, configBuilder) => configBuilder.withClientKeyFile(file)
}.withOption(clientCertFile) {
(file, configBuilder) => configBuilder.withClientCertFile(file)
}.withOption(namespace) {
(ns, configBuilder) => configBuilder.withNamespace(ns)
}.build()
val baseHttpClient = HttpClientUtils.createHttpClient(config)
val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
.dispatcher(dispatcher)
.build()
new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
}
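The `withOption` combinator used in the builder chain above belongs to the `OptionConfigurableConfigBuilder` implicit class shown below; its body is elided in this diff, but a sketch consistent with the call sites would be:

```
// Sketch: apply the configurator only when the Option holds a value;
// otherwise pass the builder through unchanged.
def withOption[T](option: Option[T])
    (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = {
  option.map { value =>
    configurator(value, configBuilder)
  }.getOrElse(configBuilder)
}
```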

private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
extends AnyVal {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala

@@ -152,28 +152,42 @@ private[spark] class BasicExecutorFeatureStep(
.build()
}.getOrElse(executorContainer)
val driverPod = kubernetesConf.roleSpecificConf.driverPod
-    val executorPod = new PodBuilder(pod.pod)
-      .editOrNewMetadata()
-        .withName(name)
-        .withLabels(kubernetesConf.roleLabels.asJava)
-        .withAnnotations(kubernetesConf.roleAnnotations.asJava)
-        .withOwnerReferences()
-        .addNewOwnerReference()
-          .withController(true)
-          .withApiVersion(driverPod.getApiVersion)
-          .withKind(driverPod.getKind)
-          .withName(driverPod.getMetadata.getName)
-          .withUid(driverPod.getMetadata.getUid)
-          .endOwnerReference()
-        .endMetadata()
-      .editOrNewSpec()
-        .withHostname(hostname)
-        .withRestartPolicy("Never")
-        .withNodeSelector(kubernetesConf.nodeSelector().asJava)
-        .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
-        .endSpec()
-      .build()

    // The driver pod is null when the driver runs out-cluster in client
    // mode, in which case no owner reference can point at it.
    val executorPod = if (driverPod == null) {
      new PodBuilder(pod.pod)
        .editOrNewMetadata()
          .withName(name)
          .withLabels(kubernetesConf.roleLabels.asJava)
          .withAnnotations(kubernetesConf.roleAnnotations.asJava)
          .endMetadata()
        .editOrNewSpec()
          .withHostname(hostname)
          .withRestartPolicy("Never")
          .withNodeSelector(kubernetesConf.nodeSelector().asJava)
          .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
          .endSpec()
        .build()
    } else {
      new PodBuilder(pod.pod)
        .editOrNewMetadata()
          .withName(name)
          .withLabels(kubernetesConf.roleLabels.asJava)
          .withAnnotations(kubernetesConf.roleAnnotations.asJava)
          .withOwnerReferences()
          .addNewOwnerReference()
            .withController(true)
            .withApiVersion(driverPod.getApiVersion)
            .withKind(driverPod.getKind)
            .withName(driverPod.getMetadata.getName)
            .withUid(driverPod.getMetadata.getUid)
            .endOwnerReference()
          .endMetadata()
        .editOrNewSpec()
          .withHostname(hostname)
          .withRestartPolicy("Never")
          .withNodeSelector(kubernetesConf.nodeSelector().asJava)
          .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
          .endSpec()
        .build()
    }
SparkPod(executorPod, containerWithLimitCores)
}
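The two branches above differ only in the owner reference, which ties each executor pod's lifecycle to the driver pod so Kubernetes garbage-collects executors when the driver pod is deleted; with no driver pod (out-cluster client mode), executors have no owner and must be cleaned up explicitly. A possible deduplication, sketched here rather than part of this change, would build the reference separately:

```
import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder, Pod}

// Sketch: construct the driver's owner reference once; callers attach it
// to the executor pod metadata only when a driver pod exists.
private def driverOwnerReference(driverPod: Pod): OwnerReference =
  new OwnerReferenceBuilder()
    .withController(true)
    .withApiVersion(driverPod.getApiVersion)
    .withKind(driverPod.getKind)
    .withName(driverPod.getMetadata.getName)
    .withUid(driverPod.getMetadata.getUid)
    .build()
```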

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

@@ -20,41 +20,65 @@ import java.io.File
import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder
-import io.fabric8.kubernetes.client.Config
import io.fabric8.kubernetes.client.{Config, KubernetesClient}

-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.util.{SystemClock, ThreadUtils}

trait ManagerSpecificHandlers {
  def createKubernetesClient(sparkConf: SparkConf): KubernetesClient
}

-private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
-  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
private[spark] class KubernetesClusterManager extends ExternalClusterManager
  with ManagerSpecificHandlers with Logging {

-  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
-    if (masterURL.startsWith("k8s") &&
-      sc.deployMode == "client" &&
-      !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) {
-      throw new SparkException("Client mode is currently not supported for Kubernetes.")

  class InClusterHandlers extends ManagerSpecificHandlers {
    override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
      SparkKubernetesClientFactory.createKubernetesClient(
        KUBERNETES_MASTER_INTERNAL_URL,
        Some(sparkConf.get(KUBERNETES_NAMESPACE)),
        KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
        sparkConf,
        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
  }

  class OutClusterHandlers extends ManagerSpecificHandlers {
    override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
      SparkKubernetesClientFactory.createKubernetesClient(
        sparkConf.get("spark.master").replace("k8s://", ""),
        Some(sparkConf.get(KUBERNETES_NAMESPACE)),
        KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
        sparkConf,
        None,
        None)
  }

Review comment (Contributor), on the `KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX` argument:

The name of this prefix, `spark.kubernetes.authenticate.driver.mounted`, sounds weird in this case, given that the client is running outside the cluster. BTW: can we alternatively use the config at `$HOME/.kube/config` to build a Kubernetes client instead? I think this is a common approach for building clients outside a cluster.

Reply (Member, Author):

The call to `createKubernetesClient` is now used in two different ways:

  • `KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX` is used in `KubernetesClusterManager`
  • `KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX` is used in `KubernetesClientApplication`

I would favor the second and remove the first.

As for the config location, I remember that the fabric8 Kubernetes client also does some inspection to see whether it is in- or out-cluster, and loads the config from the default place (depending on the case), with the possibility to specify other locations for the cert, token, etc. (this is what we expose as properties to the end user).
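On the kubeconfig question raised in that thread, fabric8 can indeed auto-detect its environment. A sketch of that alternative (not what this change implements), assuming the client's no-argument constructor:

```
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}

// Sketch: fabric8 resolves its configuration from standard locations, the
// mounted service account when in-cluster or ~/.kube/config when out-cluster.
val autoConfiguredClient: KubernetesClient = new DefaultKubernetesClient()
```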

  val modeHandler: ManagerSpecificHandlers = {
    // Same detection as in SparkKubernetesClientFactory: the service account
    // token is only mounted when running inside a pod.
    if (new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists()) {
      new InClusterHandlers()
    } else {
      new OutClusterHandlers()
    }
  }

override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
modeHandler.createKubernetesClient(sparkConf)

override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")

override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
new TaskSchedulerImpl(sc)
}

override def createSchedulerBackend(
sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
-    val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
-      KUBERNETES_MASTER_INTERNAL_URL,
-      Some(sc.conf.get(KUBERNETES_NAMESPACE)),
-      KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
-      sc.conf,
-      Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
-      Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
    val kubernetesClient = createKubernetesClient(sc.getConf)

val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
"kubernetes-executor-requests")