Skip to content

Commit b095232

Browse files
Marcelo Vanzin authored and erikerlandson committed
[SPARK-29865][K8S] Ensure client-mode executors have same name prefix
This basically does what BasicDriverFeatureStep already does to achieve the same thing in cluster mode; but since that class (or any other feature) is not invoked in client mode, it needs to be done elsewhere. I also modified the client mode integration test to check the executor name prefix; while there I had to fix the minikube backend to parse the output from newer minikube versions (I have 1.5.2). Closes apache#26488 from vanzin/SPARK-29865. Authored-by: Marcelo Vanzin <[email protected]> Signed-off-by: Erik Erlandson <[email protected]>
1 parent fca0a6c commit b095232

File tree

3 files changed

+50
-8
lines changed

3 files changed

+50
-8
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import com.google.common.cache.CacheBuilder
2323
import io.fabric8.kubernetes.client.Config
2424

2525
import org.apache.spark.SparkContext
26-
import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
26+
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory}
2727
import org.apache.spark.deploy.k8s.Config._
2828
import org.apache.spark.deploy.k8s.Constants._
2929
import org.apache.spark.internal.Logging
@@ -61,6 +61,17 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
6161
None)
6262
}
6363

64+
// If KUBERNETES_EXECUTOR_POD_NAME_PREFIX is not set, initialize it so that all executors have
65+
// the same prefix. This is needed for client mode, where the feature steps code that sets this
66+
// configuration is not used.
67+
//
68+
// If/when feature steps are executed in client mode, they should instead take care of this,
69+
// and this code should be removed.
70+
if (!sc.conf.contains(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)) {
71+
sc.conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
72+
KubernetesConf.getResourceNamePrefix(sc.conf.get("spark.app.name")))
73+
}
74+
6475
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
6576
apiServerUri,
6677
Some(sc.conf.get(KUBERNETES_NAMESPACE)),

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ private[spark] trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
2727
val labels = Map("spark-app-selector" -> driverPodName)
2828
val driverPort = 7077
2929
val blockManagerPort = 10000
30+
val executorLabel = "spark-client-it"
3031
val driverService = testBackend
3132
.getKubernetesClient
3233
.services()
@@ -78,10 +79,11 @@ private[spark] trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
7879
"/var/run/secrets/kubernetes.io/serviceaccount/token")
7980
.addToArgs("--conf", "spark.kubernetes.authenticate.caCertFile=" +
8081
"/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
81-
.addToArgs("--conf", s"spark.kubernetes.driver.pod.name=$driverPodName")
8282
.addToArgs("--conf", "spark.executor.memory=500m")
8383
.addToArgs("--conf", "spark.executor.cores=1")
84-
.addToArgs("--conf", "spark.executor.instances=1")
84+
.addToArgs("--conf", "spark.executor.instances=2")
85+
.addToArgs("--conf", "spark.kubernetes.executor.deleteOnTermination=false")
86+
.addToArgs("--conf", s"spark.kubernetes.executor.label.$executorLabel=$executorLabel")
8587
.addToArgs("--conf",
8688
s"spark.driver.host=" +
8789
s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc")
@@ -99,13 +101,34 @@ private[spark] trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
99101
.getLog
100102
.contains("Pi is roughly 3"), "The application did not complete.")
101103
}
104+
105+
val executors = kubernetesTestComponents
106+
.kubernetesClient
107+
.pods()
108+
.inNamespace(kubernetesTestComponents.namespace)
109+
.withLabel(executorLabel, executorLabel)
110+
.list()
111+
.getItems()
112+
assert(executors.size === 2)
113+
val prefixes = executors.asScala.map { pod =>
114+
val name = pod.getMetadata().getName()
115+
name.substring(0, name.lastIndexOf("-"))
116+
}.toSet
117+
assert(prefixes.size === 1, s"Executor prefixes did not match: $prefixes")
102118
} finally {
103119
// Have to delete the service manually since it doesn't have an owner reference
104120
kubernetesTestComponents
105121
.kubernetesClient
106122
.services()
107123
.inNamespace(kubernetesTestComponents.namespace)
108124
.delete(driverService)
125+
// Delete all executors, since the test explicitly asks them not to be deleted by the app.
126+
kubernetesTestComponents
127+
.kubernetesClient
128+
.pods()
129+
.inNamespace(kubernetesTestComponents.namespace)
130+
.withLabel(executorLabel, executorLabel)
131+
.delete()
109132
}
110133
}
111134

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ private[spark] object Minikube extends Logging {
3030
private val KUBELET_PREFIX = "kubelet:"
3131
private val APISERVER_PREFIX = "apiserver:"
3232
private val KUBECTL_PREFIX = "kubectl:"
33+
private val KUBECONFIG_PREFIX = "kubeconfig:"
3334
private val MINIKUBE_VM_PREFIX = "minikubeVM: "
3435
private val MINIKUBE_PREFIX = "minikube: "
3536
private val MINIKUBE_PATH = ".minikube"
@@ -86,18 +87,23 @@ private[spark] object Minikube extends Logging {
8687
val kubeletString = statusString.find(_.contains(s"$KUBELET_PREFIX "))
8788
val apiserverString = statusString.find(_.contains(s"$APISERVER_PREFIX "))
8889
val kubectlString = statusString.find(_.contains(s"$KUBECTL_PREFIX "))
90+
val kubeconfigString = statusString.find(_.contains(s"$KUBECONFIG_PREFIX "))
91+
val hasConfigStatus = kubectlString.isDefined || kubeconfigString.isDefined
8992

90-
if (hostString.isEmpty || kubeletString.isEmpty
91-
|| apiserverString.isEmpty || kubectlString.isEmpty) {
93+
if (hostString.isEmpty || kubeletString.isEmpty || apiserverString.isEmpty ||
94+
!hasConfigStatus) {
9295
MinikubeStatus.NONE
9396
} else {
9497
val status1 = hostString.get.replaceFirst(s"$HOST_PREFIX ", "")
9598
val status2 = kubeletString.get.replaceFirst(s"$KUBELET_PREFIX ", "")
9699
val status3 = apiserverString.get.replaceFirst(s"$APISERVER_PREFIX ", "")
97-
val status4 = kubectlString.get.replaceFirst(s"$KUBECTL_PREFIX ", "")
98-
if (!status4.contains("Correctly Configured:")) {
99-
MinikubeStatus.NONE
100+
val isConfigured = if (kubectlString.isDefined) {
101+
val cfgStatus = kubectlString.get.replaceFirst(s"$KUBECTL_PREFIX ", "")
102+
cfgStatus.contains("Correctly Configured:")
100103
} else {
104+
kubeconfigString.get.replaceFirst(s"$KUBECONFIG_PREFIX ", "") == "Configured"
105+
}
106+
if (isConfigured) {
101107
val stats = List(status1, status2, status3)
102108
.map(MinikubeStatus.unapply)
103109
.map(_.getOrElse(throw new IllegalStateException(s"Unknown status $statusString")))
@@ -106,6 +112,8 @@ private[spark] object Minikube extends Logging {
106112
} else {
107113
MinikubeStatus.RUNNING
108114
}
115+
} else {
116+
MinikubeStatus.NONE
109117
}
110118
}
111119
}

0 commit comments

Comments (0)