Skip to content

Commit a83ae0d

Browse files
mccheah authored and foxish committed
[SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs
## What changes were proposed in this pull request? Breaks down the construction of driver pods and executor pods in a way that uses a common abstraction for both spark-submit creating the driver and KubernetesClusterSchedulerBackend creating the executor. Encourages more code reuse and is more legible than the older approach. The high-level design is discussed in more detail on the JIRA ticket. This pull request is the implementation of that design with some minor changes in the implementation details. No user-facing behavior should break as a result of this change. ## How was this patch tested? Migrated all unit tests from the old submission steps architecture to the new architecture. Integration tests should not have to change and pass given that this shouldn't change any outward behavior. Author: mcheah <[email protected]> Closes apache#20910 from mccheah/spark-22839-incremental.
1 parent 0323e61 commit a83ae0d

File tree

41 files changed

+2289
-2081
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2289
-2081
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,5 +167,5 @@ private[spark] object Config extends Logging {
167167
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
168168
val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
169169

170-
val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
170+
val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
171171
}
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s
18+
19+
import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
20+
21+
import org.apache.spark.SparkConf
22+
import org.apache.spark.deploy.k8s.Config._
23+
import org.apache.spark.deploy.k8s.Constants._
24+
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
25+
import org.apache.spark.internal.config.ConfigEntry
26+
27+
/**
 * Marker trait for role-specific (driver vs. executor) pod-building metadata.
 * Sealed so that pattern matches over role configurations are checked for
 * exhaustiveness by the compiler.
 */
private[spark] sealed trait KubernetesRoleSpecificConf

/**
 * Structure containing metadata for Kubernetes logic that builds a Spark driver.
 *
 * @param mainAppResource the primary application resource (e.g. main jar), if any
 * @param mainClass fully qualified name of the application's main class
 * @param appName user-visible application name
 * @param appArgs arguments passed to the application's main method
 */
private[spark] case class KubernetesDriverSpecificConf(
    mainAppResource: Option[MainAppResource],
    mainClass: String,
    appName: String,
    appArgs: Seq[String]) extends KubernetesRoleSpecificConf

/**
 * Structure containing metadata for Kubernetes logic that builds a Spark executor.
 *
 * @param executorId numeric identifier (as a string) of the executor being built
 * @param driverPod the driver's pod, used e.g. as the owner reference of executor pods
 */
private[spark] case class KubernetesExecutorSpecificConf(
    executorId: String,
    driverPod: Pod)
  extends KubernetesRoleSpecificConf
45+
46+
/**
 * Structure containing metadata for Kubernetes logic to build Spark pods.
 *
 * Wraps a [[SparkConf]] together with role-specific (driver or executor) settings
 * that have already been resolved, such as labels, annotations, secrets and
 * environment variables for the pod being constructed.
 *
 * @param sparkConf the Spark configuration backing this application
 * @param roleSpecificConf driver- or executor-specific metadata
 * @param appResourceNamePrefix prefix used to name Kubernetes resources for this role
 * @param appId the Spark application id
 * @param roleLabels labels to apply to this role's pod
 * @param roleAnnotations annotations to apply to this role's pod
 * @param roleSecretNamesToMountPaths secret name -> mount path for this role's pod
 * @param roleEnvs environment variables to set on this role's container
 */
private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
    sparkConf: SparkConf,
    roleSpecificConf: T,
    appResourceNamePrefix: String,
    appId: String,
    roleLabels: Map[String, String],
    roleAnnotations: Map[String, String],
    roleSecretNamesToMountPaths: Map[String, String],
    roleEnvs: Map[String, String]) {

  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

  /** Jars listed under "spark.jars", or empty if unset. Entries are not trimmed. */
  def sparkJars(): Seq[String] = commaSeparatedValues("spark.jars")

  /** Files listed under "spark.files", or empty if unset. Entries are not trimmed. */
  def sparkFiles(): Seq[String] = commaSeparatedValues("spark.files")

  def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)

  /**
   * Image pull secrets from the IMAGE_PULL_SECRETS config, parsed from a
   * comma-separated list (entries trimmed) into Kubernetes object references.
   */
  def imagePullSecrets(): Seq[LocalObjectReference] = {
    sparkConf
      .get(IMAGE_PULL_SECRETS)
      .map(_.split(","))
      .getOrElse(Array.empty[String])
      .map(_.trim)
      .map { secret =>
        new LocalObjectReferenceBuilder().withName(secret).build()
      }
  }

  def nodeSelector(): Map[String, String] =
    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)

  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)

  def get(conf: String): String = sparkConf.get(conf)

  def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue)

  def getOption(key: String): Option[String] = sparkConf.getOption(key)

  // Shared parsing for comma-separated list configs such as spark.jars/spark.files.
  private def commaSeparatedValues(key: String): Seq[String] = sparkConf
    .getOption(key)
    .map(str => str.split(",").toSeq)
    .getOrElse(Seq.empty[String])
}
95+
96+
private[spark] object KubernetesConf {

  /**
   * Builds the configuration used to construct the driver pod.
   *
   * Clones the given SparkConf, appending the main application jar to spark.jars if it
   * is not already present, and resolves driver labels, annotations, secrets and
   * driver-prefixed environment variables from their respective config prefixes.
   *
   * @throws IllegalArgumentException if user-supplied driver labels use keys reserved
   *                                  for Spark bookkeeping.
   */
  def createDriverConf(
      sparkConf: SparkConf,
      appName: String,
      appResourceNamePrefix: String,
      appId: String,
      mainAppResource: Option[MainAppResource],
      mainClass: String,
      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
    val sparkConfWithMainAppJar = sparkConf.clone()
    mainAppResource.foreach {
      case JavaMainAppResource(res) =>
        // Read from the clone we are about to mutate, for clarity; it has the same
        // jars as the original conf at this point.
        val previousJars = sparkConfWithMainAppJar
          .getOption("spark.jars")
          .map(_.split(","))
          .getOrElse(Array.empty)
        if (!previousJars.contains(res)) {
          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
        }
    }

    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
    // Reserved label keys may not be overridden by the user.
    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
      "operations.")
    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
      "operations.")
    val driverLabels = driverCustomLabels ++ Map(
      SPARK_APP_ID_LABEL -> appId,
      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
    val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
    val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
    val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)

    KubernetesConf(
      sparkConfWithMainAppJar,
      KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
      appResourceNamePrefix,
      appId,
      driverLabels,
      driverAnnotations,
      driverSecretNamesToMountPaths,
      driverEnvs)
  }

  /**
   * Builds the configuration used to construct a single executor pod.
   *
   * Resolves executor labels, annotations and secrets from their config prefixes, and
   * takes executor environment variables from SparkConf's executor env settings.
   *
   * @throws IllegalArgumentException if user-supplied executor labels use keys reserved
   *                                  for Spark bookkeeping.
   */
  def createExecutorConf(
      sparkConf: SparkConf,
      executorId: String,
      appId: String,
      driverPod: Pod): KubernetesConf[KubernetesExecutorSpecificConf] = {
    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
    // Reserved label keys may not be overridden by the user.
    require(
      !executorCustomLabels.contains(SPARK_APP_ID_LABEL),
      s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
    require(
      !executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL),
      s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
        " Spark.")
    require(
      !executorCustomLabels.contains(SPARK_ROLE_LABEL),
      s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
    // The requires above guarantee the reserved keys do not collide with custom labels,
    // so the merge order here cannot change any reserved value.
    val executorLabels = Map(
      SPARK_EXECUTOR_ID_LABEL -> executorId,
      SPARK_APP_ID_LABEL -> appId,
      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
      executorCustomLabels
    val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
    val executorSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
    val executorEnv = sparkConf.getExecutorEnv.toMap

    KubernetesConf(
      sparkConf.clone(),
      KubernetesExecutorSpecificConf(executorId, driverPod),
      sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
      appId,
      executorLabels,
      executorAnnotations,
      executorSecrets,
      executorEnv)
  }
}
Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,16 @@
1616
*/
1717
package org.apache.spark.deploy.k8s
1818

19-
import io.fabric8.kubernetes.api.model.LocalObjectReference
19+
import io.fabric8.kubernetes.api.model.HasMetadata
2020

21-
import org.apache.spark.SparkFunSuite
22-
23-
class KubernetesUtilsTest extends SparkFunSuite {
24-
25-
test("testParseImagePullSecrets") {
26-
val noSecrets = KubernetesUtils.parseImagePullSecrets(None)
27-
assert(noSecrets === Nil)
28-
29-
val oneSecret = KubernetesUtils.parseImagePullSecrets(Some("imagePullSecret"))
30-
assert(oneSecret === new LocalObjectReference("imagePullSecret") :: Nil)
31-
32-
val commaSeparatedSecrets = KubernetesUtils.parseImagePullSecrets(Some("s1, s2 , s3,s4"))
33-
assert(commaSeparatedSecrets.map(_.getName) === "s1" :: "s2" :: "s3" :: "s4" :: Nil)
34-
}
21+
/**
 * The complete specification of the Spark driver built by the driver feature steps.
 *
 * @param pod the driver pod and its container
 * @param driverKubernetesResources additional Kubernetes resources to create alongside
 *                                  the driver pod
 * @param systemProperties system properties to pass to the driver JVM
 */
private[spark] case class KubernetesDriverSpec(
    pod: SparkPod,
    driverKubernetesResources: Seq[HasMetadata],
    systemProperties: Map[String, String])

private[spark] object KubernetesDriverSpec {
  /**
   * The starting point for feature steps: a blank pod, no additional resources, and
   * the given initial system properties.
   */
  def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec(
    SparkPod.initialPod(),
    Seq.empty,
    initialProps)
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,6 @@ private[spark] object KubernetesUtils {
3737
sparkConf.getAllWithPrefix(prefix).toMap
3838
}
3939

40-
/**
41-
* Parses comma-separated list of imagePullSecrets into K8s-understandable format
42-
*/
43-
def parseImagePullSecrets(imagePullSecrets: Option[String]): List[LocalObjectReference] = {
44-
imagePullSecrets match {
45-
case Some(secretsCommaSeparated) =>
46-
secretsCommaSeparated.split(',').map(_.trim).map(new LocalObjectReference(_)).toList
47-
case None => Nil
48-
}
49-
}
50-
5140
def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
5241
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
5342
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala

Lines changed: 0 additions & 72 deletions
This file was deleted.
Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,21 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.deploy.k8s.submit.steps
17+
package org.apache.spark.deploy.k8s
1818

19-
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
19+
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
2020

21-
/**
22-
* Represents a step in configuring the Spark driver pod.
23-
*/
24-
private[spark] trait DriverConfigurationStep {
21+
/**
 * A Kubernetes pod paired with the container within it that runs the Spark process.
 * Feature steps transform a SparkPod incrementally, each adding its own configuration.
 */
private[spark] case class SparkPod(pod: Pod, container: Container)

private[spark] object SparkPod {
  /**
   * The blank starting point for pod construction: a pod with empty metadata and spec,
   * and an empty container.
   */
  def initialPod(): SparkPod = {
    SparkPod(
      new PodBuilder()
        .withNewMetadata()
        .endMetadata()
        .withNewSpec()
        .endSpec()
        .build(),
      new ContainerBuilder().build())
  }
}

0 commit comments

Comments (0)