
Add initial integration test code #3

Merged (22 commits) on Dec 20, 2017
Changes from 2 commits
24 changes: 22 additions & 2 deletions integration-test/pom.xml
@@ -31,6 +31,7 @@
<sbt.project.name>kubernetes-integration-tests</sbt.project.name>
<spark-distro-tgz>YOUR-SPARK-DISTRO-TARBALL-HERE</spark-distro-tgz>
<spark-dockerfiles-dir>YOUR-DOCKERFILES-DIR-HERE</spark-dockerfiles-dir>
<test.exclude.tags></test.exclude.tags>
</properties>
<packaging>jar</packaging>
<name>Spark Project Kubernetes Integration Tests</name>
@@ -85,6 +86,12 @@
<version>2.2.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.24</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -118,7 +125,7 @@
<executable>/bin/sh</executable>
<arguments>
<argument>-c</argument>
<argument>mkdir spark-distro-tmp; cd spark-distro-tmp; tar xfz ${spark-distro-tgz}; mv * ../spark-distro; cd ..; rm -rf spark-distro-tmp</argument>
<argument>rm -rf spark-distro; mkdir spark-distro-tmp; cd spark-distro-tmp; tar xfz ${spark-distro-tgz}; mv * ../spark-distro; cd ..; rm -rf spark-distro-tmp</argument>
</arguments>
</configuration>
</execution>
@@ -139,7 +146,7 @@
</configuration>
</execution>
<execution>
<!-- TODO: Remove this hack once the upstream is fixed -->
<!-- TODO: Remove this hack once upstream is fixed by SPARK-22777 -->
<id>set-exec-bit-on-docker-entrypoint-sh</id>

Contributor:

Just set it in the Dockerfile using: RUN chmod +x /opt/entrypoint.sh.

Member Author:

I think that's better done by the upstream code. It's hard for this integration code to surgically do an in-place edit of the Dockerfile.

Contributor:

+1

Member:

The PR has merged now.

<phase>pre-integration-test</phase>
<goals>
@@ -192,6 +199,19 @@
the test phase. -->
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>SparkTestSuite.txt</filereports>
<argLine>-ea -Xmx3g -XX:ReservedCodeCacheSize=512m ${extraScalaTestArgs}</argLine>
<stderr/>
<systemProperties>
<log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
<java.awt.headless>true</java.awt.headless>
</systemProperties>
<tagsToExclude>${test.exclude.tags}</tagsToExclude>
</configuration>
<executions>
<execution>
<id>test</id>
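The new test.exclude.tags property flows into the plugin's tagsToExclude setting above, so tagged suites can be skipped with -Dtest.exclude.tags=... on the Maven command line. A minimal sketch of what a tagged test looks like on the ScalaTest side; the tag name and suite below are hypothetical, not part of this PR:

import org.scalatest.{FunSuite, Tag}

// Hypothetical tag: running with -Dtest.exclude.tags=k8s.MinikubeOnly would
// make the scalatest-maven-plugin skip every test carrying this tag.
object MinikubeOnly extends Tag("k8s.MinikubeOnly")

class TaggedExampleSuite extends FunSuite {
  test("runs unless the MinikubeOnly tag is excluded", MinikubeOnly) {
    assert(1 + 1 == 2)
  }
}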
KubernetesSuite.scala
@@ -21,7 +21,7 @@ import java.nio.file.Paths
import java.util.UUID
import java.util.regex.Pattern

import com.google.common.io.{Files, PatternFilenameFilter}
import com.google.common.io.PatternFilenameFilter
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH

private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {

import KubernetesSuite._

Member:

Add an empty line before the import.

private val testBackend = IntegrationTestBackendFactory.getTestBackend()
private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
@@ -48,8 +49,6 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {

before {
sparkAppConf = kubernetesTestComponents.newSparkAppConf()
.set("spark.kubernetes.initcontainer.docker.image", "spark-init:latest")
.set("spark.kubernetes.driver.docker.image", "spark-driver:latest")
.set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL)
kubernetesTestComponents.createNamespace()
}
KubernetesTestComponents.scala
@@ -19,11 +19,12 @@ package org.apache.spark.deploy.k8s.integrationtest
import java.nio.file.Paths
import java.util.UUID

import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.scalatest.concurrent.Eventually
import scala.collection.mutable

Member:

Scala packages should be in a group after java packages.

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.scalatest.concurrent.Eventually

import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH

private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
@@ -56,6 +57,8 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
new SparkAppConf()
.set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}")
.set("spark.kubernetes.namespace", namespace)
// TODO: apache/spark#19995 is changing docker.image to container.image in these properties.

Member:

FYI: the PR has been merged.

Member Author:

I see. I'll have to update my distro, make this change and test it again.

Member Author:

This is done by commit 989a371.

// Update them once the PR is merged.
.set("spark.kubernetes.driver.docker.image",
System.getProperty("spark.docker.test.driverImage", "spark-driver:latest"))
.set("spark.kubernetes.executor.docker.image",
ProcessUtils.scala
@@ -16,10 +16,10 @@
*/
package org.apache.spark.deploy.k8s.integrationtest

import java.io.{BufferedReader, InputStreamReader}
import java.util.concurrent.TimeUnit

import scala.collection.mutable.ArrayBuffer
import scala.io.Source

object ProcessUtils extends Logging {
/**
@@ -32,17 +32,13 @@ object ProcessUtils extends Logging {
val proc = pb.start()
val outputLines = new ArrayBuffer[String]

Utils.tryWithResource(new InputStreamReader(proc.getInputStream)) { procOutput =>
Utils.tryWithResource(new BufferedReader(procOutput)) { (bufferedOutput: BufferedReader) =>
var line: String = null
do {
line = bufferedOutput.readLine()
if (line != null) {
logInfo(line)
outputLines += line
}
} while (line != null)
Utils.tryWithResource(Source.fromInputStream(proc.getInputStream, "UTF-8")) { output =>
for (line <- output.getLines) {
logInfo(line)
outputLines += line
}
}{
output => output.close()

Member:

Is calling close necessary?

Member Author:

Yes, it is because output is not a subclass of Closeable. See the extra version of Utils.tryWithResource that I added for details.

Member:

It seems Source is Closeable, https://www.scala-lang.org/api/current/scala/io/Source.html. So you don't need to call close explicitly.

Member Author:

Ah, that is interesting. I think it became Closeable in Scala 2.12; Source in Scala 2.11 is not Closeable. From https://github.com/scala/scala/blob/v2.11.8/src/library/scala/io/Source.scala#L190:

abstract class Source extends Iterator[Char] {

Member:

Ah, OK.

Member Author:

I actually managed to get rid of the close call by using the input stream as resource. PTAL.

Member:

LGTM.

}
assert(proc.waitFor(timeout, TimeUnit.SECONDS),
s"Timed out while executing ${fullCommand.mkString(" ")}")
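As the thread above concludes, scala.io.Source only became Closeable in Scala 2.12, which is why the Utils diff that follows adds a second tryWithResource overload taking an explicit close step. A small sketch of how the two overloads differ in use, assuming the Utils object from this PR is in scope and using a hypothetical file name:

import java.io.FileInputStream
import scala.io.Source

// Closeable resource: the original overload closes it in its finally block.
Utils.tryWithResource(new FileInputStream("cmd-output.txt")) { in =>
  println(in.read())
}

// Non-Closeable resource (Source on Scala 2.11): the caller supplies the close step.
Utils.tryWithResource(Source.fromFile("cmd-output.txt")) { src =>
  src.getLines.foreach(println)
} { src =>
  src.close()
}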
Utils.scala
@@ -17,11 +17,49 @@
package org.apache.spark.deploy.k8s.integrationtest

import java.io.Closeable
import java.net.URI

object Utils {
object Utils extends Logging {

def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
val resource = createResource
try f.apply(resource) finally resource.close()
}

def tryWithResource[R, T](createResource: => R)(f: R => T)(closeResource: R => T): T = {
val resource = createResource
try f.apply(resource) finally closeResource(resource)
}

def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
require(rawMasterURL.startsWith("k8s://"),
"Kubernetes master URL must start with k8s://.")
val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)

// To handle master URLs, e.g., k8s://host:port.
if (!masterWithoutK8sPrefix.contains("://")) {
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
return s"k8s://$resolvedURL"
}

val masterScheme = new URI(masterWithoutK8sPrefix).getScheme
val resolvedURL = masterScheme.toLowerCase match {
case "https" =>
masterWithoutK8sPrefix
case "http" =>
logWarning("Kubernetes master URL uses HTTP instead of HTTPS.")
masterWithoutK8sPrefix
case null =>
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
resolvedURL
case _ =>
throw new IllegalArgumentException("Invalid Kubernetes master scheme: " + masterScheme)
}

s"k8s://$resolvedURL"
}
}
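For reference, a sketch of what checkAndGetK8sMasterUrl above returns for a few representative inputs (the host names are only examples):

// No scheme after the k8s:// prefix: https is assumed and logged.
Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1:8443")
// => "k8s://https://127.0.0.1:8443"

// An explicit https scheme is kept as-is; http is kept too, but with a warning.
Utils.checkAndGetK8sMasterUrl("k8s://https://example-master:443")
// => "k8s://https://example-master:443"

// A URL without the k8s:// prefix fails the require(...) check
// with an IllegalArgumentException.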
GCETestBackend.scala
@@ -18,17 +18,19 @@ package org.apache.spark.deploy.k8s.integrationtest.backend.GCE

import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}

import org.apache.spark.deploy.k8s.integrationtest.Utils
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.k8s.integrationtest.constants.GCE_TEST_BACKEND

private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
private var defaultClient: DefaultKubernetesClient = _

override def initialize(): Unit = {
var k8ConfBuilder = new ConfigBuilder()
val k8sConf = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master.replaceFirst("k8s://", ""))
defaultClient = new DefaultKubernetesClient(k8ConfBuilder.build)
.withMasterUrl(Utils.checkAndGetK8sMasterUrl(master).replaceFirst("k8s://", ""))
.build()
defaultClient = new DefaultKubernetesClient(k8sConf)
}

override def getKubernetesClient(): DefaultKubernetesClient = {
Minikube.scala
@@ -39,6 +39,9 @@ private[spark] object Minikube extends Logging {

private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60

// NOTE: This and the following methods are synchronized to prevent deleteMinikube from
// destroying the minikube VM while other methods try to use the VM.
// Such a race condition can corrupt the VM or some VM provisioning tools like VirtualBox.
def startMinikube(): Unit = synchronized {

Member:

Why are this and the following methods synchronized? Can't executeMinikube be called concurrently?

Member Author:

@mccheah would know better. I guess we want to prevent deleteMinikube from destroying the minikube vm while other methods try to use the vm. Maybe such a race condition can corrupt the vm or some vm provisioning tools like VirtualBox?

Does this explanation make sense? If yes, I think we can leave a NOTE.

Member:

It makes sense to me.

assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
if (getMinikubeStatus != MinikubeStatus.RUNNING) {
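The NOTE added above explains why the methods are synchronized: they all take the Minikube object's monitor, so deleteMinikube cannot tear down the VM while another call is in flight. A standalone sketch of that locking pattern; the object name and the command runner below are placeholders, not the real implementation:

object MinikubeLockingSketch {
  // Stand-in for the real process runner; the point is the shared object monitor.
  private def run(action: String): Unit = println(s"minikube $action")

  // Every entry point takes the same lock, so a delete can never interleave
  // with a start or a status check.
  def startMinikube(): Unit = synchronized { run("start") }
  def getMinikubeStatus(): Unit = synchronized { run("status") }
  def deleteMinikube(): Unit = synchronized { run("delete") }
}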
SparkDockerImageBuilder.scala
@@ -42,20 +42,20 @@ private[spark] class SparkDockerImageBuilder

private val originalDockerUri = URI.create(dockerHost)
private val httpsDockerUri = new URIBuilder()
.setHost(originalDockerUri.getHost)
.setPort(originalDockerUri.getPort)
.setScheme("https")
.build()
.setHost(originalDockerUri.getHost)
.setPort(originalDockerUri.getPort)
.setScheme("https")
.build()

private val dockerCerts = dockerEnv.getOrElse("DOCKER_CERT_PATH",
throw new IllegalStateException("DOCKER_CERT_PATH env not found."))

private val dockerClient = new DefaultDockerClient.Builder()
.uri(httpsDockerUri)
.dockerCertificates(DockerCertificates
.builder()
.dockerCertPath(Paths.get(dockerCerts))
.build().get())
.dockerCertificates(DockerCertificates.builder()
.dockerCertPath(Paths.get(dockerCerts))
.build()
.get())
.build()

def buildSparkDockerImages(): Unit = {