Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit fa02fb1

Browse files
mccheahash211
authored andcommitted
Move executor pod construction to a separate class. (#452)
* Move executor pod construction to a separate class. This is the first of several measures to make KubernetesClusterSchedulerBackend feasible to test. * Revert change to README * Address comments. * Resolve merge conflicts. Move MiB change to ExecutorPodFactory.
1 parent bc845c3 commit fa02fb1

File tree

5 files changed

+420
-278
lines changed

5 files changed

+420
-278
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.scheduler.cluster.kubernetes
18+
19+
import java.util.concurrent.atomic.AtomicLong
20+
21+
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
22+
import org.apache.commons.io.FilenameUtils
23+
import scala.collection.JavaConverters._
24+
25+
import org.apache.spark.{SparkConf, SparkException}
26+
import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
27+
import org.apache.spark.deploy.kubernetes.config._
28+
import org.apache.spark.deploy.kubernetes.constants._
29+
import org.apache.spark.deploy.kubernetes.submit.{InitContainerUtil, MountSmallFilesBootstrap}
30+
import org.apache.spark.util.Utils
31+
32+
// Configures executor pods. Construct one of these with a SparkConf to set up properties that are
33+
// common across all executors. Then, pass in dynamic parameters into createExecutorPod.
34+
private[spark] trait ExecutorPodFactory {
35+
def createExecutorPod(
36+
executorId: String,
37+
applicationId: String,
38+
driverUrl: String,
39+
executorEnvs: Seq[(String, String)],
40+
shuffleServiceConfig: Option[ShuffleServiceConfig],
41+
driverPod: Pod,
42+
nodeToLocalTaskCount: Map[String, Int]): Pod
43+
}
44+
45+
private[spark] class ExecutorPodFactoryImpl(
46+
sparkConf: SparkConf,
47+
nodeAffinityExecutorPodModifier: NodeAffinityExecutorPodModifier,
48+
mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap],
49+
executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap],
50+
executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
51+
extends ExecutorPodFactory {
52+
53+
import ExecutorPodFactoryImpl._
54+
55+
private val executorExtraClasspath = sparkConf.get(
56+
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
57+
private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
58+
59+
private val executorLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
60+
sparkConf,
61+
KUBERNETES_EXECUTOR_LABEL_PREFIX,
62+
KUBERNETES_EXECUTOR_LABELS,
63+
"executor label")
64+
require(
65+
!executorLabels.contains(SPARK_APP_ID_LABEL),
66+
s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
67+
require(
68+
!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
69+
s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
70+
s" Spark.")
71+
72+
private val executorAnnotations =
73+
ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
74+
sparkConf,
75+
KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
76+
KUBERNETES_EXECUTOR_ANNOTATIONS,
77+
"executor annotation")
78+
private val nodeSelector =
79+
ConfigurationUtils.parsePrefixedKeyValuePairs(
80+
sparkConf,
81+
KUBERNETES_NODE_SELECTOR_PREFIX,
82+
"node selector")
83+
84+
private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
85+
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
86+
private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
87+
private val blockmanagerPort = sparkConf
88+
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
89+
private val kubernetesDriverPodName = sparkConf
90+
.get(KUBERNETES_DRIVER_POD_NAME)
91+
.getOrElse(throw new SparkException("Must specify the driver pod name"))
92+
93+
private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
94+
95+
private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
96+
private val executorMemoryString = sparkConf.get(
97+
org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
98+
org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
99+
100+
private val memoryOverheadMiB = sparkConf
101+
.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
102+
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
103+
MEMORY_OVERHEAD_MIN_MIB))
104+
private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
105+
106+
private val executorCores = sparkConf.getDouble("spark.executor.cores", 1d)
107+
private val executorLimitCores = sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
108+
109+
override def createExecutorPod(
110+
executorId: String,
111+
applicationId: String,
112+
driverUrl: String,
113+
executorEnvs: Seq[(String, String)],
114+
shuffleServiceConfig: Option[ShuffleServiceConfig],
115+
driverPod: Pod,
116+
nodeToLocalTaskCount: Map[String, Int]): Pod = {
117+
val name = s"$executorPodNamePrefix-exec-$executorId"
118+
119+
// hostname must be no longer than 63 characters, so take the last 63 characters of the pod
120+
// name as the hostname. This preserves uniqueness since the end of name contains
121+
// executorId and applicationId
122+
val hostname = name.substring(Math.max(0, name.length - 63))
123+
val resolvedExecutorLabels = Map(
124+
SPARK_EXECUTOR_ID_LABEL -> executorId,
125+
SPARK_APP_ID_LABEL -> applicationId,
126+
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
127+
executorLabels
128+
val executorMemoryQuantity = new QuantityBuilder(false)
129+
.withAmount(s"${executorMemoryMiB}Mi")
130+
.build()
131+
val executorMemoryLimitQuantity = new QuantityBuilder(false)
132+
.withAmount(s"${executorMemoryWithOverhead}Mi")
133+
.build()
134+
val executorCpuQuantity = new QuantityBuilder(false)
135+
.withAmount(executorCores.toString)
136+
.build()
137+
val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
138+
new EnvVarBuilder()
139+
.withName(ENV_EXECUTOR_EXTRA_CLASSPATH)
140+
.withValue(cp)
141+
.build()
142+
}
143+
val executorExtraJavaOptionsEnv = sparkConf
144+
.get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
145+
.map { opts =>
146+
val delimitedOpts = Utils.splitCommandString(opts)
147+
delimitedOpts.zipWithIndex.map {
148+
case (opt, index) =>
149+
new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
150+
}
151+
}.getOrElse(Seq.empty[EnvVar])
152+
val executorEnv = (Seq(
153+
(ENV_EXECUTOR_PORT, executorPort.toString),
154+
(ENV_DRIVER_URL, driverUrl),
155+
// Executor backend expects integral value for executor cores, so round it up to an int.
156+
(ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
157+
(ENV_EXECUTOR_MEMORY, executorMemoryString),
158+
(ENV_APPLICATION_ID, applicationId),
159+
(ENV_EXECUTOR_ID, executorId),
160+
(ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ executorEnvs)
161+
.map(env => new EnvVarBuilder()
162+
.withName(env._1)
163+
.withValue(env._2)
164+
.build()
165+
) ++ Seq(
166+
new EnvVarBuilder()
167+
.withName(ENV_EXECUTOR_POD_IP)
168+
.withValueFrom(new EnvVarSourceBuilder()
169+
.withNewFieldRef("v1", "status.podIP")
170+
.build())
171+
.build()
172+
) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
173+
val requiredPorts = Seq(
174+
(EXECUTOR_PORT_NAME, executorPort),
175+
(BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
176+
.map(port => {
177+
new ContainerPortBuilder()
178+
.withName(port._1)
179+
.withContainerPort(port._2)
180+
.build()
181+
})
182+
183+
val executorContainer = new ContainerBuilder()
184+
.withName(s"executor")
185+
.withImage(executorDockerImage)
186+
.withImagePullPolicy(dockerImagePullPolicy)
187+
.withNewResources()
188+
.addToRequests("memory", executorMemoryQuantity)
189+
.addToLimits("memory", executorMemoryLimitQuantity)
190+
.addToRequests("cpu", executorCpuQuantity)
191+
.endResources()
192+
.addAllToEnv(executorEnv.asJava)
193+
.withPorts(requiredPorts.asJava)
194+
.build()
195+
196+
val executorPod = new PodBuilder()
197+
.withNewMetadata()
198+
.withName(name)
199+
.withLabels(resolvedExecutorLabels.asJava)
200+
.withAnnotations(executorAnnotations.asJava)
201+
.withOwnerReferences()
202+
.addNewOwnerReference()
203+
.withController(true)
204+
.withApiVersion(driverPod.getApiVersion)
205+
.withKind(driverPod.getKind)
206+
.withName(driverPod.getMetadata.getName)
207+
.withUid(driverPod.getMetadata.getUid)
208+
.endOwnerReference()
209+
.endMetadata()
210+
.withNewSpec()
211+
.withHostname(hostname)
212+
.withRestartPolicy("Never")
213+
.withNodeSelector(nodeSelector.asJava)
214+
.endSpec()
215+
.build()
216+
217+
val containerWithExecutorLimitCores = executorLimitCores.map {
218+
limitCores =>
219+
val executorCpuLimitQuantity = new QuantityBuilder(false)
220+
.withAmount(limitCores)
221+
.build()
222+
new ContainerBuilder(executorContainer)
223+
.editResources()
224+
.addToLimits("cpu", executorCpuLimitQuantity)
225+
.endResources()
226+
.build()
227+
}.getOrElse(executorContainer)
228+
229+
val withMaybeShuffleConfigExecutorContainer = shuffleServiceConfig.map { config =>
230+
config.shuffleDirs.foldLeft(containerWithExecutorLimitCores) { (container, dir) =>
231+
new ContainerBuilder(container)
232+
.addNewVolumeMount()
233+
.withName(FilenameUtils.getBaseName(dir))
234+
.withMountPath(dir)
235+
.endVolumeMount()
236+
.build()
237+
}
238+
}.getOrElse(containerWithExecutorLimitCores)
239+
val withMaybeShuffleConfigPod = shuffleServiceConfig.map { config =>
240+
config.shuffleDirs.foldLeft(executorPod) { (builder, dir) =>
241+
new PodBuilder(builder)
242+
.editSpec()
243+
.addNewVolume()
244+
.withName(FilenameUtils.getBaseName(dir))
245+
.withNewHostPath()
246+
.withPath(dir)
247+
.endHostPath()
248+
.endVolume()
249+
.endSpec()
250+
.build()
251+
}
252+
}.getOrElse(executorPod)
253+
val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) =
254+
mountSmallFilesBootstrap.map { bootstrap =>
255+
bootstrap.mountSmallFilesSecret(
256+
withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer)
257+
}.getOrElse((withMaybeShuffleConfigPod, withMaybeShuffleConfigExecutorContainer))
258+
val (executorPodWithInitContainer, initBootstrappedExecutorContainer) =
259+
executorInitContainerBootstrap.map { bootstrap =>
260+
val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes(
261+
PodWithDetachedInitContainer(
262+
withMaybeSmallFilesMountedPod,
263+
new ContainerBuilder().build(),
264+
withMaybeSmallFilesMountedContainer))
265+
266+
val resolvedInitContainer = executorMountInitContainerSecretPlugin.map { plugin =>
267+
plugin.mountResourceStagingServerSecretIntoInitContainer(
268+
podWithDetachedInitContainer.initContainer)
269+
}.getOrElse(podWithDetachedInitContainer.initContainer)
270+
271+
val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer(
272+
podWithDetachedInitContainer.pod, resolvedInitContainer)
273+
274+
val resolvedPodWithMountedSecret = executorMountInitContainerSecretPlugin.map { plugin =>
275+
plugin.addResourceStagingServerSecretVolumeToPod(podWithAttachedInitContainer)
276+
}.getOrElse(podWithAttachedInitContainer)
277+
278+
(resolvedPodWithMountedSecret, podWithDetachedInitContainer.mainContainer)
279+
}.getOrElse((withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer))
280+
281+
val executorPodWithNodeAffinity =
282+
nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful(
283+
executorPodWithInitContainer, nodeToLocalTaskCount)
284+
new PodBuilder(executorPodWithNodeAffinity)
285+
.editSpec()
286+
.addToContainers(initBootstrappedExecutorContainer)
287+
.endSpec()
288+
.build()
289+
}
290+
}
291+
292+
private object ExecutorPodFactoryImpl {
293+
private val DEFAULT_STATIC_PORT = 10000
294+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,16 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
9696
sparkConf,
9797
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
9898
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
99+
val executorPodFactory = new ExecutorPodFactoryImpl(
100+
sparkConf,
101+
NodeAffinityExecutorPodModifierImpl,
102+
mountSmallFilesBootstrap,
103+
executorInitContainerbootStrap,
104+
executorInitContainerSecretVolumePlugin)
99105
new KubernetesClusterSchedulerBackend(
100106
sc.taskScheduler.asInstanceOf[TaskSchedulerImpl],
101107
sc,
102-
executorInitContainerbootStrap,
103-
executorInitContainerSecretVolumePlugin,
104-
mountSmallFilesBootstrap,
108+
executorPodFactory,
105109
kubernetesClient)
106110
}
107111

0 commit comments

Comments
 (0)