
Commit 2990ec3
Merge remote-tracking branch 'origin/reuse-minikube' into HEAD
2 parents: 9008b8c + 792476f

File tree: 12 files changed (+315 −220 lines)


README.md

Lines changed: 10 additions & 23 deletions
@@ -46,13 +46,17 @@ $ ./e2e/runner.sh -m https://xyz -i test -r https://github.com/my-spark/spark -b
 
 ## Running the tests using maven
 
+Integration tests firstly require installing [Minikube](https://kubernetes.io/docs/getting-started-guides/minikube/) on
+your machine, and for the `Minikube` binary to be on your `PATH`. Refer to the Minikube documentation for instructions
+on how to install it. It is recommended to allocate at least 8 CPUs and 8GB of memory to the Minikube cluster.
+
 Running the integration tests requires a Spark distribution package tarball that
 contains Spark jars, submission clients, etc. You can download a tarball from
 http://spark.apache.org/downloads.html. Or, you can create a distribution from
 source code using `make-distribution.sh`. For example:
 
 ```
-$ git clone git@github.com:apache/spark.git
+$ git clone git@github.com/apache/spark.git
 $ cd spark
 $ ./dev/make-distribution.sh --tgz \
     -Phadoop-2.7 -Pkubernetes -Pkinesis-asl -Phive -Phive-thriftserver
@@ -82,37 +86,20 @@ In order to run against any cluster, use the following:
 ```sh
 $ mvn clean integration-test \
   -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
-  -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https://<master> -Dspark.docker.test.driverImage=<driver-image> -Dspark.docker.test.executorImage=<executor-image>"
+  -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https://<master>"
 ```
 
-## Preserve the Minikube VM
-
-The integration tests make use of
-[Minikube](https://github.com/kubernetes/minikube), which fires up a virtual
-machine and setup a single-node kubernetes cluster within it. By default the vm
-is destroyed after the tests are finished. If you want to preserve the vm, e.g.
-to reduce the running time of tests during development, you can pass the
-property `spark.docker.test.persistMinikube` to the test process:
-
-```
-$ mvn clean integration-test \
-  -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
-  -DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true
-```
-
 ## Reuse the previous Docker images
 
 The integration tests build a number of Docker images, which takes some time.
 By default, the images are built every time the tests run. You may want to skip
 re-building those images during development, if the distribution package did not
 change since the last run. You can pass the property
-`spark.docker.test.skipBuildImages` to the test process. This will work only if
-you have been setting the property `spark.docker.test.persistMinikube`, in the
-previous run since the docker daemon run inside the minikube environment. Here
-is an example:
+`spark.kubernetes.test.imageDockerTag` to the test process and specify the Docker
+image tag that is appropriate.
+Here is an example:
 
 ```
 $ mvn clean integration-test \
   -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
-  "-DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true -Dspark.docker.test.skipBuildImages=true"
+  -Dspark.kubernetes.test.imageDockerTag=latest
 ```
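This commit replaces the old `spark.docker.test.persistMinikube`/`spark.docker.test.skipBuildImages` switches with the single `spark.kubernetes.test.imageDockerTag` property. Below is a minimal sketch of how a test harness can consume that property, mirroring the lookup the backends further down perform; the `ImageTagResolver` object and `resolveTag` name are illustrative, not part of this commit:

```scala
// Minimal sketch of consuming the README's new property; the object and
// method names here are illustrative, not part of this commit.
object ImageTagResolver {
  private val TagProperty = "spark.kubernetes.test.imageDockerTag"

  // Falls back to "latest" when the property is unset, matching the
  // default the GCE backend uses further down in this commit.
  def resolveTag(): String =
    Option(System.getProperty(TagProperty)).getOrElse("latest")

  def main(args: Array[String]): Unit = {
    // e.g. launched with -Dspark.kubernetes.test.imageDockerTag=dev
    println(s"spark-driver:${resolveTag()}")
  }
}
```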

integration-test/pom.xml

Lines changed: 0 additions & 32 deletions
@@ -40,7 +40,6 @@
     <slf4j-log4j12.version>1.7.24</slf4j-log4j12.version>
     <sbt.project.name>kubernetes-integration-tests</sbt.project.name>
     <spark-distro-tgz>YOUR-SPARK-DISTRO-TARBALL-HERE</spark-distro-tgz>
-    <spark-dockerfiles-dir>YOUR-DOCKERFILES-DIR-HERE</spark-dockerfiles-dir>
     <test.exclude.tags></test.exclude.tags>
   </properties>
   <packaging>jar</packaging>
@@ -141,37 +140,6 @@
         </execution>
       </executions>
     </plugin>
-    <plugin>
-      <groupId>com.googlecode.maven-download-plugin</groupId>
-      <artifactId>download-maven-plugin</artifactId>
-      <version>${download-maven-plugin.version}</version>
-      <executions>
-        <execution>
-          <id>download-minikube-linux</id>
-          <phase>pre-integration-test</phase>
-          <goals>
-            <goal>wget</goal>
-          </goals>
-          <configuration>
-            <url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64</url>
-            <outputDirectory>${project.build.directory}/minikube-bin/linux-amd64</outputDirectory>
-            <outputFileName>minikube</outputFileName>
-          </configuration>
-        </execution>
-        <execution>
-          <id>download-minikube-darwin</id>
-          <phase>pre-integration-test</phase>
-          <goals>
-            <goal>wget</goal>
-          </goals>
-          <configuration>
-            <url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64</url>
-            <outputDirectory>${project.build.directory}/minikube-bin/darwin-amd64</outputDirectory>
-            <outputFileName>minikube</outputFileName>
-          </configuration>
-        </execution>
-      </executions>
-    </plugin>
     <plugin>
       <!-- Triggers scalatest plugin in the integration-test phase instead of
         the test phase. -->
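Removing these `download-maven-plugin` executions means the build no longer fetches a pinned Minikube v0.22.0 binary into `target/minikube-bin`; the tests now assume a `minikube` binary is already on the `PATH`, as the README and `Minikube.scala` changes state. A hedged pre-flight sketch of that new contract (the `MinikubePreflight` object is illustrative only):

```scala
import scala.sys.process._

// Hypothetical pre-flight check illustrating the new contract: since the pom
// no longer downloads a pinned Minikube binary, `minikube` must already be
// resolvable on the PATH when the integration tests start.
object MinikubePreflight {
  def main(args: Array[String]): Unit = {
    // `.!` runs the command and returns its exit code; a missing binary
    // surfaces as an IOException, which we fold into a failure code here.
    val exitCode =
      try Seq("minikube", "version").!
      catch { case _: java.io.IOException => -1 }
    require(exitCode == 0, "minikube was not found on PATH; install it first")
    println("minikube is available")
  }
}
```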

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala

Lines changed: 14 additions & 1 deletion
@@ -30,7 +30,9 @@ import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Minutes, Seconds, Span}
 
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory
-import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH
+import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend
+import org.apache.spark.deploy.k8s.integrationtest.constants._
+import org.apache.spark.deploy.k8s.integrationtest.config._
 
 private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {
 
@@ -65,6 +67,9 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
       .set("spark.kubernetes.driver.container.image", driverImage)
       .set("spark.kubernetes.executor.container.image", executorImage)
       .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL)
+      .set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver"))
+      .set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor"))
+      .set(INIT_CONTAINER_DOCKER_IMAGE, tagImage("spark-init"))
       .set("spark.kubernetes.executor.label.spark-app-locator", APP_LOCATOR_LABEL)
     kubernetesTestComponents.createNamespace()
     remoteExamplesJarUri = SparkExamplesFileServerRunner
@@ -76,15 +81,18 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
   }
 
   test("Run SparkPi with no resources") {
+    doMinikubeCheck
     runSparkPiAndVerifyCompletion()
   }
 
   test("Run SparkPi with a very long application name.") {
+    doMinikubeCheck
     sparkAppConf.set("spark.app.name", "long" * 40)
     runSparkPiAndVerifyCompletion()
   }
 
   test("Run SparkPi with a master URL without a scheme.") {
+    doMinikubeCheck
     val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
     val k8sMasterUrl = if (url.getPort < 0) {
       s"k8s://${url.getHost}"
@@ -105,6 +113,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
   }
 
   test("Run SparkPi with custom driver pod name, labels, annotations, and environment variables.") {
+    doMinikubeCheck
     sparkAppConf
       .set("spark.kubernetes.driver.pod.name", "spark-integration-spark-pi")
       .set("spark.kubernetes.driver.label.label1", "label1-value")
@@ -251,6 +260,10 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
       }
     }
   }
+  private def doMinikubeCheck(): Unit = {
+    assume(testBackend == MinikubeTestBackend)
+  }
+  private def tagImage(image: String): String = s"$image:${testBackend.dockerImageTag()}"
 
   private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
     assert(driverPod.getSpec.getContainers.get(0).getImage === driverImage)
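The new `doMinikubeCheck` guard relies on ScalaTest's `assume`, which cancels a test rather than failing it when the condition does not hold, so Minikube-only tests are skipped cleanly against other backends. A self-contained sketch of that pattern together with the string interpolation `tagImage` uses (the `onMinikube` flag approximates the real `testBackend == MinikubeTestBackend` check):

```scala
import org.scalatest.FunSuite

// Standalone sketch of the suite's new guard: a failed `assume` throws
// TestCanceledException, so ScalaTest reports the test as canceled rather
// than failed. The `onMinikube` flag approximates the real backend check.
class AssumeDemoSuite extends FunSuite {
  // Mirrors the factory rule: no spark.kubernetes.test.master means Minikube.
  private val onMinikube = sys.props.get("spark.kubernetes.test.master").isEmpty

  // Same shape as the suite's tagImage helper: suffix the backend's tag.
  private def tagImage(image: String, tag: String): String = s"$image:$tag"

  test("runs only against the Minikube backend") {
    assume(onMinikube, "not running against Minikube, canceling the test")
    assert(tagImage("spark-driver", "latest") === "spark-driver:latest")
  }
}
```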

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkExamplesFileServerRunner.scala

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ import org.apache.http.client.utils.URIBuilder
 private[spark] object SparkExamplesFileServerRunner {
 
   private val fileServerImage = System.getProperty(
-      "spark.docker.test.fileServerImage", "spark-examples-file-server:latest")
+    "spark.docker.test.fileServerImage", "spark-examples-file-server:latest")
   private val fileServerExampleJarsDir = Paths.get("docker-file-server", "jars")
   require(
     fileServerExampleJarsDir

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala

Lines changed: 29 additions & 0 deletions
@@ -18,6 +18,9 @@ package org.apache.spark.deploy.k8s.integrationtest
 
 import java.io.Closeable
 import java.net.URI
+import java.io.{IOException, InputStream, OutputStream}
+
+import com.google.common.io.ByteStreams
 
 object Utils extends Logging {
 
@@ -26,6 +29,32 @@
     try f.apply(resource) finally resource.close()
   }
 
+  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
+    var originalThrowable: Throwable = null
+    try {
+      block
+    } catch {
+      case t: Throwable =>
+        // Purposefully not using NonFatal, because even fatal exceptions
+        // we don't want to have our finallyBlock suppress
+        originalThrowable = t
+        throw originalThrowable
+    } finally {
+      try {
+        finallyBlock
+      } catch {
+        case t: Throwable =>
+          if (originalThrowable != null) {
+            originalThrowable.addSuppressed(t)
+            logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
+            throw originalThrowable
+          } else {
+            throw t
+          }
+      }
+    }
+  }
+
   def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
     require(rawMasterURL.startsWith("k8s://"),
       "Kubernetes master URL must start with k8s://.")

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala

Lines changed: 4 additions & 2 deletions
@@ -20,7 +20,7 @@ import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
 
 import org.apache.spark.deploy.k8s.integrationtest.Utils
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
-import org.apache.spark.deploy.k8s.integrationtest.constants.GCE_TEST_BACKEND
+import org.apache.spark.deploy.k8s.integrationtest.config._
 
 private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
   private var defaultClient: DefaultKubernetesClient = _
@@ -37,5 +37,7 @@ private[spark] class GCETestBackend(val master: String) extends IntegrationTestB
     defaultClient
   }
 
-  override def name(): String = GCE_TEST_BACKEND
+  override def dockerImageTag(): String = {
+    return System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY, "latest")
+  }
 }

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -23,16 +23,16 @@ import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend
 import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend
 
 private[spark] trait IntegrationTestBackend {
-  def name(): String
   def initialize(): Unit
   def getKubernetesClient: DefaultKubernetesClient
+  def dockerImageTag(): String
   def cleanUp(): Unit = {}
 }
 
 private[spark] object IntegrationTestBackendFactory {
   def getTestBackend(): IntegrationTestBackend = {
     Option(System.getProperty("spark.kubernetes.test.master"))
       .map(new GCETestBackend(_))
-      .getOrElse(new MinikubeTestBackend())
+      .getOrElse(MinikubeTestBackend)
   }
 }
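The factory now returns the `MinikubeTestBackend` singleton unless `spark.kubernetes.test.master` selects a remote cluster. A self-contained sketch of that Option-based dispatch (the names below are stand-ins for the real classes in this commit, and the tag property key is assumed from the README):

```scala
// Self-contained sketch of the factory's dispatch rule: a system property
// selects the remote-cluster backend, otherwise a singleton is reused.
trait Backend { def dockerImageTag(): String }

object MinikubeBackendStub extends Backend {
  // The real object delegates to KubernetesSuiteDockerManager.
  override def dockerImageTag(): String = "tag-from-docker-manager"
}

class RemoteClusterBackendStub(master: String) extends Backend {
  // The real GCETestBackend reads the tag property, defaulting to "latest".
  override def dockerImageTag(): String =
    Option(System.getProperty("spark.kubernetes.test.imageDockerTag"))
      .getOrElse("latest")
}

object BackendFactoryStub {
  def getTestBackend(): Backend =
    Option(System.getProperty("spark.kubernetes.test.master"))
      .map(new RemoteClusterBackendStub(_))
      .getOrElse(MinikubeBackendStub)
}
```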

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala

Lines changed: 4 additions & 45 deletions
@@ -20,73 +20,37 @@ import java.nio.file.Paths
 
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
 
-import org.apache.commons.lang3.SystemUtils
 import org.apache.spark.deploy.k8s.integrationtest.{Logging, ProcessUtils}
 
 // TODO support windows
 private[spark] object Minikube extends Logging {
-  private val MINIKUBE_EXECUTABLE_DEST = if (SystemUtils.IS_OS_MAC_OSX) {
-    Paths.get("target", "minikube-bin", "darwin-amd64", "minikube").toFile
-  } else if (SystemUtils.IS_OS_WINDOWS) {
-    throw new IllegalStateException("Executing Minikube based integration tests not yet " +
-      " available on Windows.")
-  } else {
-    Paths.get("target", "minikube-bin", "linux-amd64", "minikube").toFile
-  }
-
-  private val EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE = "Minikube is not downloaded, expected at " +
-    s"${MINIKUBE_EXECUTABLE_DEST.getAbsolutePath}"
-
   private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
 
-  // NOTE: This and the following methods are synchronized to prevent deleteMinikube from
-  // destroying the minikube VM while other methods try to use the VM.
-  // Such a race condition can corrupt the VM or some VM provisioning tools like VirtualBox.
-  def startMinikube(): Unit = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
-    if (getMinikubeStatus != MinikubeStatus.RUNNING) {
-      executeMinikube("start", "--memory", "6000", "--cpus", "8")
-    } else {
-      logInfo("Minikube is already started.")
-    }
-  }
-
   def getMinikubeIp: String = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
     val outputs = executeMinikube("ip")
       .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$"))
     assert(outputs.size == 1, "Unexpected amount of output from minikube ip")
     outputs.head
   }
 
   def getMinikubeStatus: MinikubeStatus.Value = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
     val statusString = executeMinikube("status")
-      .filter(_.contains("minikube: "))
+      .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
       .head
+      .replaceFirst("minikubeVM: ", "")
       .replaceFirst("minikube: ", "")
     MinikubeStatus.unapply(statusString)
       .getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
   }
 
   def getDockerEnv: Map[String, String] = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
     executeMinikube("docker-env", "--shell", "bash")
       .filter(_.startsWith("export"))
       .map(_.replaceFirst("export ", "").split('='))
       .map(arr => (arr(0), arr(1).replaceAllLiterally("\"", "")))
       .toMap
   }
 
-  def deleteMinikube(): Unit = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
-    if (getMinikubeStatus != MinikubeStatus.NONE) {
-      executeMinikube("delete")
-    } else {
-      logInfo("Minikube was already not running.")
-    }
-  }
-
   def getKubernetesClient: DefaultKubernetesClient = synchronized {
     val kubernetesMaster = s"https://${getMinikubeIp}:8443"
     val userHome = System.getProperty("user.home")
@@ -105,13 +69,8 @@ private[spark] object Minikube extends Logging {
   }
 
   private def executeMinikube(action: String, args: String*): Seq[String] = {
-    if (!MINIKUBE_EXECUTABLE_DEST.canExecute) {
-      if (!MINIKUBE_EXECUTABLE_DEST.setExecutable(true)) {
-        throw new IllegalStateException("Failed to make the Minikube binary executable.")
-      }
-    }
-    ProcessUtils.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args,
-      MINIKUBE_STARTUP_TIMEOUT_SECONDS)
+    ProcessUtils.executeProcess(
+      Array("minikube", action) ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
   }
 }
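Beyond dropping the downloaded binary in favor of whatever `minikube` resolves to on the `PATH`, the status parsing now accepts both the older `minikubeVM: ` prefix and the newer `minikube:` prefix. A runnable sketch of that parsing logic with illustrative sample output:

```scala
// Runnable sketch of the broadened status parsing: newer Minikube prints
// "minikube: <status>" while older builds printed "minikubeVM: <status>";
// the filter now accepts both. The sample lines below are illustrative.
object MinikubeStatusParseDemo {
  def parseStatus(statusOutput: Seq[String]): String =
    statusOutput
      .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
      .head
      .replaceFirst("minikubeVM: ", "")
      .replaceFirst("minikube: ", "")

  def main(args: Array[String]): Unit = {
    println(parseStatus(Seq("minikube: Running", "cluster: Running"))) // Running
    println(parseStatus(Seq("minikubeVM: Stopped")))                   // Stopped
  }
}
```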

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala

Lines changed: 18 additions & 14 deletions
@@ -19,29 +19,33 @@ package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
-import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND
-import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageBuilder
+import org.apache.spark.deploy.k8s.integrationtest.config._
+import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager
 
-private[spark] class MinikubeTestBackend extends IntegrationTestBackend {
+private[spark] object MinikubeTestBackend extends IntegrationTestBackend {
   private var defaultClient: DefaultKubernetesClient = _
+  private val userProvidedDockerImageTag = Option(
+    System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY))
+  private val dockerManager = new KubernetesSuiteDockerManager(
+    Minikube.getDockerEnv, userProvidedDockerImageTag)
 
   override def initialize(): Unit = {
-    Minikube.startMinikube()
-    if (!System.getProperty("spark.docker.test.skipBuildImages", "false").toBoolean) {
-      new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
-    }
+    val minikubeStatus = Minikube.getMinikubeStatus
+    require(minikubeStatus == MinikubeStatus.RUNNING,
+      s"Minikube must be running before integration tests can execute. Current status" +
+      s" is: $minikubeStatus")
+    dockerManager.buildSparkDockerImages()
     defaultClient = Minikube.getKubernetesClient
   }
 
-  override def getKubernetesClient(): DefaultKubernetesClient = {
-    defaultClient
+  override def cleanUp(): Unit = {
+    super.cleanUp()
+    dockerManager.deleteImages()
   }
 
-  override def cleanUp(): Unit = {
-    if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
-      Minikube.deleteMinikube()
-    }
+  override def getKubernetesClient(): DefaultKubernetesClient = {
+    defaultClient
   }
 
-  override def name(): String = MINIKUBE_TEST_BACKEND
+  override def dockerImageTag(): String = dockerManager.dockerImageTag()
 }
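The backend no longer starts or deletes the Minikube VM; it requires an already-running cluster and delegates image tagging to `KubernetesSuiteDockerManager`, passing the user's tag as an `Option`. A hedged sketch of the tag-resolution contract that implies; the UUID fallback is our assumption for illustration, not something shown in this diff:

```scala
import java.util.UUID

// Hedged sketch of the contract implied by the Option-typed
// userProvidedDockerImageTag: honor the user's tag when supplied, otherwise
// generate a unique one (the UUID fallback is an assumption, not confirmed
// by this diff).
class DockerManagerSketch(userProvidedTag: Option[String]) {
  private val resolvedTag: String =
    userProvidedTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", ""))

  // Unique tags keep each run's images distinct, so cleanUp() can delete
  // exactly the images this run built.
  def dockerImageTag(): String = resolvedTag
}

object DockerManagerSketchDemo extends App {
  println(new DockerManagerSketch(Some("latest")).dockerImageTag()) // latest
  println(new DockerManagerSketch(None).dockerImageTag())           // random hex tag
}
```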
