
Commit 5a14d01

add spec template configs
Squashed commit messages:

- start from template for driver
- start from template for executor
- wip volume executor podspec template
- move logic to apply functions
- find containers
- style
- remove import
- compiles
- tests pass
- adding TemplateVolumeStepSuite
- DriverBuilder test
- WIP trying to write tests for KubernetesDriverBuilder constructor
- fix test
- fix test, and move loading logic to util method
- validate that the executor pod template is good in the driver
- cleaning
- redo mounting file
- rename to TemplateConfigMapStep
- Pass initialPod constructor instead of Spec constructor
- make driver and executor container names configurable
- create temp file correctly?
- executor initial pod test
- add docs
- addressing some comments
- integration tests attempt 1
- fix up docs
- rename a variable
- fix style?
- fix docs to remove container name conf and further clarify
- actually add the pod template test
- remove containerName confs
- test tag and indent extension
- use resources for integration tests templates
- rat
- fix path
- prevent having duplicate containers
- do not use broken removeContainer
- nits
- inline integration test methods, add volume to executor builder unit tests
1 parent 8080c02 commit 5a14d01

22 files changed: +795 −39 lines changed

docs/running-on-kubernetes.md

Lines changed: 179 additions & 0 deletions
@@ -185,6 +185,21 @@ To use a secret through an environment variable use the following options to the
   --conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
 ```
 
+## Pod Template
+Kubernetes allows defining pods from [template files](https://kubernetes.io/docs/concepts/workloads/pods/pod-overview/#pod-templates).
+Spark users can similarly use template files to define the driver or executor pod configurations that Spark configurations do not support.
+To do so, specify the Spark properties `spark.kubernetes.driver.podTemplateFile` and `spark.kubernetes.executor.podTemplateFile`
+to point to local files accessible to the `spark-submit` process. To allow the driver pod to access the executor pod
+template file, the file will be automatically mounted onto a volume in the driver pod when it's created.
+
+It is important to note that Spark is opinionated about certain pod configurations, so there are values in the
+pod template that will always be overwritten by Spark. Therefore, users of this feature should note that specifying
+the pod template file only lets Spark start with a template pod instead of an empty pod during the pod-building process.
+For details, see the [full list](#pod-template-properties) of pod template values that will be overwritten by Spark.
+
+Pod template files can also define multiple containers. In such cases, Spark will always assume that the first container
+in the list is the driver or executor container.
 
 ## Introspection and Debugging
 
 These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
@@ -775,4 +790,168 @@ specific to Spark on Kubernetes.
   This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.podTemplateFile</code></td>
+  <td>(none)</td>
+  <td>
+   Specify the local file that contains the driver [pod template](#pod-template). For example
+   <code>spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml</code>
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.podTemplateFile</code></td>
+  <td>(none)</td>
+  <td>
+   Specify the local file that contains the executor [pod template](#pod-template). For example
+   <code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml</code>
+  </td>
+</tr>
+</table>
+
+#### Pod template properties
+
+See the tables below for the full list of pod specifications that will be overwritten by Spark.
+
+### Pod Metadata
+
+<table class="table">
+<tr><th>Pod metadata key</th><th>Modified value</th><th>Description</th></tr>
+<tr>
+  <td>name</td>
+  <td>Value of <code>spark.kubernetes.driver.pod.name</code></td>
+  <td>
+    The driver pod name will be overwritten with either the configured or default value of
+    <code>spark.kubernetes.driver.pod.name</code>. The executor pod names will be unaffected.
+  </td>
+</tr>
+<tr>
+  <td>namespace</td>
+  <td>Value of <code>spark.kubernetes.namespace</code></td>
+  <td>
+    Spark makes strong assumptions about the driver and executor namespaces. Both driver and executor namespaces will
+    be replaced by either the configured or default Spark conf value.
+  </td>
+</tr>
+<tr>
+  <td>labels</td>
+  <td>Adds the labels from <code>spark.kubernetes.{driver,executor}.label.*</code></td>
+  <td>
+    Spark will add additional labels specified by the Spark configuration.
+  </td>
+</tr>
+<tr>
+  <td>annotations</td>
+  <td>Adds the annotations from <code>spark.kubernetes.{driver,executor}.annotation.*</code></td>
+  <td>
+    Spark will add additional annotations specified by the Spark configuration.
+  </td>
+</tr>
+</table>
+
+### Pod Spec
+
+<table class="table">
+<tr><th>Pod spec key</th><th>Modified value</th><th>Description</th></tr>
+<tr>
+  <td>imagePullSecrets</td>
+  <td>Adds image pull secrets from <code>spark.kubernetes.container.image.pullSecrets</code></td>
+  <td>
+    Additional pull secrets will be added from the Spark configuration to both driver and executor pods.
+  </td>
+</tr>
+<tr>
+  <td>nodeSelector</td>
+  <td>Adds node selectors from <code>spark.kubernetes.node.selector.*</code></td>
+  <td>
+    Additional node selectors will be added from the Spark configuration to both driver and executor pods.
+  </td>
+</tr>
+<tr>
+  <td>restartPolicy</td>
+  <td><code>"Never"</code></td>
+  <td>
+    Spark assumes that both drivers and executors never restart.
+  </td>
+</tr>
+<tr>
+  <td>serviceAccount</td>
+  <td>Value of <code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
+  <td>
+    Spark will override <code>serviceAccount</code> with the value of the Spark configuration for only
+    driver pods, and only if the Spark configuration is specified. Executor pods will remain unaffected.
+  </td>
+</tr>
+<tr>
+  <td>serviceAccountName</td>
+  <td>Value of <code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
+  <td>
+    Spark will override <code>serviceAccountName</code> with the value of the Spark configuration for only
+    driver pods, and only if the Spark configuration is specified. Executor pods will remain unaffected.
+  </td>
+</tr>
+<tr>
+  <td>volumes</td>
+  <td>Adds volumes from <code>spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.path</code></td>
+  <td>
+    Spark will add volumes as specified by the Spark conf, as well as additional volumes necessary for passing
+    Spark conf and pod template files.
+  </td>
+</tr>
 </table>
+
+### Container spec
+
+The following affect the driver and executor containers. All other containers in the pod spec will be unaffected.
+
+<table class="table">
+<tr><th>Container spec key</th><th>Modified value</th><th>Description</th></tr>
+<tr>
+  <td>env</td>
+  <td>Adds env variables from <code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
+  <td>
+    Spark will add driver env variables from <code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code>, and
+    executor env variables from <code>spark.executorEnv.[EnvironmentVariableName]</code>.
+  </td>
+</tr>
+<tr>
+  <td>image</td>
+  <td>Value of <code>spark.kubernetes.{driver,executor}.container.image</code></td>
+  <td>
+    The image will be defined by the Spark configurations.
+  </td>
+</tr>
+<tr>
+  <td>imagePullPolicy</td>
+  <td>Value of <code>spark.kubernetes.container.image.pullPolicy</code></td>
+  <td>
+    Spark will override the pull policy for both driver and executors.
+  </td>
+</tr>
+<tr>
+  <td>name</td>
+  <td>See description.</td>
+  <td>
+    The container name will be assigned by Spark ("spark-kubernetes-driver" for the driver container, and
+    "spark-kubernetes-executor" for each executor container) if not defined by the pod template. If the container
+    is defined by the template, the template's name will be used.
+  </td>
+</tr>
+<tr>
+  <td>resources</td>
+  <td>See description.</td>
+  <td>
+    The CPU limits are set by <code>spark.kubernetes.{driver,executor}.limit.cores</code>. The CPU request is set by
+    <code>spark.{driver,executor}.cores</code>. The memory request and limit are set by summing the values of
+    <code>spark.{driver,executor}.memory</code> and <code>spark.{driver,executor}.memoryOverhead</code>.
+  </td>
+</tr>
+<tr>
+  <td>volumeMounts</td>
+  <td>Adds volume mounts from <code>spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.{path,readOnly}</code></td>
+  <td>
+    Spark will add volume mounts as specified by the Spark conf, as well as additional volume mounts necessary for
+    passing the Spark conf and pod template files.
+  </td>
+</tr>
+</table>
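
As an illustration of the feature documented above, here is a minimal, hypothetical executor pod template. The file name, labels, scheduler, and sidecar are examples only, not part of this commit; per the docs, Spark treats the first container in the list as the executor container and overwrites the properties listed in the tables above:

```yaml
# executor-pod-template.yaml -- hypothetical example.
# The first container becomes the executor; "sidecar" is left untouched.
apiVersion: v1
kind: Pod
metadata:
  labels:
    team: data-platform        # kept; Spark only adds its own labels
spec:
  schedulerName: my-scheduler  # not managed by Spark, so it survives
  containers:
  - name: spark-exec           # kept, since the template defines a name
    securityContext:
      runAsNonRoot: true
  - name: sidecar              # extra containers are left as-is
    image: busybox
    command: ["sh", "-c", "tail -f /dev/null"]
```

With a template like this, `spark-submit --conf spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml ...` starts each executor from the template instead of an empty pod.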

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 12 additions & 0 deletions
@@ -235,6 +235,18 @@ private[spark] object Config extends Logging {
       "Ensure that major Python version is either Python2 or Python3")
     .createWithDefault("2")
 
+  val KUBERNETES_DRIVER_PODTEMPLATE_FILE =
+    ConfigBuilder("spark.kubernetes.driver.podTemplateFile")
+      .doc("File containing a template pod spec for the driver")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE =
+    ConfigBuilder("spark.kubernetes.executor.podTemplateFile")
+      .doc("File containing a template pod spec for executors")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
     "spark.kubernetes.authenticate.submission"
 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 9 additions & 1 deletion
@@ -77,8 +77,16 @@ private[spark] object Constants {
   val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
   val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
 
+  // Pod spec templates
+  val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
+  val EXECUTOR_POD_SPEC_TEMPLATE_MOUNTHPATH = "/opt/spark/pod-template"
+  val POD_TEMPLATE_VOLUME = "podspec-volume"
+  val POD_TEMPLATE_CONFIGMAP = "podspec-configmap"
+  val POD_TEMPLATE_KEY = "podspec-configmap-key"
+
   // Miscellaneous
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
-  val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
+  val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
+  val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
   val MEMORY_OVERHEAD_MIN_MIB = 384L
 }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala

Lines changed: 5 additions & 4 deletions
@@ -24,8 +24,9 @@ private[spark] case class KubernetesDriverSpec(
     systemProperties: Map[String, String])
 
 private[spark] object KubernetesDriverSpec {
-  def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec(
-    SparkPod.initialPod(),
-    Seq.empty,
-    initialProps)
+  def initialSpec(initialConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec =
+    KubernetesDriverSpec(
+      SparkPod.initialPod(),
+      Seq.empty,
+      initialConf.sparkConf.getAll.toMap)
 }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 30 additions & 2 deletions
@@ -18,10 +18,15 @@ package org.apache.spark.deploy.k8s
 
 import java.io.File
 
-import org.apache.spark.SparkConf
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
-private[spark] object KubernetesUtils {
+private[spark] object KubernetesUtils extends Logging {
 
   /**
    * Extract and parse Spark configuration properties with a given name prefix and
@@ -72,5 +77,28 @@ private[spark] object KubernetesUtils {
     }
   }
 
+  def loadPodFromTemplate(
+      kubernetesClient: KubernetesClient,
+      templateFile: File): SparkPod = {
+    try {
+      val pod = kubernetesClient.pods().load(templateFile).get()
+      pod.getSpec.getContainers.asScala.toList match {
+        case first :: rest => SparkPod(
+          new PodBuilder(pod)
+            .editSpec()
+            .withContainers(rest.asJava)
+            .endSpec()
+            .build(),
+          first)
+        case Nil => SparkPod(pod, new ContainerBuilder().build())
+      }
+    } catch {
+      case e: Exception =>
+        logError("Encountered exception while attempting to load initial pod spec from file", e)
+        throw new SparkException("Could not load driver pod from template file.", e)
+    }
+  }
+
   def parseMasterUrl(url: String): String = url.substring("k8s://".length)
 }
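
`loadPodFromTemplate` splits the parsed pod into the first container (which becomes the Spark container) and the remainder of the pod. A hedged sketch of how a driver-side caller might wire it up; the builder wiring and the `provideInitialPod` name are assumptions, not part of this diff:

```scala
import java.io.File

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._

// Hypothetical caller: fall back to an empty initial pod when no template is configured.
def provideInitialPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod =
  sparkConf
    .get(KUBERNETES_DRIVER_PODTEMPLATE_FILE)          // Option[String], per Config.scala above
    .map(file => KubernetesUtils.loadPodFromTemplate(client, new File(file)))
    .getOrElse(SparkPod.initialPod())                 // no template: start from an empty pod
```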

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ private[spark] class BasicDriverFeatureStep(
     )
     val driverUIPort = SparkUI.getUIPort(conf.sparkConf)
     val driverContainer = new ContainerBuilder(pod.container)
-      .withName(DRIVER_CONTAINER_NAME)
+      .withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
       .withImage(driverContainerImage)
       .withImagePullPolicy(conf.imagePullPolicy())
       .addNewPort()
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala

Lines changed: 3 additions & 3 deletions
@@ -129,7 +129,7 @@ private[spark] class BasicExecutorFeatureStep(
     }
 
     val executorContainer = new ContainerBuilder(pod.container)
-      .withName("executor")
+      .withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME))
       .withImage(executorContainerImage)
       .withImagePullPolicy(kubernetesConf.imagePullPolicy())
       .withNewResources()
@@ -163,8 +163,8 @@ private[spark] class BasicExecutorFeatureStep(
     val executorPod = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(name)
-        .withLabels(kubernetesConf.roleLabels.asJava)
-        .withAnnotations(kubernetesConf.roleAnnotations.asJava)
+        .addToLabels(kubernetesConf.roleLabels.asJava)
+        .addToAnnotations(kubernetesConf.roleAnnotations.asJava)
         .addToOwnerReferences(ownerReference.toSeq: _*)
       .endMetadata()
       .editOrNewSpec()
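
The `withLabels` to `addToLabels` switch is what preserves template-supplied metadata: `withLabels` replaces the whole label map, while `addToLabels` merges into whatever the template already set. A small sketch of the difference, with illustrative values that are not from this commit:

```scala
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.PodBuilder

// A pod that plays the role of a template with a pre-existing label.
val base = new PodBuilder()
  .withNewMetadata().addToLabels("team", "data-platform").endMetadata()
  .build()

// withLabels would drop "team"; addToLabels keeps it and adds Spark's labels.
val merged = new PodBuilder(base)
  .editMetadata().addToLabels(Map("spark-role" -> "executor").asJava).endMetadata()
  .build()

assert(merged.getMetadata.getLabels.asScala ==
  Map("team" -> "data-platform", "spark-role" -> "executor"))
```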
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, ContainerBuilder, HasMetadata, PodBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+private[spark] class PodTemplateConfigMapStep(
+    conf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  def configurePod(pod: SparkPod): SparkPod = {
+    val podWithVolume = new PodBuilder(pod.pod)
+      .editSpec()
+        .addNewVolume()
+          .withName(POD_TEMPLATE_VOLUME)
+          .withNewConfigMap()
+            .withName(POD_TEMPLATE_CONFIGMAP)
+            .addNewItem()
+              .withKey(POD_TEMPLATE_KEY)
+              .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
+            .endItem()
+          .endConfigMap()
+        .endVolume()
+      .endSpec()
+      .build()
+
+    val containerWithVolume = new ContainerBuilder(pod.container)
+      .addNewVolumeMount()
+        .withName(POD_TEMPLATE_VOLUME)
+        .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTHPATH)
+      .endVolumeMount()
+      .build()
+    SparkPod(podWithVolume, containerWithVolume)
+  }
+
+  def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String](
+    KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
+      (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTHPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
+
+  def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined)
+    val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
+    val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
+    Seq(new ConfigMapBuilder()
+      .withNewMetadata()
+        .withName(POD_TEMPLATE_CONFIGMAP)
+      .endMetadata()
+      .addToData(POD_TEMPLATE_KEY, podTemplateString)
+      .build())
+  }
+}
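
Taken together, the step round-trips the template through Kubernetes: `getAdditionalKubernetesResources` publishes the local file's contents as a ConfigMap, `configurePod` mounts that ConfigMap into the driver pod at `EXECUTOR_POD_SPEC_TEMPLATE_MOUNTHPATH`, and `getAdditionalPodSystemProperties` rewrites the conf so that, inside the driver, `spark.kubernetes.executor.podTemplateFile` points at the mounted copy. A sketch of the resulting in-driver path, derived from the constants above:

```scala
import org.apache.spark.deploy.k8s.Constants._

// Inside the driver pod, the executor template is expected at the mounted path:
val mountedTemplatePath =
  EXECUTOR_POD_SPEC_TEMPLATE_MOUNTHPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME
assert(mountedTemplatePath == "/opt/spark/pod-template/pod-spec-template.yml")
```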
