
Commit 792476f

Address more comments.
1 parent 6c32d8f commit 792476f

File tree: 4 files changed, +42 -64 lines changed

README.md

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ http://spark.apache.org/downloads.html. Or, you can create a distribution from
 source code using `make-distribution.sh`. For example:
 
 ```
-$ https://github.com/apache/spark.git
+$ git clone git@github.com/apache/spark.git
 $ cd spark
 $ ./dev/make-distribution.sh --tgz \
   -Phadoop-2.7 -Pkubernetes -Pkinesis-asl -Phive -Phive-thriftserver
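(A side note on the new clone line: SSH clone URLs conventionally use a colon after the host, `git@github.com:apache/spark.git`, while the slash form belongs to HTTPS, `https://github.com/apache/spark.git`; the committed line mixes the two styles.)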

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala

Lines changed: 2 additions & 27 deletions
@@ -18,8 +18,9 @@ package org.apache.spark.deploy.k8s.integrationtest
 
 import java.io.Closeable
 import java.net.URI
+import java.io.{IOException, InputStream, OutputStream}
 
-import java.io.{IOException,InputStream,OutputStream}
+import com.google.common.io.ByteStreams
 
 object Utils extends Logging {
 
@@ -85,30 +86,4 @@ object Utils extends Logging {
 
     s"k8s://$resolvedURL"
   }
-
-  class RedirectThread(
-      in: InputStream,
-      out: OutputStream,
-      name: String,
-      propagateEof: Boolean = false) extends Thread(name) {
-    setDaemon(true)
-    override def run() {
-      scala.util.control.Exception.ignoring(classOf[IOException]) {
-        // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
-        Utils.tryWithSafeFinally {
-          val buf = new Array[Byte](1024)
-          var len = in.read(buf)
-          while (len != -1) {
-            out.write(buf, 0, len)
-            out.flush()
-            len = in.read(buf)
-          }
-        } {
-          if (propagateEof) {
-            out.close()
-          }
-        }
-      }
-    }
-  }
 }
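The commit drops the hand-rolled byte-copy loop in RedirectThread and pulls in Guava's ByteStreams. For reference, a minimal sketch of how such a redirect could be rewritten on top of ByteStreams.copy, assuming Guava is on the classpath (illustrative only; the commit itself just removes the class):

```scala
import java.io.{IOException, InputStream, OutputStream}

import com.google.common.io.ByteStreams

// Illustrative sketch, not part of the commit: a daemon thread that pipes
// `in` to `out` byte-for-byte, replacing the manual read/write loop with
// Guava's ByteStreams.copy (which reads until EOF).
class RedirectThread(
    in: InputStream,
    out: OutputStream,
    name: String,
    propagateEof: Boolean = false) extends Thread(name) {
  setDaemon(true)
  override def run(): Unit = {
    try {
      ByteStreams.copy(in, out)
    } catch {
      case _: IOException => // swallow broken-pipe errors, as the old loop did
    } finally {
      if (propagateEof) {
        out.close()
      }
    }
  }
}
```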

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala

Lines changed: 4 additions & 12 deletions
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
 
-import java.util.UUID
-
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
@@ -28,32 +26,26 @@ private[spark] object MinikubeTestBackend extends IntegrationTestBackend {
   private var defaultClient: DefaultKubernetesClient = _
   private val userProvidedDockerImageTag = Option(
     System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY))
-  private val resolvedDockerImageTag =
-    userProvidedDockerImageTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", ""))
   private val dockerManager = new KubernetesSuiteDockerManager(
-    Minikube.getDockerEnv, resolvedDockerImageTag)
+    Minikube.getDockerEnv, userProvidedDockerImageTag)
 
   override def initialize(): Unit = {
     val minikubeStatus = Minikube.getMinikubeStatus
     require(minikubeStatus == MinikubeStatus.RUNNING,
       s"Minikube must be running before integration tests can execute. Current status" +
         s" is: $minikubeStatus")
-    if (userProvidedDockerImageTag.isEmpty) {
-      dockerManager.buildSparkDockerImages()
-    }
+    dockerManager.buildSparkDockerImages()
     defaultClient = Minikube.getKubernetesClient
   }
 
   override def cleanUp(): Unit = {
     super.cleanUp()
-    if (userProvidedDockerImageTag.isEmpty) {
-      dockerManager.deleteImages()
-    }
+    dockerManager.deleteImages()
   }
 
   override def getKubernetesClient(): DefaultKubernetesClient = {
     defaultClient
   }
 
-  override def dockerImageTag(): String = resolvedDockerImageTag
+  override def dockerImageTag(): String = dockerManager.dockerImageTag()
 }
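With this change the backend calls buildSparkDockerImages() and deleteImages() unconditionally; tag resolution and the user-provided-tag guard move into KubernetesSuiteDockerManager (see the next file). A minimal, illustrative sketch of that ownership pattern — the class and method bodies below are stand-ins, not the real manager:

```scala
import java.util.UUID

// Sketch: the manager owns tag resolution, so callers invoke lifecycle
// methods unconditionally and the manager decides whether they do work.
class GuardedManager(userProvidedTag: Option[String]) {
  // Use the caller's tag when present; otherwise mint a unique one,
  // stripping dashes to keep the Docker tag compact.
  private val resolvedTag: String =
    userProvidedTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", ""))

  def dockerImageTag(): String = resolvedTag

  // No-ops when the user supplied a tag: those images are prebuilt
  // and not ours to create or destroy.
  def buildSparkDockerImages(): Unit =
    if (userProvidedTag.isEmpty) println(s"building images tagged $resolvedTag")

  def deleteImages(): Unit =
    if (userProvidedTag.isEmpty) println(s"deleting images tagged $resolvedTag")
}

// Usage: a backend delegates instead of tracking the tag itself.
// new GuardedManager(None).buildSparkDockerImages()           // builds, random tag
// new GuardedManager(Some("v2.3.0")).buildSparkDockerImages() // no-op, reuses tag
```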

integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala

Lines changed: 35 additions & 24 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.integrationtest.docker
 import java.io.{File, PrintWriter}
 import java.net.URI
 import java.nio.file.Paths
+import java.util.UUID
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
@@ -33,10 +34,10 @@ import scala.collection.JavaConverters._
 import org.apache.spark.deploy.k8s.integrationtest.constants._
 import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite
 import org.apache.spark.deploy.k8s.integrationtest.Logging
-import org.apache.spark.deploy.k8s.integrationtest.Utils.{RedirectThread, tryWithResource}
+import org.apache.spark.deploy.k8s.integrationtest.Utils.tryWithResource
 
 private[spark] class KubernetesSuiteDockerManager(
-    dockerEnv: Map[String, String], dockerTag: String) extends Logging {
+    dockerEnv: Map[String, String], userProvidedDockerImageTag: Option[String]) extends Logging {
 
   private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH
   // Dockerfile paths must be relative to the build path.
@@ -47,9 +48,11 @@ private[spark] class KubernetesSuiteDockerManager(
   private val INIT_CONTAINER_DOCKER_FILE = DOCKERFILES_DIR + "init-container/Dockerfile"
   private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
   private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
+
+  private val resolvedDockerImageTag =
+    userProvidedDockerImageTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", ""))
   private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST",
     throw new IllegalStateException("DOCKER_HOST env not found."))
-
   private val originalDockerUri = URI.create(dockerHost)
   private val httpsDockerUri = new URIBuilder()
     .setHost(originalDockerUri.getHost)
@@ -69,31 +72,39 @@ private[spark] class KubernetesSuiteDockerManager(
     .build()
 
   def buildSparkDockerImages(): Unit = {
-    Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() }
-    buildImage("spark-base", BASE_DOCKER_FILE)
-    buildImage("spark-driver", DRIVER_DOCKER_FILE)
-    buildImage("spark-executor", EXECUTOR_DOCKER_FILE)
-    buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE)
+    if (userProvidedDockerImageTag.isEmpty) {
+      Eventually.eventually(TIMEOUT, INTERVAL) {
+        dockerClient.ping()
+      }
+      buildImage("spark-base", BASE_DOCKER_FILE)
+      buildImage("spark-driver", DRIVER_DOCKER_FILE)
+      buildImage("spark-executor", EXECUTOR_DOCKER_FILE)
+      buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE)
+    }
   }
 
   def deleteImages(): Unit = {
-    removeRunningContainers()
-    deleteImage("spark-base")
-    deleteImage("spark-driver")
-    deleteImage("spark-executor")
-    deleteImage("spark-init")
+    if (userProvidedDockerImageTag.isEmpty) {
+      removeRunningContainers()
+      deleteImage("spark-base")
+      deleteImage("spark-driver")
+      deleteImage("spark-executor")
+      deleteImage("spark-init")
+    }
   }
 
+  def dockerImageTag(): String = resolvedDockerImageTag
+
   private def buildImage(name: String, dockerFile: String): Unit = {
-    logInfo(s"Building Docker image - $name:$dockerTag")
+    logInfo(s"Building Docker image - $name:$resolvedDockerImageTag")
     val dockerFileWithBaseTag = new File(DOCKER_BUILD_PATH.resolve(
-      s"$dockerFile-$dockerTag").toAbsolutePath.toString)
+      s"$dockerFile-$resolvedDockerImageTag").toAbsolutePath.toString)
     dockerFileWithBaseTag.deleteOnExit()
     try {
       val originalDockerFileText = Files.readLines(
         DOCKER_BUILD_PATH.resolve(dockerFile).toFile, Charsets.UTF_8).asScala
       val dockerFileTextWithProperBaseImage = originalDockerFileText.map(
-        _.replace("FROM spark-base", s"FROM spark-base:$dockerTag"))
+        _.replace("FROM spark-base", s"FROM spark-base:$resolvedDockerImageTag"))
       tryWithResource(Files.newWriter(dockerFileWithBaseTag, Charsets.UTF_8)) { fileWriter =>
         tryWithResource(new PrintWriter(fileWriter)) { printWriter =>
           for (line <- dockerFileTextWithProperBaseImage) {
@@ -105,8 +116,8 @@ private[spark] class KubernetesSuiteDockerManager(
       }
       dockerClient.build(
         DOCKER_BUILD_PATH,
-        s"$name:$dockerTag",
-        s"$dockerFile-$dockerTag",
+        s"$name:$resolvedDockerImageTag",
+        s"$dockerFile-$resolvedDockerImageTag",
         new LoggingBuildHandler())
     } finally {
       dockerFileWithBaseTag.delete()
@@ -119,15 +130,15 @@ private[spark] class KubernetesSuiteDockerManager(
   private def removeRunningContainers(): Unit = {
     val imageIds = dockerClient.listImages(ListImagesParam.allImages())
       .asScala
-      .filter(image => image.repoTags().asScala.exists(_.endsWith(s":$dockerTag")))
+      .filter(image => image.repoTags().asScala.exists(_.endsWith(s":$resolvedDockerImageTag")))
       .map(_.id())
       .toSet
     Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
       val runningContainersWithImageTag = stopRunningContainers(imageIds)
       require(
         runningContainersWithImageTag.isEmpty,
         s"${runningContainersWithImageTag.size} containers found still running" +
-          s" with the image tag $dockerTag")
+          s" with the image tag $resolvedDockerImageTag")
     }
     dockerClient.listContainers(ListContainersParam.allContainers())
       .asScala
@@ -139,7 +150,7 @@ private[spark] class KubernetesSuiteDockerManager(
       .asScala
       .filter(container => imageIds.contains(container.imageId()))
     require(containersWithImageTag.isEmpty, s"${containersWithImageTag.size} containers still" +
-      s" found with image tag $dockerTag.")
+      s" found with image tag $resolvedDockerImageTag.")
   }
 
 }
@@ -148,7 +159,7 @@ private[spark] class KubernetesSuiteDockerManager(
     val runningContainersWithImageTag = getRunningContainersWithImageIds(imageIds)
     if (runningContainersWithImageTag.nonEmpty) {
       logInfo(s"Found ${runningContainersWithImageTag.size} containers running with" +
-        s" an image with the tag $dockerTag. Attempting to remove these containers," +
+        s" an image with the tag $resolvedDockerImageTag. Attempting to remove these containers," +
         s" and then will stall for 2 seconds.")
       runningContainersWithImageTag.foreach { container =>
         dockerClient.stopContainer(container.id(), 5)
@@ -168,10 +179,10 @@ private[spark] class KubernetesSuiteDockerManager(
 
   private def deleteImage(name: String): Unit = {
     try {
-      dockerClient.removeImage(s"$name:$dockerTag")
+      dockerClient.removeImage(s"$name:$resolvedDockerImageTag")
     } catch {
       case e: RuntimeException =>
-        logWarning(s"Failed to delete image $name:$dockerTag. There may be images leaking in the" +
+        logWarning(s"Failed to delete image $name:$resolvedDockerImageTag. There may be images leaking in the" +
           s" docker environment which are now stale and unused.", e)
     }
   }
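The tag plumbing above exists because buildImage rewrites each Dockerfile so its `FROM spark-base` line points at the base image built for this run rather than `spark-base:latest`. A condensed, illustrative sketch of just that rewrite step, using the same Guava utilities the diff imports (the helper name is mine, not from the commit):

```scala
import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files

import scala.collection.JavaConverters._

// Sketch: pin a Dockerfile's base image to this test run's resolved tag,
// turning "FROM spark-base" into "FROM spark-base:<tag>".
def retagDockerfile(original: File, retagged: File, tag: String): Unit = {
  val rewritten = Files.readLines(original, Charsets.UTF_8).asScala
    .map(_.replace("FROM spark-base", s"FROM spark-base:$tag"))
  Files.write(rewritten.mkString("\n"), retagged, Charsets.UTF_8)
}
```

Writing the retagged copy next to the build path (and deleting it afterwards, as the real code does with deleteOnExit/delete) keeps the source Dockerfiles untouched between runs.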
