This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 0abf0b9

liyinan926 authored and foxish committed

Use the new initContainers field instead of the deprecated annotation (#528)

* Use the new initContainers field in Kubernetes 1.8
* Fixed the integration tests

1 parent f94499b commit 0abf0b9

8 files changed: +30 −44 lines

resource-managers/kubernetes/core/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@
   <name>Spark Project Kubernetes</name>
   <properties>
     <sbt.project.name>kubernetes</sbt.project.name>
-    <kubernetes.client.version>2.2.13</kubernetes.client.version>
+    <kubernetes.client.version>3.0.0</kubernetes.client.version>
   </properties>

   <dependencies>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala

Lines changed: 0 additions & 1 deletion

@@ -74,7 +74,6 @@ package object constants {
   private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"

   // Bootstrapping dependencies with the init-container
-  private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
   private[spark] val INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH =
     "/mnt/secrets/spark-init"
   private[spark] val INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY =

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/InitContainerUtil.scala

Lines changed: 3 additions & 22 deletions

@@ -16,34 +16,15 @@
  */
 package org.apache.spark.deploy.k8s.submit

-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
-import scala.collection.JavaConverters._
-
-import org.apache.spark.deploy.k8s.constants._

 private[spark] object InitContainerUtil {

-  private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule)
-
   def appendInitContainer(originalPodSpec: Pod, initContainer: Container): Pod = {
-    val resolvedInitContainers = originalPodSpec
-      .getMetadata
-      .getAnnotations
-      .asScala
-      .get(INIT_CONTAINER_ANNOTATION)
-      .map { existingInitContainerAnnotation =>
-        val existingInitContainers = OBJECT_MAPPER.readValue(
-          existingInitContainerAnnotation, classOf[List[Container]])
-        existingInitContainers ++ Seq(initContainer)
-      }.getOrElse(Seq(initContainer))
-    val resolvedSerializedInitContainers = OBJECT_MAPPER.writeValueAsString(resolvedInitContainers)
     new PodBuilder(originalPodSpec)
-      .editMetadata()
-        .removeFromAnnotations(INIT_CONTAINER_ANNOTATION)
-        .addToAnnotations(INIT_CONTAINER_ANNOTATION, resolvedSerializedInitContainers)
-        .endMetadata()
+      .editOrNewSpec()
+        .addToInitContainers(initContainer)
+        .endSpec()
       .build()
   }
 }
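For context, a minimal usage sketch of the rewritten helper, assuming fabric8 kubernetes-client 3.0.0 on the classpath; the pod name, container name, and image below are hypothetical, not values used by this commit.

import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder}

// A bare pod to bootstrap; in Spark this would be the driver or executor pod.
val basePod: Pod = new PodBuilder()
  .withNewMetadata()
    .withName("spark-driver")  // hypothetical name
    .endMetadata()
  .withNewSpec()
    .endSpec()
  .build()

// The init container that fetches remote dependencies before the main container runs.
val initContainer = new ContainerBuilder()
  .withName("spark-init")          // hypothetical name
  .withImage("spark-init:latest")  // hypothetical image
  .build()

val bootstrapped = InitContainerUtil.appendInitContainer(basePod, initContainer)
// The container now lives in the typed spec field rather than in a JSON-serialized annotation.
assert(bootstrapped.getSpec.getInitContainers.size() == 1)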

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala

Lines changed: 8 additions & 4 deletions

@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit

 import java.util.concurrent.{CountDownLatch, TimeUnit}

-import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
 import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 import scala.collection.JavaConverters._
@@ -109,15 +109,15 @@ private[k8s] class LoggingPodStatusWatcherImpl(
       ("namespace", pod.getMetadata.getNamespace()),
       ("labels", pod.getMetadata.getLabels().asScala.mkString(", ")),
       ("pod uid", pod.getMetadata.getUid),
-      ("creation time", pod.getMetadata.getCreationTimestamp()),
+      ("creation time", formatTime(pod.getMetadata.getCreationTimestamp)),

       // spec details
       ("service account name", pod.getSpec.getServiceAccountName()),
       ("volumes", pod.getSpec.getVolumes().asScala.map(_.getName).mkString(", ")),
       ("node name", pod.getSpec.getNodeName()),

       // status
-      ("start time", pod.getStatus.getStartTime),
+      ("start time", formatTime(pod.getStatus.getStartTime)),
       ("container images",
         pod.getStatus.getContainerStatuses()
           .asScala
@@ -162,7 +162,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
       case running: ContainerStateRunning =>
         Seq(
           ("Container state", "Running"),
-          ("Container started at", running.getStartedAt))
+          ("Container started at", formatTime(running.getStartedAt)))
       case waiting: ContainerStateWaiting =>
         Seq(
           ("Container state", "Waiting"),
@@ -175,4 +175,8 @@ private[k8s] class LoggingPodStatusWatcherImpl(
         throw new SparkException(s"Unexpected container status type ${unknown.getClass}.")
     }.getOrElse(Seq(("Container state", "N/A")))
   }
+
+  private def formatTime(time: Time): String = {
+    if (time != null) time.getTime else "N/A"
+  }
 }
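The new formatTime guard matters because timestamp fields such as status.startTime are null until the pod has actually been scheduled and started. A minimal sketch of the behavior, assuming fabric8's Time(String) constructor; the timestamp value is made up:

import io.fabric8.kubernetes.api.model.Time

def formatTime(time: Time): String =
  if (time != null) time.getTime else "N/A"

// Time is a thin wrapper around the timestamp string returned by the API server.
formatTime(new Time("2017-10-16T21:00:00Z"))  // "2017-10-16T21:00:00Z"
formatTime(null)                              // "N/A" instead of a null in the log output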

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/InitContainerBootstrapStepSuite.scala

Lines changed: 4 additions & 8 deletions

@@ -31,9 +31,8 @@ import org.apache.spark.deploy.k8s.constants._
 import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
 import org.apache.spark.util.Utils

-private[spark] class initContainerBootstrapStepSuite extends SparkFunSuite {
+private[spark] class InitContainerBootstrapStepSuite extends SparkFunSuite {

-  private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule)
   private val CONFIG_MAP_NAME = "spark-init-config-map"
   private val CONFIG_MAP_KEY = "spark-init-config-map-key"

@@ -59,12 +58,9 @@ private[spark] class initContainerBootstrapStepSuite extends SparkFunSuite {
       FirstTestInitContainerConfigurationStep$.additionalMainContainerEnvKey)
     assert(additionalDriverEnv.head.getValue ===
       FirstTestInitContainerConfigurationStep$.additionalMainContainerEnvValue)
-    val driverAnnotations = preparedDriverSpec.driverPod.getMetadata.getAnnotations.asScala
-    assert(driverAnnotations.size === 1)
-    val initContainers = OBJECT_MAPPER.readValue(
-      driverAnnotations(INIT_CONTAINER_ANNOTATION), classOf[Array[Container]])
-    assert(initContainers.length === 1)
-    val initContainerEnv = initContainers.head.getEnv.asScala
+    val initContainers = preparedDriverSpec.driverPod.getSpec.getInitContainers
+    assert(initContainers.size() === 1)
+    val initContainerEnv = initContainers.get(0).getEnv.asScala
     assert(initContainerEnv.size === 1)
     assert(initContainerEnv.head.getName ===
       SecondTestInitContainerConfigurationStep$.additionalInitContainerEnvKey)

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala

Lines changed: 1 addition & 2 deletions

@@ -179,8 +179,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
     verify(nodeAffinityExecutorPodModifier, times(1))
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))

-    assert(executor.getMetadata.getAnnotations.size() === 1)
-    assert(executor.getMetadata.getAnnotations.containsKey(INIT_CONTAINER_ANNOTATION))
+    assert(executor.getSpec.getInitContainers.size() === 1)
     checkOwnerReferences(executor, driverPodUid)
   }

resource-managers/kubernetes/integration-tests/pom.xml

Lines changed: 2 additions & 2 deletions

@@ -351,7 +351,7 @@
             <goal>wget</goal>
           </goals>
           <configuration>
-            <url>https://storage.googleapis.com/minikube/releases/v0.16.0/minikube-linux-amd64</url>
+            <url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64</url>
             <outputDirectory>${project.build.directory}/minikube-bin/linux-amd64</outputDirectory>
             <outputFileName>minikube</outputFileName>
           </configuration>
@@ -363,7 +363,7 @@
             <goal>wget</goal>
           </goals>
           <configuration>
-            <url>https://storage.googleapis.com/minikube/releases/v0.16.0/minikube-darwin-amd64</url>
+            <url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64</url>
             <outputDirectory>${project.build.directory}/minikube-bin/darwin-amd64</outputDirectory>
             <outputFileName>minikube</outputFileName>
           </configuration>

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala

Lines changed: 11 additions & 4 deletions

@@ -60,9 +60,9 @@ private[spark] object Minikube extends Logging {
   def getMinikubeStatus: MinikubeStatus.Value = synchronized {
     assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
     val statusString = executeMinikube("status")
-      .filter(_.contains("minikubeVM: "))
+      .filter(_.contains("minikube: "))
       .head
-      .replaceFirst("minikubeVM: ", "")
+      .replaceFirst("minikube: ", "")
     MinikubeStatus.unapply(statusString)
       .getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
   }
@@ -78,7 +78,7 @@

   def deleteMinikube(): Unit = synchronized {
     assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
-    if (getMinikubeStatus != MinikubeStatus.DOES_NOT_EXIST) {
+    if (getMinikubeStatus != MinikubeStatus.NONE) {
       executeMinikube("delete")
     } else {
       logInfo("Minikube was already not running.")
@@ -115,10 +115,17 @@

 private[spark] object MinikubeStatus extends Enumeration {

+  // The following states are listed according to
+  // https://github.com/docker/machine/blob/master/libmachine/state/state.go.
+  val STARTING = status("Starting")
   val RUNNING = status("Running")
+  val PAUSED = status("Paused")
+  val STOPPING = status("Stopping")
   val STOPPED = status("Stopped")
-  val DOES_NOT_EXIST = status("Does Not Exist")
+  val ERROR = status("Error")
+  val TIMEOUT = status("Timeout")
   val SAVED = status("Saved")
+  val NONE = status("")

   def status(value: String): Value = new Val(nextId, value)
   def unapply(s: String): Option[Value] = values.find(s == _.toString)
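A minimal sketch of the parsing this change fixes: newer minikube releases print a "minikube: <state>" line where older releases printed "minikubeVM: <state>", and a deleted cluster can report an empty state, which the NONE value above absorbs. The sample output is illustrative, not captured from a real run.

val sampleStatusOutput = Seq(
  "minikube: Running",
  "cluster: Running",
  "kubectl: Correctly Configured")

val statusString = sampleStatusOutput
  .filter(_.contains("minikube: "))
  .head
  .replaceFirst("minikube: ", "")     // "Running"

MinikubeStatus.unapply(statusString)  // Some(RUNNING)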
