
Commit c6b11f8

Stage 3 Integration tests pass
1 parent b6912d2 commit c6b11f8

8 files changed (+48, -38 lines)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 0 additions & 2 deletions
@@ -132,8 +132,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
 
-  private val maybeMountedHadoopSecret = conf.get(MOUNTED_HADOOP_SECRET_CONF)
-
   private val driverPod = try {
     kubernetesClient.pods().inNamespace(kubernetesNamespace).
       withName(kubernetesDriverPodName).get()

resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/HDFSTest.scala

Lines changed: 1 addition & 0 deletions
@@ -40,6 +40,7 @@ private[spark] object HDFSTest{
       val end = System.currentTimeMillis()
       println("Iteration " + iter + " took " + (end-start) + " ms")
     }
+    println(s"File contents: ${file.map(s => s.toString).collect().mkString(",")}")
     println(s"Returned length(s) of: ${file.map(s => s.length).collect().mkString(",")}")
     // scalastyle:on println
     spark.stop()
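
For context, a minimal standalone sketch of the kind of job this test extends; the input path, the iteration count, and the class name are assumptions for illustration, not the repository's exact code:

import org.apache.spark.sql.SparkSession

object HDFSTestSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("HDFS Test").getOrCreate()
    // Assumed input path; the real test reads a word count file staged in HDFS.
    val file = spark.read.textFile(args(0)).rdd
    for (iter <- 1 to 3) {
      val start = System.currentTimeMillis()
      file.map(s => s.length).collect()
      val end = System.currentTimeMillis()
      println("Iteration " + iter + " took " + (end - start) + " ms")
    }
    println(s"File contents: ${file.map(s => s.toString).collect().mkString(",")}")
    println(s"Returned length(s) of: ${file.map(s => s.length).collect().mkString(",")}")
    spark.stop()
  }
}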

resource-managers/kubernetes/integration-tests/kerberos-yml/test-env.sh

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ until /usr/bin/kinit -kt /var/keytabs/hdfs.keytab hdfs/nn.${NAMESPACE}.svc.clust
   --conf spark.kubernetes.executor.docker.image=spark-executor:latest \
   --conf spark.kubernetes.initcontainer.docker.image=spark-init:latest \
   --conf spark.hadoop.fs.defaultFS=hdfs://nn.${NAMESPACE}.svc.cluster.local:9000 \
+  --conf spark.hadoop.dfs.data.transfer.protection=authentication \
   --conf spark.kubernetes.kerberos=true \
   --conf spark.kubernetes.kerberos.keytab=/var/keytabs/hdfs.keytab \
   --conf spark.kubernetes.kerberos.principal=hdfs/nn.${NAMESPACE}[email protected] \
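
The spark.hadoop. prefix used above is how Spark forwards arbitrary Hadoop settings to the job's Hadoop Configuration. A minimal local sketch (not part of this commit) showing that propagation:

import org.apache.spark.{SparkConf, SparkContext}

// Any key prefixed with "spark.hadoop." is copied, prefix stripped, into the
// Hadoop Configuration that Spark hands to its HDFS clients.
val conf = new SparkConf()
  .setMaster("local[1]")
  .setAppName("hadoop-conf-propagation")
  .set("spark.hadoop.dfs.data.transfer.protection", "authentication")
val sc = new SparkContext(conf)
assert(sc.hadoopConfiguration.get("dfs.data.transfer.protection") == "authentication")
sc.stop()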

resource-managers/kubernetes/integration-tests/src/test/resources/core-site.xml

Lines changed: 4 additions & 0 deletions
@@ -31,4 +31,8 @@
     <name>fs.defaultFS</name>
     <value>hdfs://nn.REPLACE_ME.svc.cluster.local:9000</value>
   </property>
+  <property>
+    <name>hadoop.rpc.protection</name>
+    <value>authentication</value>
+  </property>
 </configuration>

resource-managers/kubernetes/integration-tests/src/test/resources/hdfs-site.xml

Lines changed: 13 additions & 2 deletions
@@ -67,9 +67,8 @@
   <!-- DataNode security config -->
   <property>
     <name>dfs.data.transfer.protection</name>
-    <value>authentication</value>
+    <value>integrity</value>
   </property>
-
   <property>
     <name>dfs.datanode.address</name>
     <value>0.0.0.0:10019</value>
@@ -128,6 +127,18 @@
     <name>dfs.datanode.kerberos.principal</name>
     <value>hdfs/[email protected]</value>
   </property>
+  <property>
+    <name>dfs.encrypt.data.transfer</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>dfs.encrypt.data.transfer.cipher.suites</name>
+    <value>AES/CTR/NoPadding</value>
+  </property>
+  <property>
+    <name>dfs.encrypt.data.transfer.cipher.key.bitlength</name>
+    <value>256</value>
+  </property>
 
   <!-- Web Authentication config -->
   <property>

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 4 additions & 2 deletions
@@ -78,7 +78,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   after {
-    kubernetesTestComponents.deleteKubernetesResources()
+    kubernetesTestComponents.deleteKubernetesPVs()
     kubernetesTestComponents.deleteNamespace()
   }
 
@@ -105,7 +105,9 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       Map("spark-app-locator" -> APP_LOCATOR_LABEL))
     driverWatcherCache.start()
     driverWatcherCache.stop()
-    val expectedLogOnCompletion = Seq("Returned length(s) of:")
+    val expectedLogOnCompletion = Seq(
+      "Returned length(s) of: 1",
+      "File contents: [This is an awesome word count file]")
     val driverPod = kubernetesClient
       .pods()
      .withLabel("spark-app-locator", APP_LOCATOR_LABEL)
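
The expected strings are matched against the driver pod's log. A hedged sketch of how such an assertion is commonly written with the fabric8 client and ScalaTest's Eventually; the exact helper names in the suite may differ:

// Assumed names: kubernetesClient, driverPod, expectedLogOnCompletion,
// KubernetesSuite.TIMEOUT and KubernetesSuite.INTERVAL come from the suite.
Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
  val driverLog = kubernetesClient
    .pods()
    .withName(driverPod.getMetadata.getName)
    .getLog
  expectedLogOnCompletion.foreach { expected =>
    assert(driverLog.contains(expected), s"Did not find '$expected' in the driver log")
  }
}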

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala

Lines changed: 15 additions & 22 deletions
@@ -39,7 +39,21 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
       .endMetadata()
       .done()
   }
-
+  def deleteKubernetesPVs(): Unit = {
+    // Temporary hack until client library for fabric8 is updated to get around
+    // the NPE that comes about when I do .list().getItems().asScala
+    try {
+      val pvList = kubernetesClient.persistentVolumes().list().getItems().asScala
+      if (pvList.nonEmpty) {
+        kubernetesClient.persistentVolumes().delete()
+        Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
+          require(!pvList.exists(_.getMetadata.getNamespace == namespace))
+        }
+      }
+    } catch {
+      case ex: java.lang.NullPointerException =>
+    }
+  }
   def deleteNamespace(): Unit = {
     defaultClient.namespaces.withName(namespace).delete()
     Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
@@ -52,27 +66,6 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
     }
   }
 
-  def deleteKubernetesResources(): Unit = {
-    kubernetesClient.persistentVolumes().delete()
-    Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
-      val persistentList = kubernetesClient
-        .persistentVolumes()
-        .list()
-        .getItems()
-        .asScala
-      require(!persistentList.exists(_.getMetadata.getNamespace == namespace))
-    }
-    kubernetesClient.configMaps().delete()
-    Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
-      val configMapsList = kubernetesClient
-        .configMaps()
-        .list()
-        .getItems()
-        .asScala
-      require(!configMapsList.exists(_.getMetadata.getNamespace == namespace))
-    }
-  }
-
   def newSparkConf(): SparkConf = {
     new SparkConf(true)
       .setMaster(s"k8s://${kubernetesClient.getMasterUrl}")
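
As an aside, a hedged alternative to the try/catch in deleteKubernetesPVs above would be to guard the possibly-null item list with Option rather than swallowing the NullPointerException. A sketch against the same fabric8 client; the helper name is illustrative and not part of this commit:

import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.PersistentVolume
import io.fabric8.kubernetes.client.KubernetesClient

// Returns an empty sequence if either the list call or its items come back null.
def listPersistentVolumesSafely(client: KubernetesClient): Seq[PersistentVolume] = {
  Option(client.persistentVolumes().list())
    .flatMap(list => Option(list.getItems))
    .map(_.asScala.toSeq)
    .getOrElse(Seq.empty)
}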

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala

Lines changed: 10 additions & 10 deletions
@@ -87,16 +87,16 @@ private[spark] class SparkDockerImageBuilder
     if (exitCode != 0) {
       logInfo(s"exitCode: $exitCode")
     }
-    buildImage("spark-base", BASE_DOCKER_FILE)
-    buildImage("spark-driver", DRIVER_DOCKER_FILE)
-    buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE)
-    buildImage("spark-executor", EXECUTOR_DOCKER_FILE)
-    buildImage("spark-executor-py", EXECUTORPY_DOCKER_FILE)
-    buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE)
-    buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE)
-    buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE)
-    buildImage("spark-integration-test-asset-server", STATIC_ASSET_SERVER_DOCKER_FILE)
-    buildImage("kerberos-test", KERBEROS_DOCKER_FILE)
+    // buildImage("spark-base", BASE_DOCKER_FILE)
+    // buildImage("spark-driver", DRIVER_DOCKER_FILE)
+    // buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE)
+    // buildImage("spark-executor", EXECUTOR_DOCKER_FILE)
+    // buildImage("spark-executor-py", EXECUTORPY_DOCKER_FILE)
+    // buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE)
+    // buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE)
+    // buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE)
+    // buildImage("spark-integration-test-asset-server", STATIC_ASSET_SERVER_DOCKER_FILE)
+    // buildImage("kerberos-test", KERBEROS_DOCKER_FILE)
   }
 
   private def buildImage(name: String, dockerFile: String): Unit = {
