 package org.apache.spark.scheduler.cluster.kubernetes
 
 import java.io.Closeable
+import java.net.InetAddress
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
 import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
@@ -177,16 +180,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
     .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
 
   private val allocatorRunnable: Runnable = new Runnable {
+
     override def run(): Unit = {
       if (totalRegisteredExecutors.get() < runningExecutorPods.size) {
         logDebug("Waiting for pending executors before scaling")
       } else if (totalExpectedExecutors.get() <= runningExecutorPods.size) {
         logDebug("Maximum allowed executor limit reached. Not scaling up further.")
       } else {
+        val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
         RUNNING_EXECUTOR_PODS_LOCK.synchronized {
           for (i <- 0 until math.min(
             totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) {
-            runningExecutorPods += allocateNewExecutorPod()
+            runningExecutorPods += allocateNewExecutorPod(nodeToLocalTaskCount)
             logInfo(
               s"Requesting a new executor, total executors is now ${runningExecutorPods.size}")
           }
@@ -195,6 +200,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
   }
 
+  private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
   private def getShuffleClient(): KubernetesExternalShuffleClient = {
     new KubernetesExternalShuffleClient(
       SparkTransportConf.fromSparkConf(conf, "shuffle"),
@@ -283,7 +290,70 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
   }
 
-  private def allocateNewExecutorPod(): (String, Pod) = {
+  /**
+   * @return A map of K8s cluster nodes to the number of tasks that could benefit from data
+   *         locality if an executor launches on the cluster node.
+   */
+  private def getNodesWithLocalTaskCounts(): Map[String, Int] = {
+    val executorPodsWithIPs = EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
+      executorPodsByIPs.values.toList // toList makes a defensive copy.
+    }
+    val nodeToLocalTaskCount = mutable.Map[String, Int]() ++
+      KubernetesClusterSchedulerBackend.this.synchronized {
+        hostToLocalTaskCount
+      }
+    for (pod <- executorPodsWithIPs) {
+      // Remove cluster nodes that are running our executors already.
+      // TODO: This prefers spreading out executors across nodes. In case users want to
+      // consolidate executors on fewer nodes, introduce a flag. See the spark.deploy.spreadOut
+      // flag that Spark standalone has: https://spark.apache.org/docs/latest/spark-standalone.html
+      nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty ||
+        nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty ||
+        nodeToLocalTaskCount.remove(
+          InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName).nonEmpty
+    }
+    nodeToLocalTaskCount.toMap[String, Int]
+  }
+
+  private def addNodeAffinityAnnotationIfUseful(basePodBuilder: PodBuilder,
+      nodeToTaskCount: Map[String, Int]): PodBuilder = {
+    def scaleToRange(value: Int, baseMin: Double, baseMax: Double,
+        rangeMin: Double, rangeMax: Double): Int =
+      (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt
+
+    if (nodeToTaskCount.nonEmpty) {
+      val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2)
+      // Normalize to node affinity weights in 1 to 100 range.
+      val nodeToWeight = nodeToTaskCount.map {
+        case (node, taskCount) =>
+          (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))}
+      val weightToNodes = nodeToWeight.groupBy(_._2).mapValues(_.keys)
+      // @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node
+      val nodeAffinityJson = objectMapper.writeValueAsString(SchedulerAffinity(NodeAffinity(
+        preferredDuringSchedulingIgnoredDuringExecution =
+          for ((weight, nodes) <- weightToNodes) yield
+            WeightedPreference(weight,
+              Preference(Array(MatchExpression("kubernetes.io/hostname", "In", nodes))))
+      )))
+      // TODO: Use non-annotation syntax when we switch to K8s version 1.6.
+      logDebug(s"Adding nodeAffinity as annotation $nodeAffinityJson")
+      basePodBuilder.editMetadata()
+        .addToAnnotations(ANNOTATION_EXECUTOR_NODE_AFFINITY, nodeAffinityJson)
+        .endMetadata()
+    } else {
+      basePodBuilder
+    }
+  }
+
+  /**
+   * Allocates a new executor pod.
+   *
+   * @param nodeToLocalTaskCount A map of K8s cluster nodes to the number of tasks that could
+   *                             benefit from data locality if an executor launches on the cluster
+   *                             node.
+   * @return A tuple of the new executor name and the Pod data structure.
+   */
+  private def allocateNewExecutorPod(nodeToLocalTaskCount: Map[String, Int]): (String, Pod) = {
     val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
     val name = s"$executorPodNamePrefix-exec-$executorId"
 
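For illustration only, and not part of the patch: a minimal, self-contained Scala sketch of how the 1-to-100 weight normalization in addNodeAffinityAnnotationIfUseful behaves, using a made-up node-to-task-count map (the node names and counts are hypothetical).

object NodeAffinityWeightSketch {
  // Same linear rescaling as the nested scaleToRange helper in the patch above.
  private def scaleToRange(value: Int, baseMin: Double, baseMax: Double,
      rangeMin: Double, rangeMax: Double): Int =
    (((rangeMax - rangeMin) * (value - baseMin) / (baseMax - baseMin)) + rangeMin).toInt

  def main(args: Array[String]): Unit = {
    // Hypothetical result of getNodesWithLocalTaskCounts: node name -> pending local task count.
    val nodeToTaskCount = Map("node-a" -> 8, "node-b" -> 2)
    val taskTotal = nodeToTaskCount.foldLeft(0)(_ + _._2) // 10
    val nodeToWeight = nodeToTaskCount.map { case (node, taskCount) =>
      (node, scaleToRange(taskCount, 1, taskTotal, rangeMin = 1, rangeMax = 100))
    }
    // Prints Map(node-a -> 78, node-b -> 12): nodes with more pending local tasks get
    // proportionally higher weights, scaled into the 1-to-100 range that Kubernetes
    // accepts for preferred node affinity.
    println(nodeToWeight)
  }
}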
@@ -393,14 +463,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .endSpec()
       }
     }.getOrElse(basePodBuilder)
-    val resolvedExecutorPod = executorInitContainerBootstrap.map { bootstrap =>
-      bootstrap.bootstrapInitContainerAndVolumes(
-        "executor",
-        withMaybeShuffleConfigPodBuilder)
-    }.getOrElse(withMaybeShuffleConfigPodBuilder)
+
+    val executorInitContainerPodBuilder = executorInitContainerBootstrap.map {
+      bootstrap =>
+        bootstrap.bootstrapInitContainerAndVolumes(
+          "executor",
+          withMaybeShuffleConfigPodBuilder)
+    }.getOrElse(withMaybeShuffleConfigPodBuilder)
+
+    val resolvedExecutorPodBuilder = addNodeAffinityAnnotationIfUseful(
+      executorInitContainerPodBuilder, nodeToLocalTaskCount)
 
     try {
-      (executorId, kubernetesClient.pods.create(resolvedExecutorPod.build()))
+      (executorId, kubernetesClient.pods.create(resolvedExecutorPodBuilder.build()))
     } catch {
       case throwable: Throwable =>
         logError("Failed to allocate executor pod.", throwable)
@@ -521,3 +596,15 @@ private object KubernetesClusterSchedulerBackend {
   private val DEFAULT_STATIC_PORT = 10000
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
 }
+
+/**
+ * These case classes model K8s node affinity syntax for
+ * preferredDuringSchedulingIgnoredDuringExecution.
+ * @see https://kubernetes.io/docs/concepts/configuration/assign-pod-node
+ */
+case class SchedulerAffinity(nodeAffinity: NodeAffinity)
+case class NodeAffinity(preferredDuringSchedulingIgnoredDuringExecution:
+  Iterable[WeightedPreference])
+case class WeightedPreference(weight: Int, preference: Preference)
+case class Preference(matchExpressions: Array[MatchExpression])
+case class MatchExpression(key: String, operator: String, values: Iterable[String])
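A rough sketch, not from the patch, of the JSON these case classes serialize to through an ObjectMapper with DefaultScalaModule registered; the node name and weight below are hypothetical, and the output is reformatted with whitespace for readability.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object NodeAffinityJsonSketch {
  def main(args: Array[String]): Unit = {
    val affinity = SchedulerAffinity(NodeAffinity(
      preferredDuringSchedulingIgnoredDuringExecution = Seq(
        WeightedPreference(78,
          Preference(Array(MatchExpression("kubernetes.io/hostname", "In", Seq("node-a"))))))))
    val json = new ObjectMapper().registerModule(DefaultScalaModule).writeValueAsString(affinity)
    // json is roughly:
    // {"nodeAffinity": {"preferredDuringSchedulingIgnoredDuringExecution": [
    //   {"weight": 78, "preference": {"matchExpressions": [
    //     {"key": "kubernetes.io/hostname", "operator": "In", "values": ["node-a"]}]}}]}}
    // which follows the shape Kubernetes documents for preferred node affinity.
    println(json)
  }
}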