
Commit 7a0b4e4

Adding unit test and one more integration test

1 parent 61a7414 commit 7a0b4e4

19 files changed: +432 -21 lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 1 addition & 1 deletion
@@ -512,7 +512,7 @@ package object config extends Logging {
   private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
 
   private[spark] val KUBERNETES_KERBEROS_SUPPORT =
-    ConfigBuilder("spark.kubernetes.kerberos")
+    ConfigBuilder("spark.kubernetes.kerberos.enabled")
       .doc("Specify whether your job is a job that will require a Delegation Token to access HDFS")
       .booleanConf
       .createWithDefault(false)
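
Note: the rename gives the flag Spark's conventional ".enabled" suffix for boolean settings. A minimal sketch of opting in under the new key, assuming a standard SparkConf setup (only the key string and its false default come from this diff):

    import org.apache.spark.SparkConf

    // Enable Kerberos support under the renamed key; the old key
    // "spark.kubernetes.kerberos" no longer exists after this commit.
    val conf = new SparkConf()
      .set("spark.kubernetes.kerberos.enabled", "true") // default: false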

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStep.scala

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ import org.apache.spark.internal.Logging
  * assumes that the job user has either specified a principal and keytab or ran
  * $kinit before running spark-submit. With a TGT stored locally, by running
  * UGI.getCurrentUser you are able to obtain the current user, alternatively
- * you can run UGI.logingUserFromKeytabAndReturnUGI and by running .doAs run
+ * you can run UGI.loginUserFromKeytabAndReturnUGI and by running .doAs run
  * as the logged into user instead of the current user. With the Job User principal
  * you then retrieve the delegation token from the NameNode and store values in
  * DelegationToken. Lastly, the class puts the data into a secret. All this is
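
Note: for readers unfamiliar with the UGI flow this comment describes, a rough sketch follows, using Hadoop's UserGroupInformation API. The principal, keytab path, and the body of the doAs action are illustrative placeholders, not code from this commit:

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation

    // Log in from a keytab and obtain a UGI for the job user.
    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
      "user@EXAMPLE.COM", "/path/to/user.keytab")

    // doAs runs the action as the logged-in user rather than the
    // current user, so delegation-token requests to the NameNode
    // carry the job user's Kerberos credentials.
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        // retrieve HDFS delegation tokens here
      }
    })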

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala

Lines changed: 0 additions & 1 deletion
@@ -23,7 +23,6 @@ import org.apache.spark.deploy.kubernetes.{HadoopConfBootstrapImpl, OptionRequir
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.internal.Logging
 
-
 /**
  * Returns the complete ordered list of steps required to configure the hadoop configurations.
  */

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/InitContainerResourceStagingServerSecretPluginSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.SparkFunSuite
 
-class InitContainerResourceStagingServerSecretPluginSuite extends SparkFunSuite with BeforeAndAfter{
+private[spark] class InitContainerResourceStagingServerSecretPluginSuite extends SparkFunSuite with BeforeAndAfter{
   private val INIT_CONTAINER_SECRET_NAME = "init-secret"
   private val INIT_CONTAINER_SECRET_MOUNT = "/tmp/secret"

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientSuite.scala

Lines changed: 8 additions & 8 deletions
@@ -17,22 +17,22 @@
 package org.apache.spark.deploy.kubernetes.submit
 
 import com.google.common.collect.Iterables
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, DoneablePod, HasMetadata, Pod, PodBuilder, PodList, Secret, SecretBuilder}
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
 import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
-import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicable, PodResource, Resource}
-import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.deploy.kubernetes.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec}
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.mockito.Mockito.{doReturn, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.mock.MockitoSugar._
-import scala.collection.JavaConverters._
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec}
+import scala.collection.JavaConverters._
 
-class ClientSuite extends SparkFunSuite with BeforeAndAfter {
+private[spark] class ClientSuite extends SparkFunSuite with BeforeAndAfter {
 
   private val DRIVER_POD_UID = "pod-id"
   private val DRIVER_POD_API_VERSION = "v1"

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/PythonStepSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 
-class PythonStepSuite extends SparkFunSuite with BeforeAndAfter {
+private[spark] class PythonStepSuite extends SparkFunSuite with BeforeAndAfter {
   private val FILE_DOWNLOAD_PATH = "/var/data/spark-files"
   private val PYSPARK_FILES = Seq(
     "hdfs://localhost:9000/app/files/file1.py",
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala

Lines changed: 102 additions & 0 deletions

@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
+
+import java.io.File
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model._
+import org.apache.commons.io.FileUtils.readFileToString
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.when
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.kubernetes.{HadoopConfBootstrap, PodWithMainContainer}
+import org.apache.spark.deploy.kubernetes.constants.HADOOP_CONF_DIR_LOC
+import org.apache.spark.util.Utils
+
+
+private[spark] class HadoopConfMounterStepSuite extends SparkFunSuite with BeforeAndAfter{
+  private val CONFIG_MAP_NAME = "config-map"
+  private val HADOOP_CONF_DIR_VAL = "/etc/hadoop"
+  private val POD_LABEL = Map("bootstrap" -> "true")
+  private val DRIVER_CONTAINER_NAME = "driver-container"
+  private val TEMP_HADOOP_FILE = createTempFile("core-site.xml")
+  private val HADOOP_FILES = Seq(TEMP_HADOOP_FILE)
+
+  @Mock
+  private var hadoopConfBootstrap : HadoopConfBootstrap = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    when(hadoopConfBootstrap.bootstrapMainContainerAndVolumes(
+      any[PodWithMainContainer])).thenAnswer(new Answer[PodWithMainContainer] {
+      override def answer(invocation: InvocationOnMock) : PodWithMainContainer = {
+        val pod = invocation.getArgumentAt(0, classOf[PodWithMainContainer])
+        pod.copy(
+          pod =
+            new PodBuilder(pod.pod)
+              .withNewMetadata()
+              .addToLabels("bootstrap", "true")
+              .endMetadata()
+              .withNewSpec().endSpec()
+              .build(),
+          mainContainer =
+            new ContainerBuilder()
+              .withName(DRIVER_CONTAINER_NAME).build()
+        )}})
+  }
+
+  test("Test of mounting hadoop_conf_dir files into HadoopConfigSpec") {
+    val hadoopConfStep = new HadoopConfMounterStep(
+      CONFIG_MAP_NAME,
+      HADOOP_FILES,
+      hadoopConfBootstrap,
+      HADOOP_CONF_DIR_VAL)
+    val expectedDriverSparkConf = Map(HADOOP_CONF_DIR_LOC -> HADOOP_CONF_DIR_VAL)
+    val expectedConfigMap = Map(
+      TEMP_HADOOP_FILE.toPath.getFileName.toString ->
+        readFileToString(TEMP_HADOOP_FILE)
+    )
+    val hadoopConfSpec = HadoopConfigSpec(
+      Map.empty[String, String],
+      new Pod(),
+      new Container(),
+      Map.empty[String, String],
+      None,
+      "",
+      "")
+    val returnContainerSpec = hadoopConfStep.configureContainers(hadoopConfSpec)
+    assert(expectedDriverSparkConf === returnContainerSpec.additionalDriverSparkConf)
+    assert(returnContainerSpec.driverContainer.getName == DRIVER_CONTAINER_NAME)
+    assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL)
+    assert(returnContainerSpec.configMapProperties === expectedConfigMap)
+  }
+  private def createTempFile(contents: String): File = {
+    val dir = Utils.createTempDir()
+    val file = new File(dir, s"${UUID.randomUUID().toString}")
+    Files.write(contents.getBytes, file)
+    file
+  }
+}
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStepSuite.scala

Lines changed: 77 additions & 0 deletions

@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
+
+import java.io.File
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.util.Utils
+
+
+
+private[spark] class HadoopKerberosKeytabResolverStepSuite extends SparkFunSuite {
+  private val POD_LABEL = Map("bootstrap" -> "true")
+  private val DRIVER_CONTAINER_NAME = "driver-container"
+  private val TEMP_KEYTAB_FILE = createTempFile("keytab")
+  private val KERB_PRINCIPAL = "[email protected]"
+
+  // TODO: Require mocking of UGI methods
+  test("Testing keytab login") {
+    val keytabStep = new HadoopKerberosKeytabResolverStep(
+      new SparkConf(),
+      Some(KERB_PRINCIPAL),
+      Some(TEMP_KEYTAB_FILE))
+    val hadoopConfSpec = HadoopConfigSpec(
+      Map.empty[String, String],
+      new PodBuilder()
+        .withNewMetadata()
+        .addToLabels("bootstrap", "true")
+        .endMetadata()
+        .withNewSpec().endSpec()
+        .build(),
+      new ContainerBuilder().withName(DRIVER_CONTAINER_NAME).build(),
+      Map.empty[String, String],
+      None,
+      "",
+      "")
+    val returnContainerSpec = keytabStep.configureContainers(hadoopConfSpec)
+    assert(returnContainerSpec.additionalDriverSparkConf(HADOOP_KERBEROS_CONF_LABEL)
+      .contains(KERBEROS_SECRET_LABEL_PREFIX))
+    assert(returnContainerSpec.additionalDriverSparkConf(HADOOP_KERBEROS_CONF_SECRET) ===
+      HADOOP_KERBEROS_SECRET_NAME)
+    assert(returnContainerSpec.driverContainer.getName == DRIVER_CONTAINER_NAME)
+    assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL)
+    assert(returnContainerSpec.dtSecretLabel.contains(KERBEROS_SECRET_LABEL_PREFIX))
+    assert(returnContainerSpec.dtSecretName === HADOOP_KERBEROS_SECRET_NAME)
+    assert(returnContainerSpec.dtSecret.nonEmpty)
+    assert(returnContainerSpec.dtSecret.get.getMetadata.getName === HADOOP_KERBEROS_SECRET_NAME)
+  }
+
+  private def createTempFile(contents: String): File = {
+    val dir = Utils.createTempDir()
+    val file = new File(dir, s"${UUID.randomUUID().toString}")
+    Files.write(contents.getBytes, file)
+    file
+  }
+}
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStepSuite.scala

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.kubernetes.constants._
+
+private[spark] class HadoopKerberosSecretResolverStepSuite extends SparkFunSuite {
+  private val CONFIG_MAP_NAME = "config-map"
+  private val HADOOP_CONF_DIR_VAL = "/etc/hadoop"
+  private val POD_LABEL = Map("bootstrap" -> "true")
+  private val DRIVER_CONTAINER_NAME = "driver-container"
+  private val TOKEN_SECRET_NAME = "secretName"
+  private val TOKEN_SECRET_LABEL = "secretLabel"
+
+  test("Testing kerberos with Secret") {
+    val keytabStep = new HadoopKerberosSecretResolverStep(
+      new SparkConf(),
+      TOKEN_SECRET_NAME,
+      TOKEN_SECRET_LABEL)
+    val expectedDriverSparkConf = Map(
+      HADOOP_KERBEROS_CONF_LABEL -> TOKEN_SECRET_LABEL,
+      HADOOP_KERBEROS_CONF_SECRET -> TOKEN_SECRET_NAME)
+    val hadoopConfSpec = HadoopConfigSpec(
+      Map.empty[String, String],
+      new PodBuilder()
+        .withNewMetadata()
+        .addToLabels("bootstrap", "true")
+        .endMetadata()
+        .withNewSpec().endSpec()
+        .build(),
+      new ContainerBuilder().withName(DRIVER_CONTAINER_NAME).build(),
+      Map.empty[String, String],
+      None,
+      "",
+      "")
+    val returnContainerSpec = keytabStep.configureContainers(hadoopConfSpec)
+    assert(expectedDriverSparkConf === returnContainerSpec.additionalDriverSparkConf)
+    assert(returnContainerSpec.driverContainer.getName == DRIVER_CONTAINER_NAME)
+    assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL)
+    assert(returnContainerSpec.dtSecret === None)
+    assert(returnContainerSpec.dtSecretLabel === TOKEN_SECRET_LABEL)
+    assert(returnContainerSpec.dtSecretName === TOKEN_SECRET_NAME)
+  }
+}
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala

Lines changed: 72 additions & 0 deletions

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps.hadoopsteps
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.kubernetes.config._
+
+private[spark] class HadoopStepsOrchestratorSuite extends SparkFunSuite {
+  private val NAMESPACE = "testNamespace"
+  private val HADOOP_CONFIG_MAP = "hadoop-config-map"
+  private val HADOOP_CONF_DIR_VAL = "/etc/hadoop/conf"
+
+  test("Testing without Kerberos") {
+    val sparkTestConf = new SparkConf(true)
+      .set(KUBERNETES_KERBEROS_SUPPORT, false)
+    val hadoopOrchestrator = new HadoopStepsOrchestrator(
+      NAMESPACE,
+      HADOOP_CONFIG_MAP,
+      sparkTestConf,
+      HADOOP_CONF_DIR_VAL)
+    val steps = hadoopOrchestrator.getHadoopSteps()
+    assert(steps.length === 1)
+    assert(steps.head.isInstanceOf[HadoopConfMounterStep])
+    assert(true)
+  }
+
+//  test("Testing with Keytab Kerberos Login") {
+//    val sparkTestConf2 = new SparkConf(true)
+//      .set(KUBERNETES_KERBEROS_SUPPORT, true)
+//      .set(KUBERNETES_KERBEROS_KEYTAB, "keytab.file")
+//      .set(KUBERNETES_KERBEROS_PRINCIPAL, "user@kerberos")
+//    val hadoopOrchestrator = new HadoopStepsOrchestrator(
+//      NAMESPACE,
+//      HADOOP_CONFIG_MAP,
+//      sparkTestConf2,
+//      HADOOP_CONF_DIR_VAL)
+//    val steps = hadoopOrchestrator.getHadoopSteps()
+//    assert(steps.length === 2)
+//    assert(steps.head.isInstanceOf[HadoopConfMounterStep])
+//    assert(steps(1).isInstanceOf[HadoopKerberosKeytabResolverStep])
+//  }
+
+//  test("Testing with Keytab Kerberos Login") {
+//    val sparkTestConf3 = new SparkConf(true)
+//      .set(KUBERNETES_KERBEROS_SUPPORT, true)
+//      .set(KUBERNETES_KERBEROS_DT_SECRET_NAME, "dtSecret")
+//      .set(KUBERNETES_KERBEROS_DT_SECRET_LABEL, "dtLabel")
+//    val hadoopOrchestrator = new HadoopStepsOrchestrator(
+//      NAMESPACE,
+//      HADOOP_CONFIG_MAP,
+//      sparkTestConf3,
+//      HADOOP_CONF_DIR_VAL)
+//    val steps = hadoopOrchestrator.getHadoopSteps()
+//    assert(steps.length === 2)
+//    assert(steps.head.isInstanceOf[HadoopConfMounterStep])
+//    assert(steps(1).isInstanceOf[HadoopKerberosSecretResolverStep])
+//  }
+}
