@@ -65,6 +65,8 @@ private[spark] object Config extends Logging {
"spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
private[spark] val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
Contributor: APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX was replaced by KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX. Please revert this addition.
Member Author: done
"spark.kubernetes.authenticate.driver.mounted"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
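
For orientation, the prefix and suffix constants above combine into the full user-facing keys; a hypothetical example of setting one of them (the token path shown is the conventional in-pod mount, used here purely as an illustration):

import org.apache.spark.SparkConf

// Builds the key "spark.kubernetes.authenticate.driver.mounted.oauthTokenFile"
// from KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX + OAUTH_TOKEN_FILE_CONF_SUFFIX.
val conf = new SparkConf()
  .set("spark.kubernetes.authenticate.driver.mounted.oauthTokenFile",
    "/var/run/secrets/kubernetes.io/serviceaccount/token")  // illustrative path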
@@ -0,0 +1,40 @@
/*
Contributor: We have KubernetesUtils that is supposed to be the place for all K8s-related utility methods.

Member Author: done
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

private[spark] object OptionRequirements {

  def requireBothOrNeitherDefined(
Contributor: not used, and if it were, perhaps it would be clearer as:

(opt1, opt2) match {
  case (Some(val1), None) =>
    ...
  case (None, Some(val2)) =>
    ...
  case _ => // ok
}

Member Author: This is part of unused code that could be fixed in another PR (e.g. #21462). Will double-check after merging master, but normally, client mode does not impact this.

      opt1: Option[_],
      opt2: Option[_],
      errMessageWhenFirstIsMissing: String,
      errMessageWhenSecondIsMissing: String): Unit = {
    requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
    requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
  }

  def requireSecondIfFirstIsDefined(
      opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
    opt1.foreach { _ =>
      require(opt2.isDefined, errMessageWhenSecondIsMissing)
    }
  }

  def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
    opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
  }
}
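
A minimal sketch of how requireBothOrNeitherDefined is meant to be called (hypothetical options and messages, not part of the diff):

// Client key and client cert must be supplied together or not at all.
val clientKeyFile: Option[String] = Some("/path/client.key")
val clientCertFile: Option[String] = Some("/path/client.crt")
OptionRequirements.requireBothOrNeitherDefined(
  clientKeyFile,
  clientCertFile,
  "Client cert was set but the client key is missing.",   // fires when only the second is defined
  "Client key was set but the client cert is missing.")   // fires when only the first is defined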
@@ -20,7 +20,7 @@ import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.{Config => Fabric8Client, ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.utils.HttpClientUtils
import okhttp3.Dispatcher

@@ -36,6 +36,21 @@ import org.apache.spark.util.ThreadUtils
private[spark] object SparkKubernetesClientFactory {

  def createKubernetesClient(
      master: String,
      namespace: Option[String],
      kubernetesAuthConfPrefix: String,
      sparkConf: SparkConf,
      maybeServiceAccountToken: Option[File],
      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
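    // A mounted service-account token only exists when running inside a pod, so its
    // presence distinguishes an in-cluster driver from one running outside the cluster.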
    new java.io.File(Fabric8Client.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists() match {
      case true => createInClusterKubernetesClient(master, namespace, kubernetesAuthConfPrefix,
        sparkConf, maybeServiceAccountToken, maybeServiceAccountCaCert)
      case false => createOutClusterKubernetesClient(master, namespace, kubernetesAuthConfPrefix,
        sparkConf, maybeServiceAccountToken, maybeServiceAccountCaCert)
    }
  }

  private def createInClusterKubernetesClient(
      master: String,
      namespace: Option[String],
      kubernetesAuthConfPrefix: String,
@@ -88,6 +103,56 @@ private[spark] object SparkKubernetesClientFactory {
    new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
  }

  def createOutClusterKubernetesClient(
      master: String,
      namespace: Option[String],
      kubernetesAuthConfPrefix: String,
      sparkConf: SparkConf,
      maybeServiceAccountToken: Option[File],
      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
    val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
    val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
    val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
      .map(new File(_))
      .orElse(maybeServiceAccountToken)
    val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
    OptionRequirements.requireNandDefined(
Contributor: Since it's only used once, I'm not sure it warrants a separate file/method for checking Options. Also, the method signature doesn't make it quite clear what it does (especially the requireNand part). How about just the simple match that @squito suggested here?

Member Author: I did not introduce OptionRequirements.requireNandDefined but reused what was already in place.
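
For readers tripped up by the name, a minimal self-contained sketch of what requireNandDefined enforces (a NAND: the two options must not both be defined), spelled with the match suggested above; the method name here is hypothetical:

def requireNandDefinedEquivalent(
    opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
  (opt1, opt2) match {
    case (Some(_), Some(_)) =>
      // Both defined: reject, same effect as require(opt2.isEmpty, errMessage).
      throw new IllegalArgumentException(errMessage)
    case _ => // Zero or one defined: fine.
  }
}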

      oauthTokenFile,
      oauthTokenValue,
      s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
        s" value $oauthTokenConf.")

    val caCertFile = sparkConf
      .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
      .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
    val clientKeyFile = sparkConf
      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
    val clientCertFile = sparkConf
      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
    val dispatcher = new Dispatcher(
      ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
    val config = new ConfigBuilder()
      .withApiVersion("v1")
      .withMasterUrl(master)
      .withWebsocketPingInterval(0)
      .withOption(oauthTokenValue) {
        (token, configBuilder) => configBuilder.withOauthToken(token)
      }.withOption(caCertFile) {
        (file, configBuilder) => configBuilder.withCaCertFile(file)
      }.withOption(clientKeyFile) {
        (file, configBuilder) => configBuilder.withClientKeyFile(file)
      }.withOption(clientCertFile) {
        (file, configBuilder) => configBuilder.withClientCertFile(file)
      }.withOption(namespace) {
        (ns, configBuilder) => configBuilder.withNamespace(ns)
      }.build()
    val baseHttpClient = HttpClientUtils.createHttpClient(config)
    val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
      .dispatcher(dispatcher)
      .build()
    new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
  }

  private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
    extends AnyVal {
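
The method body is collapsed in this view; inferred from the call sites above, its shape is presumably along these lines (a sketch, not the verbatim source):

// Apply the configurator only when the Option holds a value; otherwise
// return the builder unchanged.
def withOption[T](option: Option[T])
    (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = {
  option.map(value => configurator(value, configBuilder)).getOrElse(configBuilder)
}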

@@ -152,27 +152,42 @@ private[spark] class BasicExecutorFeatureStep(
        .build()
    }.getOrElse(executorContainer)
    val driverPod = kubernetesConf.roleSpecificConf.driverPod
    val executorPod = new PodBuilder(pod.pod)
      .editOrNewMetadata()
        .withName(name)
        .withLabels(kubernetesConf.roleLabels.asJava)
        .withAnnotations(kubernetesConf.roleAnnotations.asJava)
        .withOwnerReferences()
        .addNewOwnerReference()
          .withController(true)
          .withApiVersion(driverPod.getApiVersion)
          .withKind(driverPod.getKind)
          .withName(driverPod.getMetadata.getName)
          .withUid(driverPod.getMetadata.getUid)
          .endOwnerReference()
        .endMetadata()
      .editOrNewSpec()
        .withHostname(hostname)
        .withRestartPolicy("Never")
        .withNodeSelector(kubernetesConf.nodeSelector().asJava)
        .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
        .endSpec()
      .build()
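    // driverPod is null when the driver runs outside the cluster (client mode), so
    // there is no driver pod to attach as the executor's owner reference.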
    val executorPod = (driverPod == null) match {
      case true => new PodBuilder(pod.pod)
        .editOrNewMetadata()
          .withName(name)
          .withLabels(kubernetesConf.roleLabels.asJava)
          .withAnnotations(kubernetesConf.roleAnnotations.asJava)
          .endMetadata()
        .editOrNewSpec()
          .withHostname(hostname)
          .withRestartPolicy("Never")
          .withNodeSelector(kubernetesConf.nodeSelector().asJava)
          .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
          .endSpec()
        .build()
      case false => new PodBuilder(pod.pod)
        .editOrNewMetadata()
          .withName(name)
          .withLabels(kubernetesConf.roleLabels.asJava)
          .withAnnotations(kubernetesConf.roleAnnotations.asJava)
          .withOwnerReferences()
          .addNewOwnerReference()
            .withController(true)
            .withApiVersion(driverPod.getApiVersion)
            .withKind(driverPod.getKind)
            .withName(driverPod.getMetadata.getName)
            .withUid(driverPod.getMetadata.getUid)
            .endOwnerReference()
          .endMetadata()
        .editOrNewSpec()
          .withHostname(hostname)
          .withRestartPolicy("Never")
          .withNodeSelector(kubernetesConf.nodeSelector().asJava)
          .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
          .endSpec()
        .build()
    }
    SparkPod(executorPod, containerWithLimitCores)
  }

@@ -20,41 +20,65 @@ import java.io.File
import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder
import io.fabric8.kubernetes.client.Config

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
import io.fabric8.kubernetes.client.{Config, KubernetesClient}
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.util.{SystemClock, ThreadUtils}

private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
trait ManagerSpecificHandlers {
  def createKubernetesClient(sparkConf: SparkConf): KubernetesClient
}

  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
private[spark] class KubernetesClusterManager extends ExternalClusterManager
  with ManagerSpecificHandlers with Logging {

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    if (masterURL.startsWith("k8s") &&
      sc.deployMode == "client" &&
      !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) {
      throw new SparkException("Client mode is currently not supported for Kubernetes.")
  class InClusterHandlers extends ManagerSpecificHandlers {
    override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
      SparkKubernetesClientFactory.createKubernetesClient(
        KUBERNETES_MASTER_INTERNAL_URL,
        Some(sparkConf.get(KUBERNETES_NAMESPACE)),
        KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
        sparkConf,
        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
  }

  class OutClusterHandlers extends ManagerSpecificHandlers {
    override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
      SparkKubernetesClientFactory.createKubernetesClient(
        sparkConf.get("spark.master").replace("k8s://", ""),
        Some(sparkConf.get(KUBERNETES_NAMESPACE)),
        KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
Contributor: The name of this prefix, spark.kubernetes.authenticate.driver.mounted, sounds weird in this case, given that the client is running outside the cluster. BTW: can we alternatively use the config at $HOME/.kube/config to build a Kubernetes client instead? I think this is a common approach for building clients outside a cluster.

Member Author: the call to createKubernetesClient is now used in two different ways:

  • KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX is used in KubernetesClusterManager
  • KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX is used in KubernetesClientApplication

I would favor the second and remove the first.

For the config location, I remember that the fabric8 k8s client also does some inspection to determine whether it is in- or out-of-cluster, and loads the config from the default place (depending on the case), with the possibility to specify other places for the cert, token... (this is what we expose as properties to the end user).
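
For reference, a minimal sketch of that fabric8 auto-configuration path (assuming the no-arg constructor's default resolution order):

import io.fabric8.kubernetes.client.DefaultKubernetesClient

// The no-arg constructor auto-configures the client: it uses the mounted
// service-account credentials when running inside a pod, and otherwise falls
// back to the kubeconfig at $KUBECONFIG (default: $HOME/.kube/config).
val client = new DefaultKubernetesClient()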

        sparkConf,
        None,
        None)
  }

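  // Same in/out-of-cluster detection as SparkKubernetesClientFactory: the
  // service-account token file is only mounted when running inside a pod.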
  val modeHandler: ManagerSpecificHandlers = {
    new java.io.File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists() match {
      case true => new InClusterHandlers()
      case false => new OutClusterHandlers()
    }
  }

  override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
    modeHandler.createKubernetesClient(sparkConf)

  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    new TaskSchedulerImpl(sc)
  }

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
      KUBERNETES_MASTER_INTERNAL_URL,
      Some(sc.conf.get(KUBERNETES_NAMESPACE)),
      KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
      sc.conf,
      Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
      Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
    val kubernetesClient = createKubernetesClient(sc.getConf)

    val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
      "kubernetes-executor-requests")