Skip to content

Commit 6c32d8f

Browse files
committed
Merge remote-tracking branch 'ifilonenko/master' into HEAD
2 parents 047b2a2 + 3a2b4d8 commit 6c32d8f

File tree

10 files changed

+138
-119
lines changed

10 files changed

+138
-119
lines changed

README.md

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,17 @@ $ ./e2e/runner.sh -m https://xyz -i test -r https://github.com/my-spark/spark -d
4646

4747
## Running the tests using maven
4848

49+
Integration tests firstly require installing [Minikube](https://kubernetes.io/docs/getting-started-guides/minikube/) on
50+
your machine, and for the `Minikube` binary to be on your `PATH`. Refer to the Minikube documentation for instructions
51+
on how to install it. It is recommended to allocate at least 8 CPUs and 8GB of memory to the Minikube cluster.
52+
4953
Running the integration tests requires a Spark distribution package tarball that
5054
contains Spark jars, submission clients, etc. You can download a tarball from
5155
http://spark.apache.org/downloads.html. Or, you can create a distribution from
5256
source code using `make-distribution.sh`. For example:
5357

5458
```
55-
$ git clone git@github.com:apache/spark.git
59+
$ git clone https://github.com/apache/spark.git
5660
$ cd spark
5761
$ ./dev/make-distribution.sh --tgz \
5862
-Phadoop-2.7 -Pkubernetes -Pkinesis-asl -Phive -Phive-thriftserver
@@ -82,37 +86,20 @@ In order to run against any cluster, use the following:
8286
```sh
8387
$ mvn clean integration-test \
8488
-Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
85-
-DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https://<master> -Dspark.docker.test.driverImage=<driver-image> -Dspark.docker.test.executorImage=<executor-image>"
86-
```
87-
88-
## Preserve the Minikube VM
89-
90-
The integration tests make use of
91-
[Minikube](https://github.com/kubernetes/minikube), which fires up a virtual
92-
machine and setup a single-node kubernetes cluster within it. By default the vm
93-
is destroyed after the tests are finished. If you want to preserve the vm, e.g.
94-
to reduce the running time of tests during development, you can pass the
95-
property `spark.docker.test.persistMinikube` to the test process:
96-
97-
```
98-
$ mvn clean integration-test \
99-
-Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
100-
-DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true
101-
```
89+
  -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https://<master>"
10290
10391
## Reuse the previous Docker images
10492
10593
The integration tests build a number of Docker images, which takes some time.
10694
By default, the images are built every time the tests run. You may want to skip
10795
re-building those images during development, if the distribution package did not
10896
change since the last run. You can pass the property
109-
`spark.docker.test.skipBuildImages` to the test process. This will work only if
110-
you have been setting the property `spark.docker.test.persistMinikube`, in the
111-
previous run since the docker daemon run inside the minikube environment. Here
112-
is an example:
97+
`spark.kubernetes.test.imageDockerTag` to the test process and specify the Docker
98+
image tag that is appropriate.
99+
Here is an example:
113100
114101
```
115102
$ mvn clean integration-test \
116103
-Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
117-
"-DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true -Dspark.docker.test.skipBuildImages=true"
104+
-Dspark.kubernetes.test.imageDockerTag=latest
118105
```

integration-test/pom.xml

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
<slf4j-log4j12.version>1.7.24</slf4j-log4j12.version>
4141
<sbt.project.name>kubernetes-integration-tests</sbt.project.name>
4242
<spark-distro-tgz>YOUR-SPARK-DISTRO-TARBALL-HERE</spark-distro-tgz>
43-
<spark-dockerfiles-dir>YOUR-DOCKERFILES-DIR-HERE</spark-dockerfiles-dir>
4443
<test.exclude.tags></test.exclude.tags>
4544
</properties>
4645
<packaging>jar</packaging>
@@ -141,37 +140,6 @@
141140
</execution>
142141
</executions>
143142
</plugin>
144-
<plugin>
145-
<groupId>com.googlecode.maven-download-plugin</groupId>
146-
<artifactId>download-maven-plugin</artifactId>
147-
<version>${download-maven-plugin.version}</version>
148-
<executions>
149-
<execution>
150-
<id>download-minikube-linux</id>
151-
<phase>pre-integration-test</phase>
152-
<goals>
153-
<goal>wget</goal>
154-
</goals>
155-
<configuration>
156-
<url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64</url>
157-
<outputDirectory>${project.build.directory}/minikube-bin/linux-amd64</outputDirectory>
158-
<outputFileName>minikube</outputFileName>
159-
</configuration>
160-
</execution>
161-
<execution>
162-
<id>download-minikube-darwin</id>
163-
<phase>pre-integration-test</phase>
164-
<goals>
165-
<goal>wget</goal>
166-
</goals>
167-
<configuration>
168-
<url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64</url>
169-
<outputDirectory>${project.build.directory}/minikube-bin/darwin-amd64</outputDirectory>
170-
<outputFileName>minikube</outputFileName>
171-
</configuration>
172-
</execution>
173-
</executions>
174-
</plugin>
175143
<plugin>
176144
<!-- Triggers scalatest plugin in the integration-test phase instead of
177145
the test phase. -->

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
3030
import org.scalatest.time.{Minutes, Seconds, Span}
3131

3232
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory
33-
import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH
33+
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend
34+
import org.apache.spark.deploy.k8s.integrationtest.constants._
35+
import org.apache.spark.deploy.k8s.integrationtest.config._
3436

3537
private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {
3638

@@ -59,6 +61,9 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
5961
.set("spark.kubernetes.driver.container.image", driverImage)
6062
.set("spark.kubernetes.executor.container.image", executorImage)
6163
.set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL)
64+
.set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver"))
65+
.set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor"))
66+
.set(INIT_CONTAINER_DOCKER_IMAGE, tagImage("spark-init"))
6267
.set("spark.kubernetes.executor.label.spark-app-locator", APP_LOCATOR_LABEL)
6368
kubernetesTestComponents.createNamespace()
6469
}
@@ -68,15 +73,18 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
6873
}
6974

7075
test("Run SparkPi with no resources") {
76+
doMinikubeCheck
7177
runSparkPiAndVerifyCompletion()
7278
}
7379

7480
test("Run SparkPi with a very long application name.") {
81+
doMinikubeCheck
7582
sparkAppConf.set("spark.app.name", "long" * 40)
7683
runSparkPiAndVerifyCompletion()
7784
}
7885

7986
test("Run SparkPi with a master URL without a scheme.") {
87+
doMinikubeCheck
8088
val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
8189
val k8sMasterUrl = if (url.getPort < 0) {
8290
s"k8s://${url.getHost}"
@@ -97,6 +105,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
97105
}
98106

99107
test("Run SparkPi with custom driver pod name, labels, annotations, and environment variables.") {
108+
doMinikubeCheck
100109
sparkAppConf
101110
.set("spark.kubernetes.driver.pod.name", "spark-integration-spark-pi")
102111
.set("spark.kubernetes.driver.label.label1", "label1-value")
@@ -217,6 +226,10 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
217226
}
218227
}
219228
}
229+
private def doMinikubeCheck(): Unit = {
230+
assume(testBackend == MinikubeTestBackend)
231+
}
232+
private def tagImage(image: String): String = s"$image:${testBackend.dockerImageTag()}"
220233

221234
private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
222235
assert(driverPod.getSpec.getContainers.get(0).getImage === driverImage)

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,41 @@ package org.apache.spark.deploy.k8s.integrationtest
1919
import java.io.Closeable
2020
import java.net.URI
2121

22+
import java.io.{IOException,InputStream,OutputStream}
23+
2224
object Utils extends Logging {
2325

2426
def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
2527
val resource = createResource
2628
try f.apply(resource) finally resource.close()
2729
}
2830

31+
def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
32+
var originalThrowable: Throwable = null
33+
try {
34+
block
35+
} catch {
36+
case t: Throwable =>
37+
// Purposefully not using NonFatal, because even fatal exceptions
38+
// we don't want to have our finallyBlock suppress
39+
originalThrowable = t
40+
throw originalThrowable
41+
} finally {
42+
try {
43+
finallyBlock
44+
} catch {
45+
case t: Throwable =>
46+
if (originalThrowable != null) {
47+
originalThrowable.addSuppressed(t)
48+
logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
49+
throw originalThrowable
50+
} else {
51+
throw t
52+
}
53+
}
54+
}
55+
}
56+
2957
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
3058
require(rawMasterURL.startsWith("k8s://"),
3159
"Kubernetes master URL must start with k8s://.")
@@ -57,4 +85,30 @@ object Utils extends Logging {
5785

5886
s"k8s://$resolvedURL"
5987
}
88+
89+
class RedirectThread(
90+
in: InputStream,
91+
out: OutputStream,
92+
name: String,
93+
propagateEof: Boolean = false) extends Thread(name) {
94+
setDaemon(true)
95+
override def run() {
96+
scala.util.control.Exception.ignoring(classOf[IOException]) {
97+
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
98+
Utils.tryWithSafeFinally {
99+
val buf = new Array[Byte](1024)
100+
var len = in.read(buf)
101+
while (len != -1) {
102+
out.write(buf, 0, len)
103+
out.flush()
104+
len = in.read(buf)
105+
}
106+
} {
107+
if (propagateEof) {
108+
out.close()
109+
}
110+
}
111+
}
112+
}
113+
}
60114
}

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
2020

2121
import org.apache.spark.deploy.k8s.integrationtest.Utils
2222
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
23-
import org.apache.spark.deploy.k8s.integrationtest.constants.GCE_TEST_BACKEND
23+
import org.apache.spark.deploy.k8s.integrationtest.config._
2424

2525
private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
2626
private var defaultClient: DefaultKubernetesClient = _
@@ -37,5 +37,7 @@ private[spark] class GCETestBackend(val master: String) extends IntegrationTestB
3737
defaultClient
3838
}
3939

40-
override def name(): String = GCE_TEST_BACKEND
40+
override def dockerImageTag(): String = {
41+
return System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY, "latest")
42+
}
4143
}

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@ import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend
2323
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend
2424

2525
private[spark] trait IntegrationTestBackend {
26-
def name(): String
2726
def initialize(): Unit
2827
def getKubernetesClient: DefaultKubernetesClient
28+
def dockerImageTag(): String
2929
def cleanUp(): Unit = {}
3030
}
3131

3232
private[spark] object IntegrationTestBackendFactory {
3333
def getTestBackend(): IntegrationTestBackend = {
3434
Option(System.getProperty("spark.kubernetes.test.master"))
3535
.map(new GCETestBackend(_))
36-
.getOrElse(new MinikubeTestBackend())
36+
.getOrElse(MinikubeTestBackend)
3737
}
3838
}

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala

Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -20,73 +20,37 @@ import java.nio.file.Paths
2020

2121
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
2222

23-
import org.apache.commons.lang3.SystemUtils
2423
import org.apache.spark.deploy.k8s.integrationtest.{Logging, ProcessUtils}
2524

2625
// TODO support windows
2726
private[spark] object Minikube extends Logging {
28-
private val MINIKUBE_EXECUTABLE_DEST = if (SystemUtils.IS_OS_MAC_OSX) {
29-
Paths.get("target", "minikube-bin", "darwin-amd64", "minikube").toFile
30-
} else if (SystemUtils.IS_OS_WINDOWS) {
31-
throw new IllegalStateException("Executing Minikube based integration tests not yet " +
32-
" available on Windows.")
33-
} else {
34-
Paths.get("target", "minikube-bin", "linux-amd64", "minikube").toFile
35-
}
36-
37-
private val EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE = "Minikube is not downloaded, expected at " +
38-
s"${MINIKUBE_EXECUTABLE_DEST.getAbsolutePath}"
39-
4027
private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
4128

42-
// NOTE: This and the following methods are synchronized to prevent deleteMinikube from
43-
// destroying the minikube VM while other methods try to use the VM.
44-
// Such a race condition can corrupt the VM or some VM provisioning tools like VirtualBox.
45-
def startMinikube(): Unit = synchronized {
46-
assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
47-
if (getMinikubeStatus != MinikubeStatus.RUNNING) {
48-
executeMinikube("start", "--memory", "6000", "--cpus", "8")
49-
} else {
50-
logInfo("Minikube is already started.")
51-
}
52-
}
53-
5429
def getMinikubeIp: String = synchronized {
55-
assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
5630
val outputs = executeMinikube("ip")
5731
.filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$"))
5832
assert(outputs.size == 1, "Unexpected amount of output from minikube ip")
5933
outputs.head
6034
}
6135

6236
def getMinikubeStatus: MinikubeStatus.Value = synchronized {
63-
assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
6437
val statusString = executeMinikube("status")
65-
.filter(_.contains("minikube: "))
38+
.filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
6639
.head
40+
.replaceFirst("minikubeVM: ", "")
6741
.replaceFirst("minikube: ", "")
6842
MinikubeStatus.unapply(statusString)
6943
.getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
7044
}
7145

7246
def getDockerEnv: Map[String, String] = synchronized {
73-
assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
7447
executeMinikube("docker-env", "--shell", "bash")
7548
.filter(_.startsWith("export"))
7649
.map(_.replaceFirst("export ", "").split('='))
7750
.map(arr => (arr(0), arr(1).replaceAllLiterally("\"", "")))
7851
.toMap
7952
}
8053

81-
def deleteMinikube(): Unit = synchronized {
82-
assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
83-
if (getMinikubeStatus != MinikubeStatus.NONE) {
84-
executeMinikube("delete")
85-
} else {
86-
logInfo("Minikube was already not running.")
87-
}
88-
}
89-
9054
def getKubernetesClient: DefaultKubernetesClient = synchronized {
9155
val kubernetesMaster = s"https://${getMinikubeIp}:8443"
9256
val userHome = System.getProperty("user.home")
@@ -105,13 +69,8 @@ private[spark] object Minikube extends Logging {
10569
}
10670

10771
private def executeMinikube(action: String, args: String*): Seq[String] = {
108-
if (!MINIKUBE_EXECUTABLE_DEST.canExecute) {
109-
if (!MINIKUBE_EXECUTABLE_DEST.setExecutable(true)) {
110-
throw new IllegalStateException("Failed to make the Minikube binary executable.")
111-
}
112-
}
113-
ProcessUtils.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args,
114-
MINIKUBE_STARTUP_TIMEOUT_SECONDS)
72+
ProcessUtils.executeProcess(
73+
Array("minikube", action) ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
11574
}
11675
}
11776

0 commit comments

Comments
 (0)