16
16
*/
17
17
package org .apache .spark .deploy .k8s .integrationtest .docker
18
18
19
+ import java .io .{File , PrintWriter }
19
20
import java .net .URI
20
- import java .net .URLEncoder
21
21
import java .nio .file .Paths
22
22
23
- import com .spotify .docker .client .{DockerClient , DefaultDockerClient , DockerCertificates , LoggingBuildHandler }
23
+ import com .google .common .base .Charsets
24
+ import com .google .common .io .Files
25
+ import com .spotify .docker .client .{DefaultDockerClient , DockerCertificates , LoggingBuildHandler }
26
+ import com .spotify .docker .client .DockerClient .{ListContainersParam , ListImagesParam , RemoveContainerParam }
27
+ import com .spotify .docker .client .messages .Container
24
28
import org .apache .http .client .utils .URIBuilder
25
29
import org .scalatest .concurrent .{Eventually , PatienceConfiguration }
26
30
import org .scalatest .time .{Minutes , Seconds , Span }
31
+ import scala .collection .JavaConverters ._
27
32
28
- import org .apache .spark .deploy .k8s .integrationtest .constants .SPARK_DISTRO_PATH
33
+ import org .apache .spark .deploy .k8s .integrationtest .constants ._
34
+ import org .apache .spark .deploy .k8s .integrationtest .KubernetesSuite
29
35
import org .apache .spark .deploy .k8s .integrationtest .Logging
36
+ import org .apache .spark .deploy .k8s .integrationtest .Utils .{RedirectThread , tryWithResource }
30
37
31
- private [spark] class SparkDockerImageBuilder
32
- ( private val dockerEnv : Map [String , String ]) extends Logging {
38
+ private [spark] class KubernetesSuiteDockerManager (
39
+ dockerEnv : Map [String , String ], dockerTag : String ) extends Logging {
33
40
34
41
private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH
35
42
// Dockerfile paths must be relative to the build path.
@@ -41,7 +48,7 @@ private[spark] class SparkDockerImageBuilder
41
48
private val TIMEOUT = PatienceConfiguration .Timeout (Span (2 , Minutes ))
42
49
private val INTERVAL = PatienceConfiguration .Interval (Span (2 , Seconds ))
43
50
private val dockerHost = dockerEnv.getOrElse(" DOCKER_HOST" ,
44
- throw new IllegalStateException (" DOCKER_HOST env not found." ))
51
+ throw new IllegalStateException (" DOCKER_HOST env not found." ))
45
52
46
53
private val originalDockerUri = URI .create(dockerHost)
47
54
private val httpsDockerUri = new URIBuilder ()
@@ -51,44 +58,121 @@ private[spark] class SparkDockerImageBuilder
51
58
.build()
52
59
53
60
private val dockerCerts = dockerEnv.getOrElse(" DOCKER_CERT_PATH" ,
54
- throw new IllegalStateException (" DOCKER_CERT_PATH env not found." ))
61
+ throw new IllegalStateException (" DOCKER_CERT_PATH env not found." ))
55
62
56
63
private val dockerClient = new DefaultDockerClient .Builder ()
57
64
.uri(httpsDockerUri)
58
- .dockerCertificates(DockerCertificates .builder()
65
+ .dockerCertificates(DockerCertificates
66
+ .builder()
59
67
.dockerCertPath(Paths .get(dockerCerts))
60
- .build()
61
- .get())
68
+ .build().get())
62
69
.build()
63
70
64
71
/**
 * Waits until the Docker daemon answers a ping, then builds the base, driver, executor and
 * init-container images, in that order.
 */
def buildSparkDockerImages(): Unit = {
  Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() }
  // The base image comes first in the list; buildImage rewrites "FROM spark-base" lines in the
  // other Dockerfiles to reference it by tag.
  val imagesInBuildOrder = Seq(
    "spark-base" -> BASE_DOCKER_FILE,
    "spark-driver" -> DRIVER_DOCKER_FILE,
    "spark-executor" -> EXECUTOR_DOCKER_FILE,
    "spark-init" -> INIT_CONTAINER_DOCKER_FILE)
  imagesInBuildOrder.foreach { case (imageName, dockerFile) =>
    buildImage(imageName, dockerFile)
  }
}
72
78
73
- private def buildImage (
74
- name : String ,
75
- dockerFile : String ,
76
- buildArgs : Option [String ] = None ): Unit = {
77
- if (buildArgs.nonEmpty) {
78
- dockerClient.build(
79
- DOCKER_BUILD_PATH ,
80
- name,
81
- dockerFile,
82
- new LoggingBuildHandler (),
83
- DockerClient .BuildParam .create(" buildargs" , URLEncoder .encode(buildArgs.get, " UTF-8" )))
84
- } else {
79
/**
 * Cleans up the Docker environment for the configured tag: first clears out containers via
 * removeRunningContainers(), then deletes each of the Spark images in turn.
 */
def deleteImages(): Unit = {
  removeRunningContainers()
  Seq("spark-base", "spark-driver", "spark-executor", "spark-init").foreach(deleteImage)
}
86
+
87
/**
 * Builds the image `name:dockerTag` from a temporary, tag-specific copy of `dockerFile`.
 *
 * The copy is written next to the original inside the build path, with every occurrence of
 * "FROM spark-base" rewritten to reference the base image carrying the same tag. The copy is
 * deleted once the build finishes, whether or not it succeeded.
 *
 * @param name image repository name, without the tag
 * @param dockerFile Dockerfile path, relative to the build path
 */
private def buildImage(name: String, dockerFile: String): Unit = {
  logInfo(s"Building Docker image - $name:$dockerTag")
  val taggedDockerFileName = s"$dockerFile-$dockerTag"
  val taggedDockerFile =
    new File(DOCKER_BUILD_PATH.resolve(taggedDockerFileName).toAbsolutePath.toString)
  // Safety net in case the JVM exits before the finally block below runs.
  taggedDockerFile.deleteOnExit()
  try {
    val sourceDockerFile = DOCKER_BUILD_PATH.resolve(dockerFile).toFile
    val rewrittenLines = Files.readLines(sourceDockerFile, Charsets.UTF_8)
      .asScala
      .map(_.replace("FROM spark-base", s"FROM spark-base:$dockerTag"))
    tryWithResource(Files.newWriter(taggedDockerFile, Charsets.UTF_8)) { fileWriter =>
      tryWithResource(new PrintWriter(fileWriter)) { printWriter =>
        // scalastyle:off println
        rewrittenLines.foreach(line => printWriter.println(line))
        // scalastyle:on println
      }
    }
    dockerClient.build(
      DOCKER_BUILD_PATH,
      s"$name:$dockerTag",
      taggedDockerFileName,
      new LoggingBuildHandler())
  } finally {
    taggedDockerFile.delete()
  }
}
115
+
116
/**
 * Forces all containers running an image with the configured tag to halt and be removed.
 *
 * Works in three phases: (1) repeatedly stop running containers that use a tagged image until
 * none are left running, (2) force-remove every container (running or not) that uses one of
 * those images, and (3) poll until Docker no longer lists any such container.
 *
 * Each polling phase fails (via `require` inside `Eventually.eventually`) if the condition is
 * not met within `KubernetesSuite.TIMEOUT`.
 */
private def removeRunningContainers(): Unit = {
  val tagSuffix = s":$dockerTag"
  // allImages() also returns untagged (dangling / intermediate) images, for which the client
  // can report null repo tags. Treat those as "no tags" rather than hitting an NPE.
  val imageIds = dockerClient.listImages(ListImagesParam.allImages())
    .asScala
    .filter { image =>
      Option(image.repoTags()).exists(_.asScala.exists(_.endsWith(tagSuffix)))
    }
    .map(_.id())
    .toSet

  // Re-queries Docker on every call so that each polling attempt sees fresh state.
  def containersWithImageTag: Iterable[Container] = {
    dockerClient.listContainers(ListContainersParam.allContainers())
      .asScala
      .filter(container => imageIds.contains(container.imageId()))
  }

  Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
    val runningContainersWithImageTag = stopRunningContainers(imageIds)
    require(
      runningContainersWithImageTag.isEmpty,
      s"${runningContainersWithImageTag.size} containers found still running" +
        s" with the image tag $dockerTag")
  }

  containersWithImageTag.foreach { container =>
    dockerClient.removeContainer(container.id(), RemoveContainerParam.forceKill(true))
  }

  Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
    val remainingContainers = containersWithImageTag
    require(
      remainingContainers.isEmpty,
      s"${remainingContainers.size} containers still" +
        s" found with image tag $dockerTag.")
  }
}
146
+
147
/**
 * Issues a stop request (with a timeout argument of 5) to every currently running container
 * whose image ID is in `imageIds`.
 *
 * @param imageIds IDs of the images whose containers should be stopped
 * @return the containers that were found running when this method was called; an empty result
 *         means nothing was running and no stop request was sent
 */
private def stopRunningContainers(imageIds: Set[String]): Iterable[Container] = {
  val activeContainers = getRunningContainersWithImageIds(imageIds)
  if (activeContainers.nonEmpty) {
    logInfo(s"Found ${activeContainers.size} containers running with" +
      s" an image with the tag $dockerTag. Attempting to remove these containers," +
      s" and then will stall for 2 seconds.")
    for (container <- activeContainers) {
      dockerClient.stopContainer(container.id(), 5)
    }
  }
  activeContainers
}
159
+
160
/**
 * Lists the containers Docker reports with status "running" and keeps only those whose image
 * ID is one of `imageIds`.
 *
 * @param imageIds image IDs to match against each container's image ID
 * @return the running containers created from one of the given images
 */
private def getRunningContainersWithImageIds(imageIds: Set[String]): Iterable[Container] = {
  val runningContainers = dockerClient
    .listContainers(
      ListContainersParam.allContainers(),
      ListContainersParam.withStatusRunning())
    .asScala
  runningContainers.filter(container => imageIds(container.imageId()))
}
168
+
169
/**
 * Best-effort removal of the image `name:dockerTag`.
 *
 * A failure to delete is logged as a warning and otherwise ignored, so that one image that
 * cannot be removed (for example because it was never built) does not prevent the remaining
 * images from being cleaned up.
 *
 * @param name image repository name, without the tag
 */
private def deleteImage(name: String): Unit = {
  try {
    dockerClient.removeImage(s"$name:$dockerTag")
  } catch {
    // The Docker client reports failures through its checked DockerException hierarchy, which
    // a RuntimeException handler would not match. NonFatal covers those as well as runtime
    // exceptions, while still letting InterruptedException and fatal JVM errors propagate.
    case scala.util.control.NonFatal(e) =>
      logWarning(s"Failed to delete image $name:$dockerTag. There may be images leaking in the" +
        s" docker environment which are now stale and unused.", e)
  }
}
94
178
}
0 commit comments