This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit dc8f2eb

Included integration tests for Stage 1
1 parent a6431a0 · commit dc8f2eb

File tree

10 files changed (+61 −46 lines)


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 6 additions & 4 deletions
@@ -86,15 +86,17 @@ package object constants {
   private[spark] val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties"
   private[spark] val INIT_CONTAINER_PROPERTIES_FILE_DIR = "/etc/spark-init"
   private[spark] val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties"
+  private[spark] val INIT_CONTAINER_PROPERTIES_FILE_PATH =
+    s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
+  private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle"
+  private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
+
   private[spark] val HADOOP_FILE_VOLUME = "hadoop-properties"
   private[spark] val HADOOP_FILE_DIR = "/etc/hadoop"
   private[spark] val HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
+  private[spark] val HADOOP_CONF_DIR_LOC = "spark.kubernetes.hadoop.conf.dir"
   private[spark] val HADOOP_CONFIG_MAP_SPARK_CONF_NAME =
     "spark.kubernetes.hadoop.executor.hadoopconfigmapname"
-  private[spark] val INIT_CONTAINER_PROPERTIES_FILE_PATH =
-    s"$INIT_CONTAINER_PROPERTIES_FILE_DIR/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
-  private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle"
-  private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
 
   // Miscellaneous
   private[spark] val ANNOTATION_EXECUTOR_NODE_AFFINITY = "scheduler.alpha.kubernetes.io/affinity"
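HADOOP_CONF_DIR_LOC is the Spark conf key that carries the submitter's Hadoop conf directory from the submission client to the in-cluster scheduler backend. A minimal, dependency-free sketch of that round trip, using a plain Map in place of SparkConf (the object and variable names here are illustrative, not part of the real code):

object HadoopConfDirLocSketch {
  // mirrors the constant introduced in constants.scala above
  val HADOOP_CONF_DIR_LOC = "spark.kubernetes.hadoop.conf.dir"

  def main(args: Array[String]): Unit = {
    // submission side: record the directory (if any) under the key
    var conf = Map.empty[String, String]
    val hadoopConfDir: Option[String] = sys.env.get("HADOOP_CONF_DIR")
    conf = conf ++ hadoopConfDir.map(dir => Map(HADOOP_CONF_DIR_LOC -> dir))
      .getOrElse(Map.empty[String, String])
    // cluster-manager side: read it back as an Option instead of consulting the environment
    val maybeHadoopConfDir: Option[String] = conf.get(HADOOP_CONF_DIR_LOC)
    println(s"HADOOP_CONF_DIR recorded as: $maybeHadoopConfDir")
  }
}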

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala

Lines changed: 6 additions & 2 deletions
@@ -149,7 +149,9 @@ private[spark] class Client(
 }
 
 private[spark] object Client {
-  def run(sparkConf: SparkConf, clientArguments: ClientArguments): Unit = {
+  def run(sparkConf: SparkConf,
+          clientArguments: ClientArguments,
+          hadoopConfDir: Option[String]): Unit = {
     val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
     val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
     val launchTime = System.currentTimeMillis()
@@ -168,6 +170,7 @@ private[spark] object Client {
       clientArguments.mainClass,
       clientArguments.driverArgs,
       clientArguments.otherPyFiles,
+      hadoopConfDir,
       sparkConf)
     Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
       master,
@@ -195,6 +198,7 @@ private[spark] object Client {
   def main(args: Array[String]): Unit = {
     val parsedArguments = ClientArguments.fromCommandLineArgs(args)
     val sparkConf = new SparkConf()
-    run(sparkConf, parsedArguments)
+    val hadoopConfDir = sys.env.get("HADOOP_CONF_DIR")
+    run(sparkConf, parsedArguments, hadoopConfDir)
   }
 }
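The motivation for the wider run signature is that HADOOP_CONF_DIR is now read from the environment exactly once, in main(), and everything downstream receives it as an explicit Option. That is what lets the integration tests further down call Client.run with a chosen directory instead of mutating the test JVM's environment. A hedged, self-contained sketch of the shape (names are illustrative, not the real Client API):

object ClientRunSketch {
  // stand-in for Client.run; only the parameter shape matters for this sketch
  def run(appName: String, hadoopConfDir: Option[String]): Unit =
    println(s"launching $appName with hadoopConfDir=$hadoopConfDir")

  def main(args: Array[String]): Unit = {
    // production entry point: consult the environment once
    run("spark-pi", sys.env.get("HADOOP_CONF_DIR"))
    // test path: supply the directory directly, no environment mutation required
    run("spark-pi-under-test", Some("test-data/hadoop-conf-files"))
  }
}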

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 5 additions & 4 deletions
@@ -40,6 +40,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
     mainClass: String,
     appArgs: Array[String],
     additionalPythonFiles: Seq[String],
+    hadoopConfDir: Option[String],
     submissionSparkConf: SparkConf) {
 
   // The resource name prefix is derived from the application name, making it easy to connect the
@@ -98,9 +99,8 @@ private[spark] class DriverConfigurationStepsOrchestrator(
       submissionSparkConf)
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
       submissionSparkConf, kubernetesResourceNamePrefix)
-    val hadoopConfigurations =
-      sys.env.get("HADOOP_CONF_DIR").map{ conf => getHadoopConfFiles(conf)}
-        .getOrElse(Array.empty[File])
+    val hadoopConfigurations = hadoopConfDir.map(conf => getHadoopConfFiles(conf))
+      .getOrElse(Array.empty[File])
     val hadoopConfigSteps =
       if (hadoopConfigurations.isEmpty) {
         Option.empty[DriverConfigurationStep]
@@ -109,7 +109,8 @@ private[spark] class DriverConfigurationStepsOrchestrator(
           namespace,
           hadoopConfigMapName,
           submissionSparkConf,
-          hadoopConfigurations)
+          hadoopConfigurations,
+          hadoopConfDir)
         val hadoopConfSteps =
           hadoopStepsOrchestrator.getHadoopSteps()
         Some(new HadoopConfigBootstrapStep(hadoopConfSteps, hadoopConfigMapName))
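The orchestrator now resolves the Hadoop conf files from the hadoopConfDir parameter rather than from sys.env. The diff does not show getHadoopConfFiles itself, so the sketch below substitutes a hypothetical implementation that lists regular files in the directory; an absent directory collapses to an empty array, which the code above treats as "no Hadoop configuration steps":

import java.io.File

object HadoopConfFilesSketch {
  // hypothetical stand-in for the orchestrator's private getHadoopConfFiles helper
  def getHadoopConfFiles(path: String): Array[File] = {
    val dir = new File(path)
    if (dir.isDirectory) dir.listFiles().filter(_.isFile) else Array.empty[File]
  }

  def main(args: Array[String]): Unit = {
    val hadoopConfDir: Option[String] = sys.env.get("HADOOP_CONF_DIR")
    val hadoopConfigurations = hadoopConfDir.map(conf => getHadoopConfFiles(conf))
      .getOrElse(Array.empty[File])
    println(s"found ${hadoopConfigurations.length} Hadoop conf file(s)")
  }
}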

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/HadoopConfigBootstrapStep.scala

Lines changed: 3 additions & 1 deletion
@@ -37,7 +37,8 @@ private[spark] class HadoopConfigBootstrapStep(
     var currentHadoopSpec = HadoopConfigSpec(
       driverPod = driverSpec.driverPod,
       driverContainer = driverSpec.driverContainer,
-      configMapProperties = Map.empty[String, String])
+      configMapProperties = Map.empty[String, String],
+      additionalDriverSparkConf = Map.empty[String, String])
     for (nextStep <- hadoopConfigurationSteps) {
       currentHadoopSpec = nextStep.configureContainers(currentHadoopSpec)
     }
@@ -50,6 +51,7 @@ private[spark] class HadoopConfigBootstrapStep(
       .build()
     val executorSparkConf = driverSpec.driverSparkConf.clone()
       .set(HADOOP_CONFIG_MAP_SPARK_CONF_NAME, hadoopConfigMapName)
+      .setAll(currentHadoopSpec.additionalDriverSparkConf)
     driverSpec.copy(
       driverPod = currentHadoopSpec.driverPod,
       driverContainer = currentHadoopSpec.driverContainer,
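The bootstrap step threads a HadoopConfigSpec through every configured step and then applies whatever additionalDriverSparkConf the steps accumulated, which is how HADOOP_CONF_DIR_LOC set by the mounter step below ends up on the driver's Spark conf. A simplified, runnable sketch of that fold using stand-in types (the real spec also carries the driver pod and container):

object BootstrapFoldSketch {
  case class Spec(
      configMapProperties: Map[String, String],
      additionalDriverSparkConf: Map[String, String])

  trait Step { def configureContainers(spec: Spec): Spec }

  def main(args: Array[String]): Unit = {
    // a single illustrative step that records the conf-dir location, as the mounter step does
    val steps: Seq[Step] = Seq(new Step {
      def configureContainers(spec: Spec): Spec =
        spec.copy(additionalDriverSparkConf = spec.additionalDriverSparkConf +
          ("spark.kubernetes.hadoop.conf.dir" -> "/etc/hadoop/conf"))
    })
    var currentSpec = Spec(Map.empty, Map.empty)
    for (nextStep <- steps) {
      currentSpec = nextStep.configureContainers(currentSpec)
    }
    // these accumulated entries are what .setAll(...) copies onto the driver Spark conf
    println(currentSpec.additionalDriverSparkConf)
  }
}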

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala

Lines changed: 8 additions & 2 deletions
@@ -17,17 +17,20 @@
 package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
 
 import java.io.File
+
 import org.apache.commons.io.FileUtils.readFileToString
 
 import org.apache.spark.deploy.kubernetes.{HadoopConfBootstrap, PodWithMainContainer}
+import org.apache.spark.deploy.kubernetes.constants._
 
 /**
  * Step that configures the ConfigMap + Volumes for the driver
  */
 private[spark] class HadoopConfMounterStep(
     hadoopConfigMapName: String,
     hadoopConfigurationFiles: Array[File],
-    hadoopConfBootstrapConf: HadoopConfBootstrap)
+    hadoopConfBootstrapConf: HadoopConfBootstrap,
+    hadoopConfDir: Option[String])
   extends HadoopConfigurationStep {
 
   override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = {
@@ -42,7 +45,10 @@ private[spark] class HadoopConfMounterStep(
       driverContainer = bootstrappedPodAndMainContainer.mainContainer,
       configMapProperties =
         hadoopConfigurationFiles.map(file =>
-          (file.toPath.getFileName.toString, readFileToString(file))).toMap
+          (file.toPath.getFileName.toString, readFileToString(file))).toMap,
+      additionalDriverSparkConf = hadoopConfigSpec.additionalDriverSparkConf ++
+        hadoopConfDir.map(conf_dir => Map(HADOOP_CONF_DIR_LOC -> conf_dir)).getOrElse(
+          Map.empty[String, String])
     )
   }
 }
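The ConfigMap payload built above is just a map from file name to file contents, and the step now also records the submitter-side directory under HADOOP_CONF_DIR_LOC. A sketch of the file-to-map construction using only the JDK (java.nio in place of commons-io), pointed at the test-data/hadoop-conf-files directory used by the integration test; the object name and hard-coded path are illustrative:

import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object ConfigMapPropertiesSketch {
  def main(args: Array[String]): Unit = {
    // listFiles() returns null for a missing directory, so wrap it in Option
    val hadoopConfigurationFiles: Array[File] =
      Option(new File("test-data/hadoop-conf-files").listFiles())
        .map(_.filter(_.isFile))
        .getOrElse(Array.empty[File])
    // file name -> file contents, exactly the shape a ConfigMap data section expects
    val configMapProperties: Map[String, String] =
      hadoopConfigurationFiles.map(file =>
        (file.toPath.getFileName.toString,
          new String(Files.readAllBytes(file.toPath), StandardCharsets.UTF_8))).toMap
    configMapProperties.keys.foreach(println)
  }
}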

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopConfigSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod}
  * pairs of (path, data)
  */
 private[spark] case class HadoopConfigSpec(
-    // additionalDriverSparkConf: Map[String, String],
+    additionalDriverSparkConf: Map[String, String],
     driverPod: Pod,
     driverContainer: Container,
     configMapProperties: Map[String, String])

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala

Lines changed: 4 additions & 2 deletions
@@ -30,7 +30,8 @@ private[spark] class HadoopStepsOrchestrator(
     namespace: String,
     hadoopConfigMapName: String,
     submissionSparkConf: SparkConf,
-    hadoopConfigurationFiles: Array[File]) {
+    hadoopConfigurationFiles: Array[File],
+    hadoopConfDir: Option[String]) {
   private val maybeKerberosSupport = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
 
   def getHadoopSteps(): Seq[HadoopConfigurationStep] = {
@@ -40,7 +41,8 @@ private[spark] class HadoopStepsOrchestrator(
     val hadoopConfMounterStep = new HadoopConfMounterStep(
       hadoopConfigMapName,
       hadoopConfigurationFiles,
-      hadoopConfBootstrapImpl)
+      hadoopConfBootstrapImpl,
+      hadoopConfDir)
     val maybeHadoopKerberosMountingStep =
       if (maybeKerberosSupport) {
         // TODO: Implement mounting secrets
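getHadoopSteps() always produces the conf-mounter step and, when Kerberos support is enabled, is meant to add a secret-mounting step as well (still a TODO in the diff above). The exact combination code is not shown here, so the sketch below is only a guess at the pattern, with stand-in step types:

object StepAssemblySketch {
  trait HadoopConfigurationStep
  case object ConfMounterStep extends HadoopConfigurationStep
  case object KerberosMountingStep extends HadoopConfigurationStep

  // the mandatory mounter step plus an optional Kerberos step, flattened into one Seq
  def getHadoopSteps(kerberosSupport: Boolean): Seq[HadoopConfigurationStep] = {
    val maybeKerberosStep: Option[HadoopConfigurationStep] =
      if (kerberosSupport) Some(KerberosMountingStep) else None
    Seq[HadoopConfigurationStep](ConfMounterStep) ++ maybeKerberosStep.toList
  }

  def main(args: Array[String]): Unit =
    println(getHadoopSteps(kerberosSupport = false))
}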

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala

Lines changed: 3 additions & 3 deletions
@@ -42,6 +42,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val maybeConfigMap = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP)
     val maybeConfigMapKey = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY)
     val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_SPARK_CONF_NAME)
+    val maybeHadoopConfDir = sparkConf.getOption(HADOOP_CONF_DIR_LOC)
 
     val maybeExecutorInitContainerSecretName =
       sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET)
@@ -75,9 +76,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val hadoopBootStrap = for {
       hadoopConfigMap <- maybeHadoopConfigMap
     } yield {
-      val hadoopConfigurations =
-        sys.env.get("HADOOP_CONF_DIR").map{ conf => getHadoopConfFiles(conf)}
-          .getOrElse(Array.empty[File])
+      val hadoopConfigurations = maybeHadoopConfDir.map(
+        conf_dir => getHadoopConfFiles(conf_dir)).getOrElse(Array.empty[File])
       new HadoopConfBootstrapImpl(
         hadoopConfigMap,
         hadoopConfigurations
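On the cluster-manager side the directory is no longer read from the pod's environment; it comes back out of Spark conf via HADOOP_CONF_DIR_LOC, and the Hadoop bootstrap is built only when a config-map name is present. A small sketch of that Option-driven construction, with hard-coded stand-in values in place of the sparkConf.getOption lookups:

object ExecutorSideBootstrapSketch {
  def main(args: Array[String]): Unit = {
    // stand-ins for sparkConf.getOption(HADOOP_CONFIG_MAP_SPARK_CONF_NAME)
    // and sparkConf.getOption(HADOOP_CONF_DIR_LOC)
    val maybeHadoopConfigMap: Option[String] = Some("spark-app-hadoop-config")
    val maybeHadoopConfDir: Option[String] = Some("/etc/hadoop/conf")
    val hadoopBootstrap: Option[String] = for {
      hadoopConfigMap <- maybeHadoopConfigMap
    } yield s"HadoopConfBootstrapImpl($hadoopConfigMap, confDir=${maybeHadoopConfDir.getOrElse("<unset>")})"
    // None whenever no Hadoop config map was registered by the submission client
    println(hadoopBootstrap)
  }
}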

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala

Lines changed: 3 additions & 0 deletions
@@ -42,6 +42,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
       MAIN_CLASS,
       APP_ARGS,
       ADDITIONAL_PYTHON_FILES,
+      None,
       sparkConf)
     val steps = orchestrator.getAllConfigurationSteps()
     assert(steps.size === 3)
@@ -63,6 +64,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
       MAIN_CLASS,
       APP_ARGS,
       ADDITIONAL_PYTHON_FILES,
+      None,
       sparkConf)
     val steps = orchestrator.getAllConfigurationSteps()
     assert(steps.size === 4)
@@ -84,6 +86,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
       MAIN_CLASS,
       APP_ARGS,
       ADDITIONAL_PYTHON_FILES,
+      None,
       sparkConf)
     val steps = orchestrator.getAllConfigurationSteps()
     assert(steps.size === 4)

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 22 additions & 27 deletions
@@ -74,31 +74,19 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("Include HADOOP_CONF for HDFS based jobs ") {
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)
-    // Ensuring that HADOOP_CONF_DIR env variable is set
-    val builder = new ProcessBuilder(
-      Seq("/bin/bash", "-c", "export HADOOP_CONF_DIR=" +
-        "test-data/hadoop-conf-files && exec").asJava)
-    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
-    val process = builder.start()
-    new RedirectThread(process.getInputStream, System.out, "redirect output").start()
-    val exitCode = process.waitFor()
-    if (exitCode != 0) {
-      logInfo(s"exitCode: $exitCode")
-    }
+    // Ensuring that HADOOP_CONF_DIR variable is set, could also be one via env HADOOP_CONF_DIR
     sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
-    runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE)
+    runSparkApplicationAndVerifyCompletion(
+      JavaMainAppResource(CONTAINER_LOCAL_MAIN_APP_RESOURCE),
+      SPARK_PI_MAIN_CLASS,
+      Seq("HADOOP_CONF_DIR defined. Mounting HDFS specific .xml files", "Pi is roughly 3"),
+      Array("5"),
+      Seq.empty[String],
+      Some("test-data/hadoop-conf-files"))
   }
 
   test("Run PySpark Job on file from SUBMITTER with --py-files")
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)
-    // Ensuring that HADOOP_CONF_DIR env variable is unset
-    val builder = new ProcessBuilder(
-      Seq("/bin/bash", "-c", "export HADOOP_CONF_DIR=" +
-        " && exec").asJava)
-    builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
-    val process = builder.start()
-    new RedirectThread(process.getInputStream, System.out, "redirect output").start()
-    val exitCode = process.waitFor()
     launchStagingServer(SSLOptions(), None)
     sparkConf
       .set(DRIVER_DOCKER_IMAGE,
@@ -183,7 +171,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       GROUP_BY_MAIN_CLASS,
       Seq("The Result is"),
       Array.empty[String],
-      Seq.empty[String])
+      Seq.empty[String],
+      None)
   }
 
   test("Use remote resources without the resource staging server.")
@@ -247,7 +236,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       FILE_EXISTENCE_MAIN_CLASS,
       Seq(s"File found at /opt/spark/${testExistenceFile.getName} with correct contents."),
       Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS),
-      Seq.empty[String])
+      Seq.empty[String],
+      None)
   }
 
   test("Use a very long application name.")
@@ -277,9 +267,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     runSparkApplicationAndVerifyCompletion(
       JavaMainAppResource(appResource),
       SPARK_PI_MAIN_CLASS,
-      Seq("Pi is roughly 3"),
+      Seq(
+        "hadoop config map key was not specified",
+        "Pi is roughly 3"),
       Array.empty[String],
-      Seq.empty[String])
+      Seq.empty[String],
+      None)
   }
 
   private def runPySparkPiAndVerifyCompletion(
@@ -289,21 +282,23 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       PYSPARK_PI_MAIN_CLASS,
       Seq("Submitting 5 missing tasks from ResultStage", "Pi is roughly 3"),
       Array("5"),
-      otherPyFiles)
+      otherPyFiles,
+      None)
   }
 
   private def runSparkApplicationAndVerifyCompletion(
       appResource: MainAppResource,
       mainClass: String,
      expectedLogOnCompletion: Seq[String],
       appArgs: Array[String],
-      otherPyFiles: Seq[String]): Unit = {
+      otherPyFiles: Seq[String],
+      hadoopConfDir: Option[String]): Unit = {
     val clientArguments = ClientArguments(
       mainAppResource = appResource,
       mainClass = mainClass,
       driverArgs = appArgs,
       otherPyFiles = otherPyFiles)
-    Client.run(sparkConf, clientArguments)
+    Client.run(sparkConf, clientArguments, hadoopConfDir)
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
       .withLabel("spark-app-locator", APP_LOCATOR_LABEL)
