Changes from 9 commits
@@ -65,6 +65,8 @@ private[spark] object Config extends Logging {
"spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
private[spark] val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
Contributor: APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX was replaced by KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX. Please revert this addition.

Member Author: done
"spark.kubernetes.authenticate.driver.mounted"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
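For reference, these prefix and suffix constants compose into full configuration keys at runtime. A small illustrative sketch (the key name is derived from the constants above; the file path is only the conventional in-pod token location, shown as an example):

    // Building the user-facing conf key from the driver-mounted auth prefix.
    val prefix = "spark.kubernetes.authenticate.driver.mounted"
    val oauthTokenFileKey = s"$prefix.oauthTokenFile"
    // => "spark.kubernetes.authenticate.driver.mounted.oauthTokenFile",
    // which a user would typically point at a mounted token file such as
    // /var/run/secrets/kubernetes.io/serviceaccount/token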
@@ -0,0 +1,40 @@
Contributor: We have KubernetesUtils that is supposed to be the place for all K8s-related utility methods.

Member Author: done

/*

 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

private[spark] object OptionRequirements {

  def requireBothOrNeitherDefined(
      opt1: Option[_],
      opt2: Option[_],
      errMessageWhenFirstIsMissing: String,
      errMessageWhenSecondIsMissing: String): Unit = {
    requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
    requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
  }

  def requireSecondIfFirstIsDefined(
      opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
    opt1.foreach { _ =>
      require(opt2.isDefined, errMessageWhenSecondIsMissing)
    }
  }

  def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
    opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
  }
}

Contributor, on requireBothOrNeitherDefined: not used, and if it were, perhaps would be clearer as

    (opt1, opt2) match {
      case (Some(val1), None) =>
        ...
      case (None, Some(val2)) =>
        ...
      case _ => // ok
    }

Member Author: This is part of unused code that could be fixed in another PR (e.g. #21462). Will double-check after merging master, but normally, client mode does not impact this.
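A complete version of the reviewer's suggested rewrite might look like the following (a hypothetical sketch, not code from this PR; throwing IllegalArgumentException preserves the behavior of the require calls above):

    def requireBothOrNeitherDefined(
        opt1: Option[_],
        opt2: Option[_],
        errMessageWhenFirstIsMissing: String,
        errMessageWhenSecondIsMissing: String): Unit = {
      (opt1, opt2) match {
        // First defined but second missing: report the second's message.
        case (Some(_), None) => throw new IllegalArgumentException(errMessageWhenSecondIsMissing)
        // Second defined but first missing: report the first's message.
        case (None, Some(_)) => throw new IllegalArgumentException(errMessageWhenFirstIsMissing)
        case _ => // both or neither defined: ok
      }
    }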
@@ -20,7 +20,7 @@ import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files
-import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.{Config => Fabric8Client, ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.utils.HttpClientUtils
import okhttp3.Dispatcher

@@ -36,6 +36,21 @@ import org.apache.spark.util.ThreadUtils
private[spark] object SparkKubernetesClientFactory {

  def createKubernetesClient(
+      master: String,
+      namespace: Option[String],
+      kubernetesAuthConfPrefix: String,
+      sparkConf: SparkConf,
+      maybeServiceAccountToken: Option[File],
+      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
+    new java.io.File(Fabric8Client.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists() match {
+      case true => createInClusterKubernetesClient(master, namespace, kubernetesAuthConfPrefix,
+        sparkConf, maybeServiceAccountToken, maybeServiceAccountCaCert)
+      case false => createOutClusterKubernetesClient(master, namespace, kubernetesAuthConfPrefix,
+        sparkConf, maybeServiceAccountToken, maybeServiceAccountCaCert)
+    }
+  }
+
+  private def createInClusterKubernetesClient(
      master: String,
      namespace: Option[String],
      kubernetesAuthConfPrefix: String,
@@ -88,6 +103,56 @@ private[spark] object SparkKubernetesClientFactory {
    new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
  }

+  def createOutClusterKubernetesClient(
+      master: String,
Contributor: Wrong indentation.

Contributor: Fix the indent.

Member Author: Done.
+      namespace: Option[String],
+      kubernetesAuthConfPrefix: String,
+      sparkConf: SparkConf,
+      maybeServiceAccountToken: Option[File],
+      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
+    val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
+    val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
+    val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
+      .map(new File(_))
+      .orElse(maybeServiceAccountToken)
+    val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
+    OptionRequirements.requireNandDefined(
Contributor: Since it's only used once, I'm not sure it warrants a separate file/method for checking Options. Also, the method signature isn't quite clear to me about what it does (especially requireNand). How about just the simple match that @squito suggested here?

Member Author: I did not introduce OptionRequirements.requireNandDefined but reused what was already in place.
+      oauthTokenFile,
+      oauthTokenValue,
+      s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
+        s" value $oauthTokenConf.")
+
+    val caCertFile = sparkConf
+      .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
+      .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
+    val clientKeyFile = sparkConf
+      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
+    val clientCertFile = sparkConf
+      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
+    val dispatcher = new Dispatcher(
+      ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
+    val config = new ConfigBuilder()
+      .withApiVersion("v1")
+      .withMasterUrl(master)
+      .withWebsocketPingInterval(0)
+      .withOption(oauthTokenValue) {
+        (token, configBuilder) => configBuilder.withOauthToken(token)
+      }.withOption(caCertFile) {
+        (file, configBuilder) => configBuilder.withCaCertFile(file)
+      }.withOption(clientKeyFile) {
+        (file, configBuilder) => configBuilder.withClientKeyFile(file)
+      }.withOption(clientCertFile) {
+        (file, configBuilder) => configBuilder.withClientCertFile(file)
+      }.withOption(namespace) {
+        (ns, configBuilder) => configBuilder.withNamespace(ns)
+      }.build()
+    val baseHttpClient = HttpClientUtils.createHttpClient(config)
+    val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
+      .dispatcher(dispatcher)
+      .build()
+    new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
+  }

  private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
    extends AnyVal {

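The body of OptionConfigurableConfigBuilder is collapsed in this diff. Based on how withOption is used above, the helper plausibly looks something like this (a sketch reconstructing the elided code, not necessarily the file's exact contents):

    // Applies the configurator only when the option holds a value; otherwise
    // returns the builder unchanged, so chained calls stay linear.
    def withOption[T]
        (option: Option[T])
        (configurator: (T, ConfigBuilder) => ConfigBuilder): ConfigBuilder = {
      option.map { opt =>
        configurator(opt, configBuilder)
      }.getOrElse(configBuilder)
    }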
@@ -151,27 +151,42 @@ private[spark] class BasicExecutorFeatureStep(
        .build()
    }.getOrElse(executorContainer)
    val driverPod = kubernetesConf.roleSpecificConf.driverPod
-    val executorPod = new PodBuilder(pod.pod)
-      .editOrNewMetadata()
-        .withName(name)
-        .withLabels(kubernetesConf.roleLabels.asJava)
-        .withAnnotations(kubernetesConf.roleAnnotations.asJava)
-        .withOwnerReferences()
-        .addNewOwnerReference()
-          .withController(true)
-          .withApiVersion(driverPod.getApiVersion)
-          .withKind(driverPod.getKind)
-          .withName(driverPod.getMetadata.getName)
-          .withUid(driverPod.getMetadata.getUid)
-          .endOwnerReference()
-        .endMetadata()
-      .editOrNewSpec()
-        .withHostname(hostname)
-        .withRestartPolicy("Never")
-        .withNodeSelector(kubernetesConf.nodeSelector().asJava)
-        .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
-        .endSpec()
-      .build()
+    val executorPod = (driverPod == null) match {
+      case true => new PodBuilder(pod.pod)
+        .editOrNewMetadata()
+          .withName(name)
+          .withLabels(kubernetesConf.roleLabels.asJava)
+          .withAnnotations(kubernetesConf.roleAnnotations.asJava)
+          .endMetadata()
+        .editOrNewSpec()
+          .withHostname(hostname)
+          .withRestartPolicy("Never")
+          .withNodeSelector(kubernetesConf.nodeSelector().asJava)
+          .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
+          .endSpec()
+        .build()
+      case false => new PodBuilder(pod.pod)
+        .editOrNewMetadata()
+          .withName(name)
+          .withLabels(kubernetesConf.roleLabels.asJava)
+          .withAnnotations(kubernetesConf.roleAnnotations.asJava)
+          .withOwnerReferences()
+          .addNewOwnerReference()
+            .withController(true)
+            .withApiVersion(driverPod.getApiVersion)
+            .withKind(driverPod.getKind)
+            .withName(driverPod.getMetadata.getName)
+            .withUid(driverPod.getMetadata.getUid)
+            .endOwnerReference()
+          .endMetadata()
+        .editOrNewSpec()
+          .withHostname(hostname)
+          .withRestartPolicy("Never")
+          .withNodeSelector(kubernetesConf.nodeSelector().asJava)
+          .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
+          .endSpec()
+        .build()
+    }
    SparkPod(executorPod, containerWithLimitCores)
  }

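The two branches above differ only in the owner-reference block, which is skipped when driverPod is null (the client-mode case, where no driver pod exists to own the executors). For illustration, one way the duplication could be factored out (a hypothetical refactor against the same fabric8 PodBuilder API, not what this PR does):

    // Build the metadata/spec common to both modes once...
    val basePod = new PodBuilder(pod.pod)
      .editOrNewMetadata()
        .withName(name)
        .withLabels(kubernetesConf.roleLabels.asJava)
        .withAnnotations(kubernetesConf.roleAnnotations.asJava)
        .endMetadata()
      .editOrNewSpec()
        .withHostname(hostname)
        .withRestartPolicy("Never")
        .withNodeSelector(kubernetesConf.nodeSelector().asJava)
        .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
        .endSpec()

    // ...then add the owner reference only when a driver pod actually exists.
    val executorPod = Option(driverPod).map { dp =>
      basePod.editMetadata()
        .addNewOwnerReference()
          .withController(true)
          .withApiVersion(dp.getApiVersion)
          .withKind(dp.getKind)
          .withName(dp.getMetadata.getName)
          .withUid(dp.getMetadata.getUid)
          .endOwnerReference()
        .endMetadata()
    }.getOrElse(basePod).build()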
@@ -140,13 +140,6 @@ private[spark] class Client(
        throw e
      }

-      if (waitForAppCompletion) {
Contributor: Why is this not needed anymore? If we enable cluster mode we still want the same behavior defined here, right?

Member Author: Mmh, I didn't change that behavior. Let's wait for the next push and see the diff. master is evolving, and the numerous refactorings do not make this PR easy to merge.

Member Author: Right, I have added back that section... Thx @tnachen
logInfo(s"Waiting for application $appName to finish...")
watcher.awaitCompletion()
logInfo(s"Application $appName finished.")
} else {
logInfo(s"Deployed Spark application $appName into Kubernetes.")
}
}
}

@@ -18,43 +18,68 @@ package org.apache.spark.scheduler.cluster.k8s

import java.io.File

-import io.fabric8.kubernetes.client.Config
+import io.fabric8.kubernetes.client.{Config, KubernetesClient}

-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.util.ThreadUtils

-private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
+trait ManagerSpecificHandlers {
+  def createKubernetesClient(sparkConf: SparkConf): KubernetesClient
+}

-  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
+private[spark] class KubernetesClusterManager extends ExternalClusterManager
+  with ManagerSpecificHandlers with Logging {

-  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
-    if (masterURL.startsWith("k8s") &&
-      sc.deployMode == "client" &&
-      !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) {
-      throw new SparkException("Client mode is currently not supported for Kubernetes.")
+  class InClusterHandlers extends ManagerSpecificHandlers {
+    override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
+      SparkKubernetesClientFactory.createKubernetesClient(
+        KUBERNETES_MASTER_INTERNAL_URL,
+        Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+        APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
Contributor: Why do we need a separate conf prefix as well?

Member Author: I just took what was existing; I will double-check to ensure the latest merge does not diverge.
+        sparkConf,
+        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
+        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
+  }

+  class OutClusterHandlers extends ManagerSpecificHandlers {
+    override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
+      SparkKubernetesClientFactory.createKubernetesClient(
+        sparkConf.get("spark.master").replace("k8s://", ""),
+        Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+        APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+        sparkConf,
+        None,
+        None)
+  }

+  val modeHandler: ManagerSpecificHandlers = {
+    new java.io.File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH).exists() match {
+      case true => new InClusterHandlers()
+      case false => new OutClusterHandlers()
+    }
+  }

+  override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient =
+    modeHandler.createKubernetesClient(sparkConf)

+  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")

+  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    new TaskSchedulerImpl(sc)
  }

  override def createSchedulerBackend(
-      sc: SparkContext,
-      masterURL: String,
-      scheduler: TaskScheduler): SchedulerBackend = {
+    sc: SparkContext,
+    masterURL: String,
+    scheduler: TaskScheduler): SchedulerBackend = {
    val executorSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
      sc.conf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
-    val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
-      KUBERNETES_MASTER_INTERNAL_URL,
-      Some(sc.conf.get(KUBERNETES_NAMESPACE)),
-      KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
-      sc.conf,
-      Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
-      Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
+    val kubernetesClient = createKubernetesClient(sc.getConf)

    val allocatorExecutor = ThreadUtils
      .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
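For context, the in-cluster/out-cluster split above keys entirely off the presence of the service-account token file that Kubernetes mounts into every pod. A standalone sketch of that detection (a hypothetical harness, not part of the PR; the path is the well-known default behind fabric8's Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH):

    import java.io.File

    object InClusterCheck {
      // Path where Kubernetes mounts the service-account token into every pod.
      val tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"

      def main(args: Array[String]): Unit = {
        // Same decision the cluster manager makes: a driver running inside a
        // pod sees the mounted token and uses in-cluster auth; a driver on a
        // developer machine falls back to spark.kubernetes.authenticate.* conf.
        val mode = if (new File(tokenPath).exists()) "in-cluster" else "out-cluster"
        println(s"Kubernetes client mode: $mode")
      }
    }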