Commit 3d152c9

initial logic needed for porting PR 521

1 parent b3583fb

11 files changed: +343 additions, −172 deletions


README.md

Lines changed: 4 additions & 0 deletions
@@ -10,6 +10,10 @@ is subject to change.
 
 Note that currently the integration tests only run with Java 8.
 
+Integration tests first require installing [Minikube](https://kubernetes.io/docs/getting-started-guides/minikube/) on
+your machine and ensuring that the `minikube` binary is on your `PATH`. Refer to the Minikube documentation for
+instructions on how to install it. It is recommended to allocate at least 8 CPUs and 8GB of memory to the Minikube cluster.
+
 Running the integration tests requires a Spark distribution package tarball that
 contains Spark jars, submission clients, etc. You can download a tarball from
 http://spark.apache.org/downloads.html. Or, you can create a distribution from
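
The suite no longer starts Minikube itself; MinikubeTestBackend.initialize (later in this
commit) fails fast when the cluster is not already up. A minimal sketch of that check using
this repo's Minikube helper; the `minikube start` flags in the comment are an illustrative
way to meet the resource recommendation, not part of the commit:

    // Start the cluster yourself first, e.g. (exact flags may vary by Minikube version):
    //   minikube start --cpus 8 --memory 8192
    val minikubeStatus = Minikube.getMinikubeStatus
    require(minikubeStatus == MinikubeStatus.RUNNING,
      s"Minikube must be running before integration tests can execute. Current status is: $minikubeStatus")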

integration-test/pom.xml

Lines changed: 1 addition & 31 deletions
@@ -39,6 +39,7 @@
     <scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version>
     <slf4j-log4j12.version>1.7.24</slf4j-log4j12.version>
     <sbt.project.name>kubernetes-integration-tests</sbt.project.name>
+    <spark.version></spark.version>
     <spark-distro-tgz>YOUR-SPARK-DISTRO-TARBALL-HERE</spark-distro-tgz>
     <spark-dockerfiles-dir>YOUR-DOCKERFILES-DIR-HERE</spark-dockerfiles-dir>
     <test.exclude.tags></test.exclude.tags>
@@ -141,37 +142,6 @@
           </execution>
         </executions>
       </plugin>
-      <plugin>
-        <groupId>com.googlecode.maven-download-plugin</groupId>
-        <artifactId>download-maven-plugin</artifactId>
-        <version>${download-maven-plugin.version}</version>
-        <executions>
-          <execution>
-            <id>download-minikube-linux</id>
-            <phase>pre-integration-test</phase>
-            <goals>
-              <goal>wget</goal>
-            </goals>
-            <configuration>
-              <url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64</url>
-              <outputDirectory>${project.build.directory}/minikube-bin/linux-amd64</outputDirectory>
-              <outputFileName>minikube</outputFileName>
-            </configuration>
-          </execution>
-          <execution>
-            <id>download-minikube-darwin</id>
-            <phase>pre-integration-test</phase>
-            <goals>
-              <goal>wget</goal>
-            </goals>
-            <configuration>
-              <url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64</url>
-              <outputDirectory>${project.build.directory}/minikube-bin/darwin-amd64</outputDirectory>
-              <outputFileName>minikube</outputFileName>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
       <plugin>
         <!-- Triggers scalatest plugin in the integration-test phase instead of
              the test phase. -->
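
With the download plugin removed, nothing fetches a Minikube binary into target/ anymore;
the suite now shells out to whatever `minikube` resolves on the PATH (see the Minikube.scala
change below). A minimal illustrative sketch of that assumption, not taken from the commit:

    import scala.sys.process._

    // Throws an IOException if `minikube` is not on the PATH -- the failure
    // mode the new README paragraph warns about.
    val statusOutput: String = Seq("minikube", "status").!!
    println(statusOutput)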

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala

Lines changed: 13 additions & 2 deletions
@@ -27,8 +27,10 @@ import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Minutes, Seconds, Span}
 
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory
-import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND
-import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH
+import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend
+import org.apache.spark.deploy.k8s.integrationtest.constants._
+import org.apache.spark.deploy.k8s.integrationtest.config._
+
 
 private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {
 
@@ -50,6 +52,9 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
   before {
     sparkAppConf = kubernetesTestComponents.newSparkAppConf()
       .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL)
+      .set(INIT_CONTAINER_DOCKER_IMAGE, tagImage("spark-init"))
+      .set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver"))
+      .set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor"))
     kubernetesTestComponents.createNamespace()
   }
 
@@ -58,10 +63,12 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
   }
 
   test("Run SparkPi with no resources") {
+    doMinikubeCheck()
     runSparkPiAndVerifyCompletion()
   }
 
   test("Run SparkPi with a very long application name.") {
+    doMinikubeCheck()
     sparkAppConf.set("spark.app.name", "long" * 40)
     runSparkPiAndVerifyCompletion()
   }
@@ -100,6 +107,10 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
       }
     }
   }
+  private def doMinikubeCheck(): Unit = {
+    assume(testBackend == MinikubeTestBackend)
+  }
+  private def tagImage(image: String): String = s"$image:${testBackend.dockerImageTag()}"
 }
 
 private[spark] object KubernetesSuite {
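
Illustrative only (not part of the commit): with the constants from the new config package
object below, and assuming dockerImageTag() resolves to "abc123", the new `before` block
yields Spark conf entries equivalent to:

    val conf = Map(
      "spark.kubernetes.initcontainer.docker.image" -> "spark-init:abc123",
      "spark.kubernetes.driver.docker.image" -> "spark-driver:abc123",
      "spark.kubernetes.executor.docker.image" -> "spark-executor:abc123")
    conf.foreach { case (k, v) => println(s"$k=$v") }

Because doMinikubeCheck() uses ScalaTest's assume(), the two SparkPi tests are canceled
rather than failed when the suite runs against a non-Minikube backend such as GCE.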

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala

Lines changed: 54 additions & 0 deletions
@@ -19,13 +19,41 @@ package org.apache.spark.deploy.k8s.integrationtest
 import java.io.Closeable
 import java.net.URI
 
+import java.io.{IOException, InputStream, OutputStream}
+
 object Utils extends Logging {
 
   def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
     val resource = createResource
     try f.apply(resource) finally resource.close()
   }
 
+  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
+    var originalThrowable: Throwable = null
+    try {
+      block
+    } catch {
+      case t: Throwable =>
+        // Purposefully not using NonFatal: we do not want finallyBlock to
+        // suppress even fatal exceptions thrown by the block.
+        originalThrowable = t
+        throw originalThrowable
+    } finally {
+      try {
+        finallyBlock
+      } catch {
+        case t: Throwable =>
+          if (originalThrowable != null) {
+            originalThrowable.addSuppressed(t)
+            logWarning("Suppressing exception in finally: " + t.getMessage, t)
+            throw originalThrowable
+          } else {
+            throw t
+          }
+      }
+    }
+  }
+
 def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
   require(rawMasterURL.startsWith("k8s://"),
     "Kubernetes master URL must start with k8s://.")
@@ -57,4 +85,30 @@
 
     s"k8s://$resolvedURL"
   }
+
+  class RedirectThread(
+      in: InputStream,
+      out: OutputStream,
+      name: String,
+      propagateEof: Boolean = false) extends Thread(name) {
+    setDaemon(true)
+    override def run(): Unit = {
+      scala.util.control.Exception.ignoring(classOf[IOException]) {
+        // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+        Utils.tryWithSafeFinally {
+          val buf = new Array[Byte](1024)
+          var len = in.read(buf)
+          while (len != -1) {
+            out.write(buf, 0, len)
+            out.flush()
+            len = in.read(buf)
+          }
+        } {
+          if (propagateEof) {
+            out.close()
+          }
+        }
+      }
+    }
+  }
 }
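
Hypothetical usage of the tryWithSafeFinally helper added above: if both the block and the
cleanup throw, the cleanup's exception is attached via addSuppressed instead of masking the
original failure.

    val result: String = Utils.tryWithSafeFinally {
      "computed value" // main block; any Throwable thrown here wins
    } {
      println("cleaning up") // if this threw, it would be logged and suppressed
    }

And a hypothetical use of RedirectThread, pumping a child process's stdout into this JVM's
stdout on a daemon thread:

    val proc = new ProcessBuilder("echo", "hello").start()
    new Utils.RedirectThread(proc.getInputStream, System.out, "redirect-echo-stdout").start()
    proc.waitFor()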

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala

Lines changed: 4 additions & 2 deletions
@@ -20,7 +20,7 @@ import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
 
 import org.apache.spark.deploy.k8s.integrationtest.Utils
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
-import org.apache.spark.deploy.k8s.integrationtest.constants.GCE_TEST_BACKEND
+import org.apache.spark.deploy.k8s.integrationtest.config._
 
 private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
   private var defaultClient: DefaultKubernetesClient = _
@@ -37,5 +37,7 @@ private[spark] class GCETestBackend(val master: String) extends IntegrationTestB
     defaultClient
   }
 
-  override def name(): String = GCE_TEST_BACKEND
+  override def dockerImageTag(): String = {
+    System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY, "latest")
+  }
 }
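
Against GCE there is no local Docker build step, so the tag simply defaults to "latest"
unless the same system property used by the Minikube backend is set. Illustrative sketch:

    // -Dspark.kubernetes.test.imageDockerTag=v2 would select spark-driver:v2
    val tag = System.getProperty("spark.kubernetes.test.imageDockerTag", "latest")
    println(s"spark-driver:$tag")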

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala

Lines changed: 3 additions & 2 deletions
@@ -21,18 +21,19 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient
 
 import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend
 import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend
+import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager
 
 private[spark] trait IntegrationTestBackend {
-  def name(): String
   def initialize(): Unit
   def getKubernetesClient(): DefaultKubernetesClient
+  def dockerImageTag(): String
   def cleanUp(): Unit = {}
 }
 
 private[spark] object IntegrationTestBackendFactory {
   def getTestBackend(): IntegrationTestBackend = {
     Option(System.getProperty("spark.kubernetes.test.master"))
       .map(new GCETestBackend(_))
-      .getOrElse(new MinikubeTestBackend())
+      .getOrElse(MinikubeTestBackend)
   }
 }
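
A hypothetical caller of the factory above: "spark.kubernetes.test.master" selects the GCE
backend, otherwise the MinikubeTestBackend singleton is used, and the shared lifecycle is
the same either way.

    val backend = IntegrationTestBackendFactory.getTestBackend()
    backend.initialize()
    try {
      val client = backend.getKubernetesClient()
      // ... drive the tests against `client` ...
    } finally {
      backend.cleanUp()
    }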

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala

Lines changed: 4 additions & 45 deletions
@@ -20,73 +20,37 @@ import java.nio.file.Paths
 
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
 
-import org.apache.commons.lang3.SystemUtils
 import org.apache.spark.deploy.k8s.integrationtest.{Logging, ProcessUtils}
 
 // TODO support windows
 private[spark] object Minikube extends Logging {
-  private val MINIKUBE_EXECUTABLE_DEST = if (SystemUtils.IS_OS_MAC_OSX) {
-    Paths.get("target", "minikube-bin", "darwin-amd64", "minikube").toFile
-  } else if (SystemUtils.IS_OS_WINDOWS) {
-    throw new IllegalStateException("Executing Minikube based integration tests not yet " +
-      " available on Windows.")
-  } else {
-    Paths.get("target", "minikube-bin", "linux-amd64", "minikube").toFile
-  }
-
-  private val EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE = "Minikube is not downloaded, expected at " +
-    s"${MINIKUBE_EXECUTABLE_DEST.getAbsolutePath}"
-
   private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
 
-  // NOTE: This and the following methods are synchronized to prevent deleteMinikube from
-  // destroying the minikube VM while other methods try to use the VM.
-  // Such a race condition can corrupt the VM or some VM provisioning tools like VirtualBox.
-  def startMinikube(): Unit = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
-    if (getMinikubeStatus != MinikubeStatus.RUNNING) {
-      executeMinikube("start", "--memory", "6000", "--cpus", "8")
-    } else {
-      logInfo("Minikube is already started.")
-    }
-  }
-
   def getMinikubeIp: String = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
     val outputs = executeMinikube("ip")
       .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$"))
     assert(outputs.size == 1, "Unexpected amount of output from minikube ip")
     outputs.head
   }
 
   def getMinikubeStatus: MinikubeStatus.Value = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
     val statusString = executeMinikube("status")
-      .filter(_.contains("minikube: "))
+      .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
      .head
+      .replaceFirst("minikubeVM: ", "")
      .replaceFirst("minikube: ", "")
     MinikubeStatus.unapply(statusString)
       .getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
   }
 
   def getDockerEnv: Map[String, String] = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
     executeMinikube("docker-env", "--shell", "bash")
       .filter(_.startsWith("export"))
       .map(_.replaceFirst("export ", "").split('='))
       .map(arr => (arr(0), arr(1).replaceAllLiterally("\"", "")))
       .toMap
   }
 
-  def deleteMinikube(): Unit = synchronized {
-    assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
-    if (getMinikubeStatus != MinikubeStatus.NONE) {
-      executeMinikube("delete")
-    } else {
-      logInfo("Minikube was already not running.")
-    }
-  }
-
   def getKubernetesClient: DefaultKubernetesClient = synchronized {
     val kubernetesMaster = s"https://${getMinikubeIp}:8443"
     val userHome = System.getProperty("user.home")
@@ -105,13 +69,8 @@ private[spark] object Minikube extends Logging {
   }
 
   private def executeMinikube(action: String, args: String*): Seq[String] = {
-    if (!MINIKUBE_EXECUTABLE_DEST.canExecute) {
-      if (!MINIKUBE_EXECUTABLE_DEST.setExecutable(true)) {
-        throw new IllegalStateException("Failed to make the Minikube binary executable.")
-      }
-    }
-    ProcessUtils.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args,
-      MINIKUBE_STARTUP_TIMEOUT_SECONDS)
+    ProcessUtils.executeProcess(
+      Array("minikube", action) ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
   }
 }
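
The new status parsing accepts both the older "minikubeVM: " and newer "minikube: " output
formats. A self-contained sketch of that logic on canned input (not part of the commit):

    val lines = Seq("minikube: Running", "localkube: Running", "kubectl: Correctly Configured")
    val status = lines
      .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
      .head
      .replaceFirst("minikubeVM: ", "")
      .replaceFirst("minikube: ", "")
    println(status) // Running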

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala

Lines changed: 24 additions & 13 deletions
@@ -16,32 +16,43 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
 
+import java.util.UUID
+
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
-import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND
-import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageBuilder
+import org.apache.spark.deploy.k8s.integrationtest.config._
+import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager
 
-private[spark] class MinikubeTestBackend extends IntegrationTestBackend {
+private[spark] object MinikubeTestBackend extends IntegrationTestBackend {
   private var defaultClient: DefaultKubernetesClient = _
-
+  private val userProvidedDockerImageTag = Option(
+    System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY))
+  private val resolvedDockerImageTag =
+    userProvidedDockerImageTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", ""))
+  private val dockerManager = new KubernetesSuiteDockerManager(
+    Minikube.getDockerEnv, resolvedDockerImageTag)
   override def initialize(): Unit = {
-    Minikube.startMinikube()
-    if (!System.getProperty("spark.docker.test.skipBuildImages", "false").toBoolean) {
-      new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
+    val minikubeStatus = Minikube.getMinikubeStatus
+    require(minikubeStatus == MinikubeStatus.RUNNING,
+      s"Minikube must be running before integration tests can execute. Current status" +
+        s" is: $minikubeStatus")
+    if (userProvidedDockerImageTag.isEmpty) {
+      dockerManager.buildSparkDockerImages()
     }
     defaultClient = Minikube.getKubernetesClient
   }
 
-  override def getKubernetesClient(): DefaultKubernetesClient = {
-    defaultClient
-  }
 
   override def cleanUp(): Unit = {
-    if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
-      Minikube.deleteMinikube()
+    super.cleanUp()
+    if (userProvidedDockerImageTag.isEmpty) {
+      dockerManager.deleteImages()
     }
   }
 
-  override def name(): String = MINIKUBE_TEST_BACKEND
+  override def getKubernetesClient(): DefaultKubernetesClient = {
+    defaultClient
+  }
+  override def dockerImageTag(): String = resolvedDockerImageTag
 }
integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config/package.scala

Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+package object config {
+  val KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY = "spark.kubernetes.test.imageDockerTag"
+  val DRIVER_DOCKER_IMAGE = "spark.kubernetes.driver.docker.image"
+  val EXECUTOR_DOCKER_IMAGE = "spark.kubernetes.executor.docker.image"
+  val INIT_CONTAINER_DOCKER_IMAGE = "spark.kubernetes.initcontainer.docker.image"
+}
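
Because these constants live in a package object, every test source in the module picks
them up with the single wildcard import this commit adds elsewhere:

    import org.apache.spark.deploy.k8s.integrationtest.config._

    println(DRIVER_DOCKER_IMAGE) // spark.kubernetes.driver.docker.image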
