Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 488c535

Browse files
committed
Address a bunch of style and other comments
1 parent cf82b21 commit 488c535

File tree

10 files changed

+33
-110
lines changed

10 files changed

+33
-110
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717

1818
package org.apache.spark.deploy.k8s
1919

20-
import org.apache.spark.{SparkConf, SparkException}
21-
import org.apache.spark.internal.Logging
20+
import org.apache.spark.SparkConf
2221

23-
private[spark] object ConfigurationUtils extends Logging {
22+
private[spark] object ConfigurationUtils {
2423
def parsePrefixedKeyValuePairs(
2524
sparkConf: SparkConf,
2625
prefix: String,
@@ -34,4 +33,24 @@ private[spark] object ConfigurationUtils extends Logging {
3433
}
3534
fromPrefix.toMap
3635
}
36+
37+
def requireBothOrNeitherDefined(
38+
opt1: Option[_],
39+
opt2: Option[_],
40+
errMessageWhenFirstIsMissing: String,
41+
errMessageWhenSecondIsMissing: String): Unit = {
42+
requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
43+
requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
44+
}
45+
46+
def requireSecondIfFirstIsDefined(
47+
opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
48+
opt1.foreach { _ =>
49+
require(opt2.isDefined, errMessageWhenSecondIsMissing)
50+
}
51+
}
52+
53+
def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
54+
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
55+
}
3756
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/OptionRequirements.scala

Lines changed: 0 additions & 40 deletions
This file was deleted.

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ private[spark] object SparkKubernetesClientFactory {
4848
.map(new File(_))
4949
.orElse(maybeServiceAccountToken)
5050
val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
51-
OptionRequirements.requireNandDefined(
51+
ConfigurationUtils.requireNandDefined(
5252
oauthTokenFile,
5353
oauthTokenValue,
5454
s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,6 @@ package object config extends Logging {
3131
.stringConf
3232
.createWithDefault("default")
3333

34-
private[spark] val DRIVER_DOCKER_IMAGE =
35-
ConfigBuilder("spark.kubernetes.driver.docker.image")
36-
.doc("Docker image to use for the driver. Specify this using the standard Docker tag format.")
37-
.stringConf
38-
.createWithDefault(s"spark-driver:$sparkVersion")
39-
4034
private[spark] val EXECUTOR_DOCKER_IMAGE =
4135
ConfigBuilder("spark.kubernetes.executor.docker.image")
4236
.doc("Docker image to use for the executors. Specify this using the standard Docker tag" +
@@ -81,22 +75,8 @@ package object config extends Logging {
8175
.bytesConf(ByteUnit.MiB)
8276
.createOptional
8377

84-
private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
85-
ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
86-
.doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the" +
87-
" driver submission server. This is memory that accounts for things like VM overheads," +
88-
" interned strings, other native overheads, etc. This tends to grow with the driver's" +
89-
" memory size (typically 6-10%).")
90-
.bytesConf(ByteUnit.MiB)
91-
.createOptional
92-
93-
private[spark] val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
94-
private[spark] val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
9578
private[spark] val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
9679
private[spark] val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
97-
private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
98-
private[spark] val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
99-
private[spark] val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
10080

10181
private[spark] val KUBERNETES_DRIVER_POD_NAME =
10282
ConfigBuilder("spark.kubernetes.driver.pod.name")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,10 @@ package object constants {
2424
private[spark] val SPARK_POD_DRIVER_ROLE = "driver"
2525
private[spark] val SPARK_POD_EXECUTOR_ROLE = "executor"
2626

27-
// Credentials secrets
28-
private[spark] val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
29-
"/mnt/secrets/spark-kubernetes-credentials"
30-
private[spark] val DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME = "ca-cert"
31-
private[spark] val DRIVER_CREDENTIALS_CA_CERT_PATH =
32-
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME"
33-
private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME = "client-key"
34-
private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_PATH =
35-
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME"
36-
private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME = "client-cert"
37-
private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_PATH =
38-
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME"
39-
private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME = "oauth-token"
40-
private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH =
41-
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME"
42-
private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"
43-
4427
// Default and fixed ports
4528
private[spark] val DEFAULT_DRIVER_PORT = 7078
4629
private[spark] val DEFAULT_BLOCKMANAGER_PORT = 7079
47-
private[spark] val DEFAULT_UI_PORT = 4040
4830
private[spark] val BLOCK_MANAGER_PORT_NAME = "blockmanager"
49-
private[spark] val DRIVER_PORT_NAME = "driver-rpc-port"
5031
private[spark] val EXECUTOR_PORT_NAME = "executor"
5132

5233
// Environment Variables
@@ -57,20 +38,11 @@ package object constants {
5738
private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
5839
private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
5940
private[spark] val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
60-
private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
61-
private[spark] val ENV_SUBMIT_EXTRA_CLASSPATH = "SPARK_SUBMIT_EXTRA_CLASSPATH"
6241
private[spark] val ENV_EXECUTOR_EXTRA_CLASSPATH = "SPARK_EXECUTOR_EXTRA_CLASSPATH"
6342
private[spark] val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
64-
private[spark] val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
65-
private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
66-
private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
67-
private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
68-
private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
69-
private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
7043
private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
7144

7245
// Miscellaneous
73-
private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
7446
private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
7547
private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
7648
private[spark] val MEMORY_OVERHEAD_MIN_MIB = 384L

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s
1818

1919
import scala.collection.JavaConverters._
2020

21-
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
22-
import org.apache.commons.io.FilenameUtils
21+
import io.fabric8.kubernetes.api.model._
2322

2423
import org.apache.spark.{SparkConf, SparkException}
2524
import org.apache.spark.deploy.k8s.ConfigurationUtils
@@ -48,7 +47,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
4847
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
4948
private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
5049

51-
private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs (
50+
private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
5251
sparkConf,
5352
KUBERNETES_EXECUTOR_LABEL_PREFIX,
5453
"executor label")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@ import java.io.File
2121
import io.fabric8.kubernetes.client.Config
2222

2323
import org.apache.spark.SparkContext
24-
import org.apache.spark.deploy.k8s.{ConfigurationUtils, SparkKubernetesClientFactory}
24+
import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
2525
import org.apache.spark.deploy.k8s.config._
2626
import org.apache.spark.deploy.k8s.constants._
2727
import org.apache.spark.internal.Logging
28-
import org.apache.spark.network.netty.SparkTransportConf
2928
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
30-
import org.apache.spark.util.{ThreadUtils, Utils}
29+
import org.apache.spark.util.ThreadUtils
3130

3231
private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
3332

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import java.net.InetAddress
2121
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit}
2222
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
2323

24-
import scala.collection.{concurrent, mutable}
2524
import scala.collection.JavaConverters._
25+
import scala.collection.mutable
2626
import scala.concurrent.{ExecutionContext, Future}
2727

2828
import io.fabric8.kubernetes.api.model._

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,14 @@ package org.apache.spark.scheduler.cluster.k8s
1818

1919
import scala.collection.JavaConverters._
2020

21-
import io.fabric8.kubernetes.api.model.{Pod, VolumeBuilder, VolumeMountBuilder, _}
21+
import io.fabric8.kubernetes.api.model.{Pod, _}
2222
import io.fabric8.kubernetes.client.KubernetesClient
23-
import org.apache.commons.io.FilenameUtils
24-
import org.mockito.{AdditionalAnswers, MockitoAnnotations}
25-
import org.mockito.Matchers.{any, eq => mockitoEq}
26-
import org.mockito.Mockito._
27-
import org.mockito.invocation.InvocationOnMock
28-
import org.mockito.stubbing.Answer
23+
import org.mockito.MockitoAnnotations
2924
import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
3025

3126
import org.apache.spark.{SparkConf, SparkFunSuite}
3227
import org.apache.spark.deploy.k8s.config._
3328
import org.apache.spark.deploy.k8s.constants
34-
import org.apache.spark.scheduler.cluster.k8s.ExecutorPodFactoryImpl
3529

3630
class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
3731
private val driverPodName: String = "driver-pod"

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
2727
import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
2828
import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
2929
import org.mockito.Matchers.{any, eq => mockitoEq}
30-
import org.mockito.Mockito.{mock => _, _}
30+
import org.mockito.Mockito.{doNothing, never, times, verify, when}
3131
import org.scalatest.BeforeAndAfter
3232
import org.scalatest.mock.MockitoSugar._
3333

@@ -74,7 +74,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
7474
.build()
7575

7676
private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
77-
private type LABELLED_PODS = FilterWatchListDeletable[
77+
private type LABELED_PODS = FilterWatchListDeletable[
7878
Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
7979
private type IN_NAMESPACE_PODS = NonNamespaceOperation[
8080
Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
@@ -104,7 +104,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
104104
private var podOperations: PODS = _
105105

106106
@Mock
107-
private var podsWithLabelOperations: LABELLED_PODS = _
107+
private var podsWithLabelOperations: LABELED_PODS = _
108108

109109
@Mock
110110
private var podsInNamespace: IN_NAMESPACE_PODS = _

0 commit comments

Comments
 (0)