Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit cd77c2a

Browse files
committed
Force containers to halt before removing images.
1 parent 8e1e381 commit cd77c2a

File tree

3 files changed

+60
-12
lines changed

3 files changed

+60
-12
lines changed

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient
2121

2222
import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend
2323
import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.{Minikube, MinikubeTestBackend}
24-
import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageManager
24+
import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager
2525

2626
private[spark] trait IntegrationTestBackend {
2727
def initialize(): Unit

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient
2222

2323
import org.apache.spark.deploy.k8s.config.KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY
2424
import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
25-
import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageManager
25+
import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager
2626

2727
private[spark] object MinikubeTestBackend extends IntegrationTestBackend {
2828

@@ -31,7 +31,7 @@ private[spark] object MinikubeTestBackend extends IntegrationTestBackend {
3131
System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY))
3232
private val resolvedDockerImageTag =
3333
userProvidedDockerImageTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", ""))
34-
private val dockerImageManager = new SparkDockerImageManager(
34+
private val dockerManager = new KubernetesSuiteDockerManager(
3535
Minikube.getDockerEnv, resolvedDockerImageTag)
3636

3737
override def initialize(): Unit = {
@@ -40,15 +40,15 @@ private[spark] object MinikubeTestBackend extends IntegrationTestBackend {
4040
s"Minikube must be running before integration tests can execute. Current status" +
4141
s" is: $minikubeStatus")
4242
if (userProvidedDockerImageTag.isEmpty) {
43-
dockerImageManager.buildSparkDockerImages()
43+
dockerManager.buildSparkDockerImages()
4444
}
4545
defaultClient = Minikube.getKubernetesClient
4646
}
4747

4848
override def cleanUp(): Unit = {
4949
super.cleanUp()
5050
if (userProvidedDockerImageTag.isEmpty) {
51-
dockerImageManager.deleteImages()
51+
dockerManager.deleteImages()
5252
}
5353
}
5454

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,18 @@ import java.nio.file.Paths
2323
import com.google.common.base.Charsets
2424
import com.google.common.io.Files
2525
import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, LoggingBuildHandler}
26+
import com.spotify.docker.client.DockerClient.{ListContainersParam, RemoveContainerParam}
27+
import com.spotify.docker.client.messages.Container
2628
import org.apache.http.client.utils.URIBuilder
2729
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
2830
import org.scalatest.time.{Minutes, Seconds, Span}
2931
import scala.collection.JavaConverters._
3032

33+
import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite
3134
import org.apache.spark.internal.Logging
3235
import org.apache.spark.util.{RedirectThread, Utils}
3336

34-
private[spark] class SparkDockerImageManager(
37+
private[spark] class KubernetesSuiteDockerManager(
3538
dockerEnv: Map[String, String], dockerTag: String) extends Logging {
3639

3740
private val DOCKER_BUILD_PATH = Paths.get("target", "docker")
@@ -62,12 +65,12 @@ private[spark] class SparkDockerImageManager(
6265
throw new IllegalStateException("DOCKER_CERT_PATH env not found."))
6366

6467
private val dockerClient = new DefaultDockerClient.Builder()
65-
.uri(httpsDockerUri)
66-
.dockerCertificates(DockerCertificates
67-
.builder()
68-
.dockerCertPath(Paths.get(dockerCerts))
69-
.build().get())
70-
.build()
68+
.uri(httpsDockerUri)
69+
.dockerCertificates(DockerCertificates
70+
.builder()
71+
.dockerCertPath(Paths.get(dockerCerts))
72+
.build().get())
73+
.build()
7174

7275
def buildSparkDockerImages(): Unit = {
7376
Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() }
@@ -97,6 +100,7 @@ private[spark] class SparkDockerImageManager(
97100
}
98101

99102
def deleteImages(): Unit = {
103+
removeRunningContainers()
100104
deleteImage("spark-driver")
101105
deleteImage("spark-driver-py")
102106
deleteImage("spark-executor")
@@ -137,6 +141,50 @@ private[spark] class SparkDockerImageManager(
137141
}
138142
}
139143

144+
/**
 * Forces all containers running an image with the configured tag to halt and be removed.
 *
 * First repeatedly asks Docker to stop the matching containers until a listing shows that
 * none of them is running any more (bounded by KubernetesSuite.TIMEOUT, polled every
 * KubernetesSuite.INTERVAL). Afterwards every container, running or not, whose image carries
 * the configured tag is removed.
 */
private def removeRunningContainers(): Unit = {
  Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
    val runningContainersWithImageTag = stopRunningContainers()
    // The attempt only succeeds once nothing is left running. A failed requirement throws,
    // which makes `eventually` retry after the configured interval.
    require(
      runningContainersWithImageTag.isEmpty,
      s"${runningContainersWithImageTag.size} containers found still running" +
        s" with the image tag $dockerTag")
  }
  dockerClient.listContainers(ListContainersParam.allContainers())
    .asScala
    .filter(containerHasImageWithTag(_))
    .foreach { container =>
      // Force the removal so that a container which was restarted between the stop loop
      // above and this call does not make the clean-up fail.
      dockerClient.removeContainer(container.id(), RemoveContainerParam.forceKill(true))
    }
}
161+
162+
/**
 * Asks Docker to stop every container that is currently running an image with the
 * configured tag.
 *
 * @return the containers that were listed as running when this method was called; an empty
 *         collection means there was nothing left to stop.
 */
private def stopRunningContainers(): Iterable[Container] = {
  // Grace period handed to Docker: seconds to wait for a container to exit before killing it.
  val secondsToWaitBeforeKilling = 5
  val runningContainersWithImageTag = getRunningContainersWithImageTag()
  if (runningContainersWithImageTag.nonEmpty) {
    log.info(s"Found ${runningContainersWithImageTag.size} containers running with" +
      s" an image with the tag $dockerTag. Attempting to stop these containers," +
      s" waiting up to $secondsToWaitBeforeKilling seconds for each before it is killed.")
    runningContainersWithImageTag.foreach { container =>
      dockerClient.stopContainer(container.id(), secondsToWaitBeforeKilling)
    }
  }
  runningContainersWithImageTag
}
174+
175+
/**
 * Lists the containers that Docker reports as running and whose image name ends with the
 * configured tag.
 *
 * @return the matching running containers, possibly empty.
 */
private def getRunningContainersWithImageTag(): Iterable[Container] = {
  dockerClient
    .listContainers(
      ListContainersParam.allContainers(),
      ListContainersParam.withStatusRunning())
    .asScala
    .filter(containerHasImageWithTag(_))
}
183+
184+
/**
 * Tells whether the image name reported for the given container ends with a colon followed
 * by the configured Docker tag.
 */
private def containerHasImageWithTag(container: Container): Boolean = {
  val imageName = container.image()
  val tagSuffix = s":$dockerTag"
  imageName.endsWith(tagSuffix)
}
187+
140188
private def deleteImage(name: String): Unit = {
141189
try {
142190
dockerClient.removeImage(s"$name:$dockerTag")

0 commit comments

Comments
 (0)