diff --git a/README.md b/README.md
index 19b41ca..c2f507d 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,90 @@
-# spark-integration
-Integration tests for Spark
+---
+layout: global
+title: Spark on Kubernetes Integration Tests
+---
+
+# Running the Kubernetes Integration Tests
+
+Note that the integration test framework is currently being heavily revised and
+is subject to change.
+
+Note that currently the integration tests only run with Java 8.
+
+Running the integration tests requires a Spark distribution package tarball that
+contains Spark jars, submission clients, etc. You can download a tarball from
+http://spark.apache.org/downloads.html, or create a distribution from source
+using `make-distribution.sh`. For example:
+
+```
+$ git clone git@github.com:apache/spark.git
+$ cd spark
+$ ./dev/make-distribution.sh --tgz \
+    -Phadoop-2.7 -Pkubernetes -Pkinesis-asl -Phive -Phive-thriftserver
+```
+
+The above command creates a tarball like spark-2.3.0-SNAPSHOT-bin.tgz in the
+top-level directory. For more details, see the related section in
+[building-spark.md](https://github.com/apache/spark/blob/master/docs/building-spark.md#building-a-runnable-distribution).
+
+The integration tests also need a local path to the directory that
+contains `Dockerfile`s. In the main Spark repo, the path is
+`/spark/resource-managers/kubernetes/docker/src/main/dockerfiles`.
+
+Once you have prepared these inputs, the integration tests can be executed with
+Maven or your IDE. Note that when running tests from an IDE, the
+`pre-integration-test` phase must be re-run every time the Spark main code
+changes (see the example at the end of this document). When running tests from
+the command line, the `pre-integration-test` phase is automatically invoked
+when the `integration-test` phase runs.
+
+With Maven, the integration tests can be run using the following command:
+
+```
+$ mvn clean integration-test \
+    -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
+    -Dspark-dockerfiles-dir=spark/resource-managers/kubernetes/docker/src/main/dockerfiles
+```
+
+# Running against an arbitrary cluster
+
+In order to run against any cluster, use the following:
+
+```
+$ mvn clean integration-test \
+    -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
+    -Dspark-dockerfiles-dir=spark/resource-managers/kubernetes/docker/src/main/dockerfiles \
+    -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https:// -Dspark.docker.test.driverImage= -Dspark.docker.test.executorImage="
+```
+
+# Preserve the Minikube VM
+
+The integration tests make use of
+[Minikube](https://github.com/kubernetes/minikube), which fires up a virtual
+machine and sets up a single-node Kubernetes cluster within it. By default the
+VM is destroyed after the tests are finished. If you want to preserve the VM,
+e.g. to reduce the running time of tests during development, you can pass the
+property `spark.docker.test.persistMinikube` to the test process:
+
+```
+$ mvn clean integration-test \
+    -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
+    -Dspark-dockerfiles-dir=spark/resource-managers/kubernetes/docker/src/main/dockerfiles \
+    -DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true
+```
+
+# Reuse the previous Docker images
+
+The integration tests build a number of Docker images, which takes some time.
+By default, the images are built every time the tests run. You may want to skip
+re-building those images during development if the distribution package did not
+change since the last run. You can pass the property
+`spark.docker.test.skipBuildImages` to the test process. This works only if the
+property `spark.docker.test.persistMinikube` was also set in the previous run,
+since the Docker daemon runs inside the Minikube environment. Here is an
+example:
+
+```
+$ mvn clean integration-test \
+    -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
+    -Dspark-dockerfiles-dir=spark/resource-managers/kubernetes/docker/src/main/dockerfiles \
+    "-DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true -Dspark.docker.test.skipBuildImages=true"
+```
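+
+# Running only the setup phase
+
+As noted above, the `pre-integration-test` phase has to be re-run after every
+change to the Spark main code before launching tests from an IDE. It should be
+possible to run just that setup phase from the command line, since Maven runs
+every lifecycle phase up to and including the one it is given. For example,
+reusing the property values from the commands above:
+
+```
+$ mvn pre-integration-test \
+    -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \
+    -Dspark-dockerfiles-dir=spark/resource-managers/kubernetes/docker/src/main/dockerfiles
+```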
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
new file mode 100644
index 0000000..0440203
--- /dev/null
+++ b/integration-test/pom.xml
@@ -0,0 +1,250 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>spark-kubernetes-integration-tests_2.11</artifactId>
+  <groupId>spark-kubernetes-integration-tests</groupId>
+  <version>0.1-SNAPSHOT</version>
+  <properties>
+    <commons-lang3.version>3.5</commons-lang3.version>
+    <commons-logging.version>1.1.1</commons-logging.version>
+    <docker-client.version>5.0.2</docker-client.version>
+    <download-maven-plugin.version>1.3.0</download-maven-plugin.version>
+    <exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
+    <extraScalaTestArgs></extraScalaTestArgs>
+    <guava.version>18.0</guava.version>
+    <jsr305.version>1.3.9</jsr305.version>
+    <kubernetes-client.version>3.0.0</kubernetes-client.version>
+    <log4j.version>1.2.17</log4j.version>
+    <scala.version>2.11.8</scala.version>
+    <scala.binary.version>2.11</scala.binary.version>
+    <scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
+    <scalatest.version>2.2.6</scalatest.version>
+    <scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version>
+    <slf4j-log4j12.version>1.7.24</slf4j-log4j12.version>
+    <sbt.project.name>kubernetes-integration-tests</sbt.project.name>
+    <spark-distro-tgz>YOUR-SPARK-DISTRO-TARBALL-HERE</spark-distro-tgz>
+    <spark-dockerfiles-dir>YOUR-DOCKERFILES-DIR-HERE</spark-dockerfiles-dir>
+    <test.exclude.tags></test.exclude.tags>
+  </properties>
+
+  <packaging>jar</packaging>
+  <name>Spark Project Kubernetes Integration Tests</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>${commons-logging.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <version>${jsr305.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>test</scope>
+      <version>${guava.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.spotify</groupId>
+      <artifactId>docker-client</artifactId>
+      <version>${docker-client.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+      <version>${kubernetes-client.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>${log4j.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <version>${scalatest.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j-log4j12.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>${scala-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>${exec-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>unpack-spark-distro</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <workingDirectory>${project.build.directory}</workingDirectory>
+              <executable>/bin/sh</executable>
+              <arguments>
+                <argument>-c</argument>
+                <argument>rm -rf spark-distro; mkdir spark-distro-tmp; cd spark-distro-tmp; tar xfz ${spark-distro-tgz}; mv * ../spark-distro; cd ..; rm -rf spark-distro-tmp</argument>
+              </arguments>
+            </configuration>
+          </execution>
+          <execution>
+            <id>copy-dockerfiles-if-missing</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <workingDirectory>${project.build.directory}/spark-distro</workingDirectory>
+              <executable>/bin/sh</executable>
+              <arguments>
+                <argument>-c</argument>
+                <argument>test -d dockerfiles || cp -pr ${spark-dockerfiles-dir} dockerfiles</argument>
+              </arguments>
+            </configuration>
+          </execution>
+          <execution>
+            <id>set-exec-bit-on-docker-entrypoint-sh</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <workingDirectory>${project.build.directory}/spark-distro/dockerfiles</workingDirectory>
+              <executable>/bin/chmod</executable>
+              <arguments>
+                <argument>+x</argument>
+                <argument>spark-base/entrypoint.sh</argument>
+              </arguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>com.googlecode.maven-download-plugin</groupId>
+        <artifactId>download-maven-plugin</artifactId>
+        <version>${download-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>download-minikube-linux</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>wget</goal>
+            </goals>
+            <configuration>
+              <url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64</url>
+              <outputDirectory>${project.build.directory}/minikube-bin/linux-amd64</outputDirectory>
+              <outputFileName>minikube</outputFileName>
+            </configuration>
+          </execution>
+          <execution>
+            <id>download-minikube-darwin</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>wget</goal>
+            </goals>
+            <configuration>
+              <url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64</url>
+              <outputDirectory>${project.build.directory}/minikube-bin/darwin-amd64</outputDirectory>
+              <outputFileName>minikube</outputFileName>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
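+      <!--
+        Note on the two scalatest executions below: the `suffixes` pattern
+        (?<!Suite) is a negative lookbehind, so the plain `test` execution only
+        picks up classes whose names do NOT end in "Suite". The Kubernetes
+        suites therefore run only when the integration-test phase is invoked.
+      -->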
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>${scalatest-maven-plugin.version}</version>
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>SparkTestSuite.txt</filereports>
+          <argLine>-ea -Xmx3g -XX:ReservedCodeCacheSize=512m ${extraScalaTestArgs}</argLine>
+          <stderr/>
+          <systemProperties>
+            <log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
+            <java.awt.headless>true</java.awt.headless>
+          </systemProperties>
+          <tagsToExclude>${test.exclude.tags}</tagsToExclude>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <suffixes>(?&lt;!Suite)</suffixes>
+            </configuration>
+          </execution>
+          <execution>
+            <id>integration-test</id>
+            <phase>integration-test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/integration-test/src/test/resources/log4j.properties b/integration-test/src/test/resources/log4j.properties
new file mode 100644
index 0000000..866126b
--- /dev/null
+++ b/integration-test/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/integration-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/integration-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from a few verbose libraries.
+log4j.logger.com.sun.jersey=WARN
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.mortbay=WARN
+log4j.logger.org.spark_project.jetty=WARN
diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
new file mode 100644
index 0000000..03ea99c
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.io.File +import java.nio.file.Paths +import java.util.UUID +import java.util.regex.Pattern + +import com.google.common.io.PatternFilenameFilter +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Seconds, Span} + +import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory +import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND +import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH + +private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter { + + import KubernetesSuite._ + private val testBackend = IntegrationTestBackendFactory.getTestBackend() + private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "") + private var kubernetesTestComponents: KubernetesTestComponents = _ + private var sparkAppConf: SparkAppConf = _ + + override def beforeAll(): Unit = { + testBackend.initialize() + kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient) + } + + override def afterAll(): Unit = { + testBackend.cleanUp() + } + + before { + sparkAppConf = kubernetesTestComponents.newSparkAppConf() + .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL) + kubernetesTestComponents.createNamespace() + } + + after { + kubernetesTestComponents.deleteNamespace() + } + + test("Run SparkPi with no resources") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + + runSparkPiAndVerifyCompletion() + } + + test("Run SparkPi with a very long application name.") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + + sparkAppConf.set("spark.app.name", "long" * 40) + runSparkPiAndVerifyCompletion() + } + + private def runSparkPiAndVerifyCompletion( + appResource: String = CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR): Unit = { + runSparkApplicationAndVerifyCompletion( + appResource, + SPARK_PI_MAIN_CLASS, + Seq("Pi is roughly 3"), + Array.empty[String]) + } + + private def runSparkApplicationAndVerifyCompletion( + appResource: String, + mainClass: String, + expectedLogOnCompletion: Seq[String], + appArgs: Array[String]): Unit = { + val appArguments = SparkAppArguments( + mainAppResource = appResource, + mainClass = mainClass) + SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt) + val driverPod = kubernetesTestComponents.kubernetesClient + .pods() + .withLabel("spark-app-locator", APP_LOCATOR_LABEL) + .list() + .getItems + .get(0) + Eventually.eventually(TIMEOUT, INTERVAL) { + expectedLogOnCompletion.foreach { e => + assert(kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPod.getMetadata.getName) + .getLog + .contains(e), "The application did not complete.") + } + } + } +} + +private[spark] object KubernetesSuite { + + val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) + val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + val SPARK_DISTRO_EXAMPLES_JAR_FILE: File = Paths.get(SPARK_DISTRO_PATH.toFile.getAbsolutePath, + "examples", "jars") + .toFile + .listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0) + val CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR: String = s"local:///opt/spark/examples/jars/" + + s"${SPARK_DISTRO_EXAMPLES_JAR_FILE.getName}" + val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi" + + case object 
ShuffleNotReadyException extends Exception
+}
diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
new file mode 100644
index 0000000..f1852ba
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.nio.file.Paths
+import java.util.UUID
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH
+
+private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
+
+  val namespace = UUID.randomUUID().toString.replaceAll("-", "")
+  val kubernetesClient = defaultClient.inNamespace(namespace)
+  val clientConfig = kubernetesClient.getConfiguration
+
+  def createNamespace(): Unit = {
+    defaultClient.namespaces.createNew()
+      .withNewMetadata()
+      .withName(namespace)
+      .endMetadata()
+      .done()
+  }
+
+  def deleteNamespace(): Unit = {
+    defaultClient.namespaces.withName(namespace).delete()
+    Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
+      val namespaceList = defaultClient
+        .namespaces()
+        .list()
+        .getItems()
+        .asScala
+      require(!namespaceList.exists(_.getMetadata.getName == namespace))
+    }
+  }
+
+  def newSparkAppConf(): SparkAppConf = {
+    new SparkAppConf()
+      .set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}")
+      .set("spark.kubernetes.namespace", namespace)
+      .set("spark.kubernetes.driver.container.image",
+        System.getProperty("spark.docker.test.driverImage", "spark-driver:latest"))
+      .set("spark.kubernetes.executor.container.image",
+        System.getProperty("spark.docker.test.executorImage", "spark-executor:latest"))
+      .set("spark.executor.memory", "500m")
+      .set("spark.executor.cores", "1")
+      .set("spark.executor.instances", "1")
+      .set("spark.app.name", "spark-test-app")
+      .set("spark.ui.enabled", "true")
+      .set("spark.testing", "false")
+      .set("spark.kubernetes.submission.waitAppCompletion", "false")
+  }
+}
+
+private[spark] class SparkAppConf {
+
+  private val map = mutable.Map[String, String]()
+
+  def set(key: String, value: String): SparkAppConf = {
+    map.put(key, value)
+    this
+  }
+
+  def get(key: String): String = map.getOrElse(key, "")
+
+  def setJars(jars: Seq[String]): SparkAppConf = set("spark.jars", jars.mkString(","))
+
+  override def toString: String = map.toString
+
+  def toStringArray: Iterable[String] =
map.toList.flatMap(t => List("--conf", s"${t._1}=${t._2}")) +} + +private[spark] case class SparkAppArguments( + mainAppResource: String, + mainClass: String) + +private[spark] object SparkAppLauncher extends Logging { + + private val SPARK_SUBMIT_EXECUTABLE_DEST = Paths.get(SPARK_DISTRO_PATH.toFile.getAbsolutePath, + "bin", "spark-submit").toFile + + def launch(appArguments: SparkAppArguments, appConf: SparkAppConf, timeoutSecs: Int): Unit = { + logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf") + val commandLine = Array(SPARK_SUBMIT_EXECUTABLE_DEST.getAbsolutePath, + "--deploy-mode", "cluster", + "--class", appArguments.mainClass, + "--master", appConf.get("spark.master") + ) ++ appConf.toStringArray :+ appArguments.mainAppResource + logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}") + ProcessUtils.executeProcess(commandLine, timeoutSecs) + } +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Logging.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Logging.scala new file mode 100644 index 0000000..459c0a4 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Logging.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import org.apache.log4j.{Logger, LogManager, Priority} + +trait Logging { + + private val log: Logger = LogManager.getLogger(this.getClass) + + protected def logDebug(msg: => String) = if (log.isDebugEnabled) log.debug(msg) + + protected def logInfo(msg: => String) = if (log.isInfoEnabled) log.info(msg) + + protected def logWarning(msg: => String) = if (log.isEnabledFor(Priority.WARN)) log.warn(msg) + + protected def logWarning(msg: => String, throwable: Throwable) = + if (log.isEnabledFor(Priority.WARN)) log.warn(msg, throwable) + + protected def logError(msg: => String) = if (log.isEnabledFor(Priority.ERROR)) log.error(msg) +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala new file mode 100644 index 0000000..e9f143c --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.ArrayBuffer +import scala.io.Source + +object ProcessUtils extends Logging { + /** + * executeProcess is used to run a command and return the output if it + * completes within timeout seconds. + */ + def executeProcess(fullCommand: Array[String], timeout: Long): Seq[String] = { + val pb = new ProcessBuilder().command(fullCommand: _*) + pb.redirectErrorStream(true) + val proc = pb.start() + val outputLines = new ArrayBuffer[String] + Utils.tryWithResource(proc.getInputStream)( + Source.fromInputStream(_, "UTF-8").getLines().foreach { line => + logInfo(line) + outputLines += line + }) + assert(proc.waitFor(timeout, TimeUnit.SECONDS), + s"Timed out while executing ${fullCommand.mkString(" ")}") + assert(proc.exitValue == 0, s"Failed to execute ${fullCommand.mkString(" ")}") + outputLines.toSeq + } +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala new file mode 100644 index 0000000..f1fd6dc --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.util.concurrent.TimeUnit + +import com.google.common.util.concurrent.SettableFuture +import io.fabric8.kubernetes.api.model.HasMetadata +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.internal.readiness.Readiness + +private[spark] class SparkReadinessWatcher[T <: HasMetadata] extends Watcher[T] { + + private val signal = SettableFuture.create[Boolean] + + override def eventReceived(action: Action, resource: T): Unit = { + if ((action == Action.MODIFIED || action == Action.ADDED) && + Readiness.isReady(resource)) { + signal.set(true) + } + } + + override def onClose(cause: KubernetesClientException): Unit = {} + + def waitUntilReady(): Boolean = signal.get(60, TimeUnit.SECONDS) +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala new file mode 100644 index 0000000..911b3a9 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.io.Closeable +import java.net.URI + +object Utils extends Logging { + + def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = { + val resource = createResource + try f.apply(resource) finally resource.close() + } + + def checkAndGetK8sMasterUrl(rawMasterURL: String): String = { + require(rawMasterURL.startsWith("k8s://"), + "Kubernetes master URL must start with k8s://.") + val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length) + + // To handle master URLs, e.g., k8s://host:port. + if (!masterWithoutK8sPrefix.contains("://")) { + val resolvedURL = s"https://$masterWithoutK8sPrefix" + logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " + + s"URL is $resolvedURL.") + return s"k8s://$resolvedURL" + } + + val masterScheme = new URI(masterWithoutK8sPrefix).getScheme + val resolvedURL = masterScheme.toLowerCase match { + case "https" => + masterWithoutK8sPrefix + case "http" => + logWarning("Kubernetes master URL uses HTTP instead of HTTPS.") + masterWithoutK8sPrefix + case null => + val resolvedURL = s"https://$masterWithoutK8sPrefix" + logInfo("No scheme specified for kubernetes master URL, so defaulting to https. 
Resolved " + + s"URL is $resolvedURL.") + resolvedURL + case _ => + throw new IllegalArgumentException("Invalid Kubernetes master scheme: " + masterScheme) + } + + s"k8s://$resolvedURL" + } +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala new file mode 100644 index 0000000..cbb98fa --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest.backend.GCE + +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} + +import org.apache.spark.deploy.k8s.integrationtest.Utils +import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend +import org.apache.spark.deploy.k8s.integrationtest.constants.GCE_TEST_BACKEND + +private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend { + private var defaultClient: DefaultKubernetesClient = _ + + override def initialize(): Unit = { + val k8sConf = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(Utils.checkAndGetK8sMasterUrl(master).replaceFirst("k8s://", "")) + .build() + defaultClient = new DefaultKubernetesClient(k8sConf) + } + + override def getKubernetesClient(): DefaultKubernetesClient = { + defaultClient + } + + override def name(): String = GCE_TEST_BACKEND +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala new file mode 100644 index 0000000..51f8b96 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest.backend + +import io.fabric8.kubernetes.client.DefaultKubernetesClient + +import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend +import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend + +private[spark] trait IntegrationTestBackend { + def name(): String + def initialize(): Unit + def getKubernetesClient(): DefaultKubernetesClient + def cleanUp(): Unit = {} +} + +private[spark] object IntegrationTestBackendFactory { + def getTestBackend(): IntegrationTestBackend = { + Option(System.getProperty("spark.kubernetes.test.master")) + .map(new GCETestBackend(_)) + .getOrElse(new MinikubeTestBackend()) + } +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala new file mode 100644 index 0000000..c04bd75 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest.backend.minikube + +import java.nio.file.Paths + +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} + +import org.apache.commons.lang3.SystemUtils +import org.apache.spark.deploy.k8s.integrationtest.{Logging, ProcessUtils} + +// TODO support windows +private[spark] object Minikube extends Logging { + private val MINIKUBE_EXECUTABLE_DEST = if (SystemUtils.IS_OS_MAC_OSX) { + Paths.get("target", "minikube-bin", "darwin-amd64", "minikube").toFile + } else if (SystemUtils.IS_OS_WINDOWS) { + throw new IllegalStateException("Executing Minikube based integration tests not yet " + + " available on Windows.") + } else { + Paths.get("target", "minikube-bin", "linux-amd64", "minikube").toFile + } + + private val EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE = "Minikube is not downloaded, expected at " + + s"${MINIKUBE_EXECUTABLE_DEST.getAbsolutePath}" + + private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60 + + // NOTE: This and the following methods are synchronized to prevent deleteMinikube from + // destroying the minikube VM while other methods try to use the VM. + // Such a race condition can corrupt the VM or some VM provisioning tools like VirtualBox. 
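+  // The `minikube start` flags used below (6000 MB of memory, 8 CPUs) assume the
+  // host has that much capacity to spare; smaller hosts may need smaller values.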
+ def startMinikube(): Unit = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + if (getMinikubeStatus != MinikubeStatus.RUNNING) { + executeMinikube("start", "--memory", "6000", "--cpus", "8") + } else { + logInfo("Minikube is already started.") + } + } + + def getMinikubeIp: String = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + val outputs = executeMinikube("ip") + .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$")) + assert(outputs.size == 1, "Unexpected amount of output from minikube ip") + outputs.head + } + + def getMinikubeStatus: MinikubeStatus.Value = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + val statusString = executeMinikube("status") + .filter(_.contains("minikube: ")) + .head + .replaceFirst("minikube: ", "") + MinikubeStatus.unapply(statusString) + .getOrElse(throw new IllegalStateException(s"Unknown status $statusString")) + } + + def getDockerEnv: Map[String, String] = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + executeMinikube("docker-env", "--shell", "bash") + .filter(_.startsWith("export")) + .map(_.replaceFirst("export ", "").split('=')) + .map(arr => (arr(0), arr(1).replaceAllLiterally("\"", ""))) + .toMap + } + + def deleteMinikube(): Unit = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + if (getMinikubeStatus != MinikubeStatus.NONE) { + executeMinikube("delete") + } else { + logInfo("Minikube was already not running.") + } + } + + def getKubernetesClient: DefaultKubernetesClient = synchronized { + val kubernetesMaster = s"https://${getMinikubeIp}:8443" + val userHome = System.getProperty("user.home") + val kubernetesConf = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(kubernetesMaster) + .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath) + .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath) + .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath) + .build() + new DefaultKubernetesClient(kubernetesConf) + } + + def executeMinikubeSsh(command: String): Unit = { + executeMinikube("ssh", command) + } + + private def executeMinikube(action: String, args: String*): Seq[String] = { + if (!MINIKUBE_EXECUTABLE_DEST.canExecute) { + if (!MINIKUBE_EXECUTABLE_DEST.setExecutable(true)) { + throw new IllegalStateException("Failed to make the Minikube binary executable.") + } + } + ProcessUtils.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args, + MINIKUBE_STARTUP_TIMEOUT_SECONDS) + } +} + +private[spark] object MinikubeStatus extends Enumeration { + + // The following states are listed according to + // https://github.com/docker/machine/blob/master/libmachine/state/state.go. 
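+  // Each value's toString below is the literal text printed by `minikube status`,
+  // so unapply() can map a raw status string (e.g. "Running") back to a typed value.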
+ val STARTING = status("Starting") + val RUNNING = status("Running") + val PAUSED = status("Paused") + val STOPPING = status("Stopping") + val STOPPED = status("Stopped") + val ERROR = status("Error") + val TIMEOUT = status("Timeout") + val SAVED = status("Saved") + val NONE = status("") + + def status(value: String): Value = new Val(nextId, value) + def unapply(s: String): Option[Value] = values.find(s == _.toString) +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala new file mode 100644 index 0000000..7a1433e --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest.backend.minikube + +import io.fabric8.kubernetes.client.DefaultKubernetesClient + +import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend +import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND +import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageBuilder + +private[spark] class MinikubeTestBackend extends IntegrationTestBackend { + private var defaultClient: DefaultKubernetesClient = _ + + override def initialize(): Unit = { + Minikube.startMinikube() + if (!System.getProperty("spark.docker.test.skipBuildImages", "false").toBoolean) { + new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() + } + defaultClient = Minikube.getKubernetesClient + } + + override def getKubernetesClient(): DefaultKubernetesClient = { + defaultClient + } + + override def cleanUp(): Unit = { + if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { + Minikube.deleteMinikube() + } + } + + override def name(): String = MINIKUBE_TEST_BACKEND +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala new file mode 100644 index 0000000..9137199 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.nio.file.Paths + +package object constants { + val MINIKUBE_TEST_BACKEND = "minikube" + val GCE_TEST_BACKEND = "gce" + val SPARK_DISTRO_PATH = Paths.get("target", "spark-distro") +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala new file mode 100644 index 0000000..0ae0f3e --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest.docker + +import java.net.URI +import java.nio.file.Paths + +import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, LoggingBuildHandler} +import org.apache.http.client.utils.URIBuilder +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Seconds, Span} + +import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH +import org.apache.spark.deploy.k8s.integrationtest.Logging + +private[spark] class SparkDockerImageBuilder + (private val dockerEnv: Map[String, String]) extends Logging{ + + private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH + // Dockerfile paths must be relative to the build path. 
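+  // (docker-client sends DOCKER_BUILD_PATH to the daemon as the build context and
+  // resolves each Dockerfile argument against that context.)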
+ private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile" + private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile" + private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile" + private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) + private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", + throw new IllegalStateException("DOCKER_HOST env not found.")) + + private val originalDockerUri = URI.create(dockerHost) + private val httpsDockerUri = new URIBuilder() + .setHost(originalDockerUri.getHost) + .setPort(originalDockerUri.getPort) + .setScheme("https") + .build() + + private val dockerCerts = dockerEnv.getOrElse("DOCKER_CERT_PATH", + throw new IllegalStateException("DOCKER_CERT_PATH env not found.")) + + private val dockerClient = new DefaultDockerClient.Builder() + .uri(httpsDockerUri) + .dockerCertificates(DockerCertificates.builder() + .dockerCertPath(Paths.get(dockerCerts)) + .build() + .get()) + .build() + + def buildSparkDockerImages(): Unit = { + Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } + buildImage("spark-base", BASE_DOCKER_FILE) + buildImage("spark-driver", DRIVER_DOCKER_FILE) + buildImage("spark-executor", EXECUTOR_DOCKER_FILE) + } + + private def buildImage(name: String, dockerFile: String): Unit = { + dockerClient.build( + DOCKER_BUILD_PATH, + name, + dockerFile, + new LoggingBuildHandler()) + logInfo(s"Built $name docker image") + } +}