This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit eb45ae5

hustcat authored and foxish committed
Add parameter for driver pod name (#258)
* Add parameter for driver pod name
* Mark KUBERNETES_DRIVER_POD_NAME as not internal; update documentation
* Add test case for driver pod name
* Differentiate the driver pod name from the app id
* Replace `spark.kubernetes.driver.pod.name` with KUBERNETES_DRIVER_POD_NAME
* Update the README to complete the item
1 parent 546f09c commit eb45ae5

5 files changed (+34, −8)

docs/running-on-kubernetes.md

Lines changed: 7 additions & 0 deletions
@@ -350,6 +350,13 @@ from the other deployment modes. See the [configuration page](configuration.html
     resource.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.pod.name</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Name of the driver pod. If not set, the driver pod name is set to "spark.app.name" suffixed by the current timestamp to avoid name conflicts.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.submission.waitAppCompletion</code></td>
   <td><code>true</code></td>
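
For context, here is how a user might set the newly documented property; a minimal sketch with illustrative names (the object name and "spark-pi-driver" are not from this commit):

import org.apache.spark.SparkConf

// Hypothetical usage: pin the driver pod to a fixed, predictable name.
// Without the second .set(...), submission falls back to the generated
// "<app-name>-<launch-time>" identifier described in the docs above.
object SubmitWithPodName {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .setAppName("spark-pi")
      .set("spark.kubernetes.driver.pod.name", "spark-pi-driver")
    println(conf.get("spark.kubernetes.driver.pod.name")) // spark-pi-driver
  }
}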

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 0 additions & 1 deletion
@@ -267,7 +267,6 @@ package object config extends Logging {
   private[spark] val KUBERNETES_DRIVER_POD_NAME =
     ConfigBuilder("spark.kubernetes.driver.pod.name")
      .doc("Name of the driver pod.")
-      .internal()
      .stringConf
      .createOptional
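
The only change here is dropping .internal(), which is what keeps a config entry out of the user-facing configuration docs. A self-contained sketch of that pattern, using a hypothetical ConfEntry type rather than Spark's real ConfigBuilder:

// Hypothetical stand-in for a documented vs. internal config entry.
final case class ConfEntry(key: String, doc: String, internal: Boolean = false)

object ConfigDocsSketch {
  val entries = Seq(
    // After this commit the pod-name entry is public, so it is documented.
    ConfEntry("spark.kubernetes.driver.pod.name", "Name of the driver pod."),
    // Internal entries are filtered out of the generated docs.
    ConfEntry("spark.kubernetes.some.internal.key", "Hidden.", internal = true))

  def main(args: Array[String]): Unit =
    entries.filterNot(_.internal).foreach(e => println(s"${e.key}: ${e.doc}"))
}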

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala

Lines changed: 7 additions & 5 deletions
@@ -51,6 +51,8 @@ private[spark] class Client(
   private val appName = sparkConf.getOption("spark.app.name")
     .getOrElse("spark")
   private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+  private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(kubernetesAppId)
   private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId"
   private val secretDirectory = s"$DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR/$kubernetesAppId"
   private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
@@ -150,7 +152,7 @@ private[spark] class Client(
       loggingInterval)
     Utils.tryWithResource(kubernetesClient
       .pods()
-      .withName(kubernetesAppId)
+      .withName(kubernetesDriverPodName)
       .watch(loggingWatch)) { _ =>
       val resourceCleanShutdownHook = ShutdownHookManager.addShutdownHook(() =>
         kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient))
@@ -247,7 +249,7 @@ private[spark] class Client(
       logWarning(s"Warning: Provided app id in spark.app.id as $id will be" +
         s" overridden as $kubernetesAppId")
     }
-    sparkConf.set(KUBERNETES_DRIVER_POD_NAME, kubernetesAppId)
+    sparkConf.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
     sparkConf.set(KUBERNETES_DRIVER_SERVICE_NAME, driverService.getMetadata.getName)
     sparkConf.set("spark.app.id", kubernetesAppId)
     sparkConf.setIfMissing("spark.app.name", appName)
@@ -314,7 +316,7 @@ private[spark] class Client(
     val podWatcher = new DriverPodReadyWatcher(podReadyFuture)
     Utils.tryWithResource(kubernetesClient
       .pods()
-      .withName(kubernetesAppId)
+      .withName(kubernetesDriverPodName)
       .watch(podWatcher)) { _ =>
       Utils.tryWithResource(kubernetesClient
         .services()
@@ -445,7 +447,7 @@ private[spark] class Client(
       .build()
     val driverPod = kubernetesClient.pods().createNew()
       .withNewMetadata()
-        .withName(kubernetesAppId)
+        .withName(kubernetesDriverPodName)
        .withLabels(driverKubernetesSelectors.asJava)
        .withAnnotations(customAnnotations.asJava)
        .endMetadata()
@@ -571,7 +573,7 @@ private[spark] class Client(
      kubernetesClient: KubernetesClient,
      e: Throwable): String = {
    val driverPod = try {
-      kubernetesClient.pods().withName(kubernetesAppId).get()
+      kubernetesClient.pods().withName(kubernetesDriverPodName).get()
    } catch {
      case throwable: Throwable =>
        logError(s"Timed out while waiting $driverSubmitTimeoutSecs seconds for the" +
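
To make the control flow above easier to follow, here is a self-contained sketch of the resolution order, using plain string keys instead of the internal config entries (the names mirror the diff, but the snippet itself is illustrative):

import org.apache.spark.SparkConf

object PodNameResolutionSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf(loadDefaults = false).setAppName("spark-pi")
    val launchTime = System.currentTimeMillis
    // The app id keeps its launch-time suffix; it still names the submission
    // secret, the driver service, and spark.app.id.
    val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
    val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
    // The pod name prefers an explicit user value and only falls back to the
    // generated app id, which is why the two can now differ.
    val kubernetesDriverPodName = sparkConf
      .getOption("spark.kubernetes.driver.pod.name")
      .getOrElse(kubernetesAppId)
    // setIfMissing (rather than set) keeps an explicit user-provided pod name
    // from being overwritten when the conf is propagated to the driver.
    sparkConf.setIfMissing("spark.kubernetes.driver.pod.name", kubernetesDriverPodName)
    println(s"app id: $kubernetesAppId, driver pod: $kubernetesDriverPodName")
  }
}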

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala

Lines changed: 4 additions & 2 deletions
@@ -58,6 +58,8 @@ private[spark] class Client(
   private val appName = sparkConf.getOption("spark.app.name")
     .getOrElse("spark")
   private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+  private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(kubernetesAppId)
   private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
   private val maybeStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI)
   private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
@@ -123,7 +125,7 @@ private[spark] class Client(
       .build()
     val basePod = new PodBuilder()
       .withNewMetadata()
-        .withName(kubernetesAppId)
+        .withName(kubernetesDriverPodName)
         .addToLabels(allLabels.asJava)
         .addToAnnotations(parsedCustomAnnotations.asJava)
         .endMetadata()
@@ -176,7 +178,7 @@ private[spark] class Client(
     if (resolvedFiles.nonEmpty) {
       resolvedSparkConf.set("spark.files", resolvedFiles.mkString(","))
     }
-    resolvedSparkConf.set(KUBERNETES_DRIVER_POD_NAME, kubernetesAppId)
+    resolvedSparkConf.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
     resolvedSparkConf.set("spark.app.id", kubernetesAppId)
     // We don't need this anymore since we just set the JVM options on the environment
     resolvedSparkConf.remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala

Lines changed: 16 additions & 0 deletions
@@ -171,6 +171,22 @@ private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend)
       "Unexpected value for annotation2")
   }
 
+  test("Run with driver pod name") {
+    sparkConf.set(KUBERNETES_DRIVER_POD_NAME, "spark-pi")
+    new Client(
+      sparkConf = sparkConf,
+      mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS,
+      mainAppResource = KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
+      appArgs = Array.empty[String]).run()
+    val driverPodMetadata = kubernetesTestComponents.kubernetesClient
+      .pods()
+      .withName("spark-pi")
+      .get()
+      .getMetadata()
+    val driverName = driverPodMetadata.getName
+    assert(driverName === "spark-pi", "Unexpected driver pod name.")
+  }
+
   test("Enable SSL on the driver submit server") {
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)
