
Commit 9051b55

ifilonenko authored and foxish committed
Use a pre-installed Minikube instance -- porting over logic from PR 521 (#14)
* initial logic needed for porting PR 521
* remove spark.version in pom
* minor styling
* handling flags, readmes, and POM changes
* resolve issues with minikube and some comment resolution
* updated readme
* config values
* Modify SparkDockerImageBuilder so it can delete docker images
* Move the docker manager in git
* Address more comments.
* Address more comments
1 parent e24ecda commit 9051b55

File tree

11 files changed: +288 −216 lines


README.md

Lines changed: 9 additions & 22 deletions
````diff
@@ -46,6 +46,10 @@ $ ./e2e/runner.sh -m https://xyz -i test -r https://github.com/my-spark/spark -b
 
 ## Running the tests using maven
 
+Integration tests first require installing [Minikube](https://kubernetes.io/docs/getting-started-guides/minikube/) on
+your machine, and for the `Minikube` binary to be on your `PATH`. Refer to the Minikube documentation for instructions
+on how to install it. It is recommended to allocate at least 8 CPUs and 8GB of memory to the Minikube cluster.
+
 Running the integration tests requires a Spark distribution package tarball that
 contains Spark jars, submission clients, etc. You can download a tarball from
 http://spark.apache.org/downloads.html. Or, you can create a distribution from
@@ -82,37 +86,20 @@ In order to run against any cluster, use the following:
 ```sh
 $ mvn clean integration-test \
   -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
-  -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https://<master> -Dspark.docker.test.driverImage=<driver-image> -Dspark.docker.test.executorImage=<executor-image>"
-```
-
-## Preserve the Minikube VM
-
-The integration tests make use of
-[Minikube](https://github.com/kubernetes/minikube), which fires up a virtual
-machine and setup a single-node kubernetes cluster within it. By default the vm
-is destroyed after the tests are finished. If you want to preserve the vm, e.g.
-to reduce the running time of tests during development, you can pass the
-property `spark.docker.test.persistMinikube` to the test process:
-
-```
-$ mvn clean integration-test \
-  -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
-  -DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true
-```
+  -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https://<master>"
+```
 
 ## Reuse the previous Docker images
 
 The integration tests build a number of Docker images, which takes some time.
 By default, the images are built every time the tests run. You may want to skip
 re-building those images during development, if the distribution package did not
 change since the last run. You can pass the property
-`spark.docker.test.skipBuildImages` to the test process. This will work only if
-you have been setting the property `spark.docker.test.persistMinikube`, in the
-previous run since the docker daemon run inside the minikube environment. Here
-is an example:
+`spark.kubernetes.test.imageDockerTag` to the test process and specify the Docker
+image tag that is appropriate.
+Here is an example:
 
 ```
 $ mvn clean integration-test \
   -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
-  "-DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true -Dspark.docker.test.skipBuildImages=true"
+  -Dspark.kubernetes.test.imageDockerTag=latest
 ```
````
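The `spark.kubernetes.test.imageDockerTag` property described above is read by the test backends in this commit as a plain system property with a `latest` fallback. A minimal sketch of that lookup (the property name comes from the new `config` object later in this diff):

```scala
// Minimal sketch: resolve the Docker image tag the way the test backends do.
val imageTag: String =
  Option(System.getProperty("spark.kubernetes.test.imageDockerTag"))
    .getOrElse("latest")
// Images are then referenced as, e.g., s"spark-driver:$imageTag".
```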

integration-test/pom.xml

Lines changed: 0 additions & 32 deletions
```diff
@@ -40,7 +40,6 @@
   <slf4j-log4j12.version>1.7.24</slf4j-log4j12.version>
   <sbt.project.name>kubernetes-integration-tests</sbt.project.name>
   <spark-distro-tgz>YOUR-SPARK-DISTRO-TARBALL-HERE</spark-distro-tgz>
-  <spark-dockerfiles-dir>YOUR-DOCKERFILES-DIR-HERE</spark-dockerfiles-dir>
   <test.exclude.tags></test.exclude.tags>
 </properties>
 <packaging>jar</packaging>
@@ -141,37 +140,6 @@
       </execution>
     </executions>
   </plugin>
-  <plugin>
-    <groupId>com.googlecode.maven-download-plugin</groupId>
-    <artifactId>download-maven-plugin</artifactId>
-    <version>${download-maven-plugin.version}</version>
-    <executions>
-      <execution>
-        <id>download-minikube-linux</id>
-        <phase>pre-integration-test</phase>
-        <goals>
-          <goal>wget</goal>
-        </goals>
-        <configuration>
-          <url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64</url>
-          <outputDirectory>${project.build.directory}/minikube-bin/linux-amd64</outputDirectory>
-          <outputFileName>minikube</outputFileName>
-        </configuration>
-      </execution>
-      <execution>
-        <id>download-minikube-darwin</id>
-        <phase>pre-integration-test</phase>
-        <goals>
-          <goal>wget</goal>
-        </goals>
-        <configuration>
-          <url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64</url>
-          <outputDirectory>${project.build.directory}/minikube-bin/darwin-amd64</outputDirectory>
-          <outputFileName>minikube</outputFileName>
-        </configuration>
-      </execution>
-    </executions>
-  </plugin>
   <plugin>
     <!-- Triggers scalatest plugin in the integration-test phase instead of
          the test phase. -->
```

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala

Lines changed: 8 additions & 1 deletion
```diff
@@ -30,7 +30,9 @@ import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Minutes, Seconds, Span}
 
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory
-import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH
+import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend
+import org.apache.spark.deploy.k8s.integrationtest.constants._
+import org.apache.spark.deploy.k8s.integrationtest.config._
 
 private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {
 
@@ -65,6 +67,9 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
       .set("spark.kubernetes.driver.container.image", driverImage)
       .set("spark.kubernetes.executor.container.image", executorImage)
       .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL)
+      .set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver"))
+      .set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor"))
+      .set(INIT_CONTAINER_DOCKER_IMAGE, tagImage("spark-init"))
       .set("spark.kubernetes.executor.label.spark-app-locator", APP_LOCATOR_LABEL)
     kubernetesTestComponents.createNamespace()
   }
@@ -98,6 +103,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
   }
 
   test("Run SparkPi with custom driver pod name, labels, annotations, and environment variables.") {
+    doMinikubeCheck
     sparkAppConf
       .set("spark.kubernetes.driver.pod.name", "spark-integration-spark-pi")
       .set("spark.kubernetes.driver.label.label1", "label1-value")
@@ -245,6 +251,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
       }
     }
   }
+  private def tagImage(image: String): String = s"$image:${testBackend.dockerImageTag()}"
 
   private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
     assert(driverPod.getSpec.getContainers.get(0).getImage === driverImage)
```
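The new `tagImage` helper simply joins a base image name with the backend's tag. A standalone sketch of the same composition (here the tag is passed explicitly instead of coming from `testBackend.dockerImageTag()`):

```scala
object TagImageSketch extends App {
  // Standalone restatement of the tagImage helper added above.
  def tagImage(image: String, dockerImageTag: String): String =
    s"$image:$dockerImageTag"

  println(tagImage("spark-driver", "latest")) // spark-driver:latest
  println(tagImage("spark-init", "my-tag"))   // spark-init:my-tag
}
```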

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala

Lines changed: 26 additions & 0 deletions
```diff
@@ -26,6 +26,32 @@ object Utils extends Logging {
     try f.apply(resource) finally resource.close()
   }
 
+  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
+    var originalThrowable: Throwable = null
+    try {
+      block
+    } catch {
+      case t: Throwable =>
+        // Purposefully not using NonFatal, because even fatal exceptions
+        // we don't want to have our finallyBlock suppress
+        originalThrowable = t
+        throw originalThrowable
+    } finally {
+      try {
+        finallyBlock
+      } catch {
+        case t: Throwable =>
+          if (originalThrowable != null) {
+            originalThrowable.addSuppressed(t)
+            logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
+            throw originalThrowable
+          } else {
+            throw t
+          }
+      }
+    }
+  }
+
   def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
     require(rawMasterURL.startsWith("k8s://"),
       "Kubernetes master URL must start with k8s://.")
```

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala

Lines changed: 4 additions & 2 deletions
```diff
@@ -20,7 +20,7 @@ import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
 
 import org.apache.spark.deploy.k8s.integrationtest.Utils
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
-import org.apache.spark.deploy.k8s.integrationtest.constants.GCE_TEST_BACKEND
+import org.apache.spark.deploy.k8s.integrationtest.config._
 
 private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
   private var defaultClient: DefaultKubernetesClient = _
@@ -37,5 +37,7 @@ private[spark] class GCETestBackend(val master: String) extends IntegrationTestB
     defaultClient
   }
 
-  override def name(): String = GCE_TEST_BACKEND
+  override def dockerImageTag(): String = {
+    return System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY, "latest")
+  }
 }
```

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -23,16 +23,16 @@ import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend
 import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend
 
 private[spark] trait IntegrationTestBackend {
-  def name(): String
   def initialize(): Unit
   def getKubernetesClient: DefaultKubernetesClient
+  def dockerImageTag(): String
   def cleanUp(): Unit = {}
 }
 
 private[spark] object IntegrationTestBackendFactory {
   def getTestBackend(): IntegrationTestBackend = {
     Option(System.getProperty("spark.kubernetes.test.master"))
       .map(new GCETestBackend(_))
-      .getOrElse(new MinikubeTestBackend())
+      .getOrElse(MinikubeTestBackend)
   }
 }
```
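A consumer of the trait now selects and drives a backend uniformly, whether it is GCE- or Minikube-backed. A usage sketch against the trait above (the test body is elided):

```scala
// Usage sketch: obtain a backend, initialize it, and always clean up.
val backend = IntegrationTestBackendFactory.getTestBackend()
backend.initialize()
try {
  val client = backend.getKubernetesClient
  val tag = backend.dockerImageTag() // e.g. "latest" when no -D flag is given
  // ... run tests against `client` with images tagged `tag` ...
} finally {
  backend.cleanUp()
}
```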

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala

Lines changed: 8 additions & 49 deletions
```diff
@@ -20,74 +20,38 @@ import java.nio.file.Paths
 
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
 
-import org.apache.commons.lang3.SystemUtils
 import org.apache.spark.deploy.k8s.integrationtest.{Logging, ProcessUtils}
 
 // TODO support windows
 private[spark] object Minikube extends Logging {
-  private val MINIKUBE_EXECUTABLE_DEST = if (SystemUtils.IS_OS_MAC_OSX) {
-    Paths.get("target", "minikube-bin", "darwin-amd64", "minikube").toFile
-  } else if (SystemUtils.IS_OS_WINDOWS) {
-    throw new IllegalStateException("Executing Minikube based integration tests not yet " +
-      " available on Windows.")
-  } else {
-    Paths.get("target", "minikube-bin", "linux-amd64", "minikube").toFile
-  }
-
-  private val EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE = "Minikube is not downloaded, expected at " +
-    s"${MINIKUBE_EXECUTABLE_DEST.getAbsolutePath}"
-
   private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
 
-  // NOTE: This and the following methods are synchronized to prevent deleteMinikube from
-  // destroying the minikube VM while other methods try to use the VM.
-  // Such a race condition can corrupt the VM or some VM provisioning tools like VirtualBox.
-  def startMinikube(): Unit = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
-    if (getMinikubeStatus != MinikubeStatus.RUNNING) {
-      executeMinikube("start", "--memory", "6000", "--cpus", "8")
-    } else {
-      logInfo("Minikube is already started.")
-    }
-  }
-
-  def getMinikubeIp: String = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
+  def getMinikubeIp: String = {
     val outputs = executeMinikube("ip")
       .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$"))
     assert(outputs.size == 1, "Unexpected amount of output from minikube ip")
     outputs.head
   }
 
-  def getMinikubeStatus: MinikubeStatus.Value = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
+  def getMinikubeStatus: MinikubeStatus.Value = {
     val statusString = executeMinikube("status")
-      .filter(_.contains("minikube: "))
+      .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
       .head
+      .replaceFirst("minikubeVM: ", "")
       .replaceFirst("minikube: ", "")
     MinikubeStatus.unapply(statusString)
       .getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
   }
 
-  def getDockerEnv: Map[String, String] = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
+  def getDockerEnv: Map[String, String] = {
     executeMinikube("docker-env", "--shell", "bash")
       .filter(_.startsWith("export"))
       .map(_.replaceFirst("export ", "").split('='))
       .map(arr => (arr(0), arr(1).replaceAllLiterally("\"", "")))
       .toMap
   }
 
-  def deleteMinikube(): Unit = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
-    if (getMinikubeStatus != MinikubeStatus.NONE) {
-      executeMinikube("delete")
-    } else {
-      logInfo("Minikube was already not running.")
-    }
-  }
-
-  def getKubernetesClient: DefaultKubernetesClient = synchronized {
+  def getKubernetesClient: DefaultKubernetesClient = {
     val kubernetesMaster = s"https://${getMinikubeIp}:8443"
     val userHome = System.getProperty("user.home")
     val kubernetesConf = new ConfigBuilder()
@@ -105,13 +69,8 @@ private[spark] object Minikube extends Logging {
   }
 
   private def executeMinikube(action: String, args: String*): Seq[String] = {
-    if (!MINIKUBE_EXECUTABLE_DEST.canExecute) {
-      if (!MINIKUBE_EXECUTABLE_DEST.setExecutable(true)) {
-        throw new IllegalStateException("Failed to make the Minikube binary executable.")
-      }
-    }
-    ProcessUtils.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args,
-      MINIKUBE_STARTUP_TIMEOUT_SECONDS)
+    ProcessUtils.executeProcess(
+      Array("minikube", action) ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
   }
 }
```
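With the download plugin gone, `executeMinikube` resolves `minikube` from the `PATH` like any other external command. A minimal sketch of the same invocation using only the Scala standard library (the repo itself goes through its own `ProcessUtils` wrapper):

```scala
import scala.sys.process._

object MinikubeOnPath extends App {
  // Fails with an exception if `minikube` is not installed and on PATH.
  val ip = Seq("minikube", "ip").!!.trim
  println(s"Minikube IP: $ip")
}
```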

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala

Lines changed: 18 additions & 14 deletions
```diff
@@ -19,29 +19,33 @@ package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
-import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND
-import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageBuilder
+import org.apache.spark.deploy.k8s.integrationtest.config._
+import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager
 
-private[spark] class MinikubeTestBackend extends IntegrationTestBackend {
+private[spark] object MinikubeTestBackend extends IntegrationTestBackend {
   private var defaultClient: DefaultKubernetesClient = _
+  private val userProvidedDockerImageTag = Option(
+    System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY))
+  private val dockerManager = new KubernetesSuiteDockerManager(
+    Minikube.getDockerEnv, userProvidedDockerImageTag)
 
   override def initialize(): Unit = {
-    Minikube.startMinikube()
-    if (!System.getProperty("spark.docker.test.skipBuildImages", "false").toBoolean) {
-      new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
-    }
+    val minikubeStatus = Minikube.getMinikubeStatus
+    require(minikubeStatus == MinikubeStatus.RUNNING,
+      s"Minikube must be running before integration tests can execute. Current status" +
+        s" is: $minikubeStatus")
+    dockerManager.buildSparkDockerImages()
     defaultClient = Minikube.getKubernetesClient
   }
 
-  override def getKubernetesClient(): DefaultKubernetesClient = {
-    defaultClient
+  override def cleanUp(): Unit = {
+    super.cleanUp()
+    dockerManager.deleteImages()
   }
 
-  override def cleanUp(): Unit = {
-    if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
-      Minikube.deleteMinikube()
-    }
+  override def getKubernetesClient(): DefaultKubernetesClient = {
+    defaultClient
   }
 
-  override def name(): String = MINIKUBE_TEST_BACKEND
+  override def dockerImageTag(): String = dockerManager.dockerImageTag()
 }
```
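Since `initialize()` now guards with `require`, running the suite against a stopped cluster fails fast with an `IllegalArgumentException` instead of trying to start a VM. A sketch of the expected failure path (assuming the status reported is `NONE`):

```scala
// Sketch: fail-fast behavior when no Minikube cluster is running.
try {
  MinikubeTestBackend.initialize()
} catch {
  case e: IllegalArgumentException =>
    // requirement failed: Minikube must be running before integration
    // tests can execute. Current status is: NONE
    println(e.getMessage)
}
```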
integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala

Lines changed: 24 additions & 0 deletions
```diff
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+package object config {
+  val KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY = "spark.kubernetes.test.imageDockerTag"
+  val DRIVER_DOCKER_IMAGE = "spark.kubernetes.driver.container.image"
+  val EXECUTOR_DOCKER_IMAGE = "spark.kubernetes.executor.container.image"
+  val INIT_CONTAINER_DOCKER_IMAGE = "spark.kubernetes.initcontainer.container.image"
+}
```
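These keys tie the pieces together: the tag property feeds `dockerImageTag()`, and the three image keys are set on the app config in `KubernetesSuite`. A condensed sketch of that wiring (a plain `Map` stands in for the suite's `sparkAppConf`):

```scala
// Sketch: resolve the tag once, then point all three images at it.
val tag = Option(System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY))
  .getOrElse("latest")

val imageSettings = Map(
  DRIVER_DOCKER_IMAGE -> s"spark-driver:$tag",
  EXECUTOR_DOCKER_IMAGE -> s"spark-executor:$tag",
  INIT_CONTAINER_DOCKER_IMAGE -> s"spark-init:$tag")
```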
