Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 6dbd32e

Browse files
sandfleeash211
authored and committed
Add node selectors for driver and executor pods (#355)
1 parent 8751a9a commit 6dbd32e

File tree

5 files changed

+35
-0
lines changed

5 files changed

+35
-0
lines changed

docs/running-on-kubernetes.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,16 @@ from the other deployment modes. See the [configuration page](configuration.html
758758
Specify the hard cpu limit for a single executor pod
759759
</td>
760760
</tr>
761+
<tr>
762+
<td><code>spark.kubernetes.node.selector.[labelKey]</code></td>
763+
<td>(none)</td>
764+
<td>
765+
Adds to the node selector of the driver pod and executor pods, with key <code>labelKey</code> and the value as the
766+
configuration's value. For example, setting <code>spark.kubernetes.node.selector.identifier</code> to <code>myIdentifier</code>
767+
will result in the driver pod and executors having a node selector with key <code>identifier</code> and value
768+
<code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
769+
</td>
770+
</tr>
761771
</table>
762772

763773

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,18 @@ object ConfigurationUtils extends Logging {
6565
}
6666
combined.toMap
6767
}
68+
69+
/**
 * Collects all Spark configuration entries whose keys start with the given prefix
 * and returns them as a map.
 *
 * @param sparkConf  the SparkConf to read entries from
 * @param prefix     configuration key prefix to match (e.g. "spark.kubernetes.node.selector.")
 * @param configType human-readable description of the config group, used in error messages
 * @return map of matched key -> value (keys as returned by getAllWithPrefix —
 *         presumably stripped of the prefix; confirm against SparkConf docs)
 * @throws IllegalArgumentException if the same key appears with more than one value
 */
def parsePrefixedKeyValuePairs(
    sparkConf: SparkConf,
    prefix: String,
    configType: String): Map[String, String] = {
  val fromPrefix = sparkConf.getAllWithPrefix(prefix)
  fromPrefix.groupBy(_._1).foreach {
    case (key, values) =>
      // `values` is an Array; interpolating it directly would print the JVM array
      // identity (e.g. [Lscala.Tuple2;@1a2b3c) rather than the conflicting entries,
      // so render it as a Seq to produce a readable error message.
      require(values.size == 1,
        s"Cannot have multiple values for a given $configType key, got key $key with" +
          s" values ${values.map(_._2).toSeq}")
  }
  fromPrefix.toMap
}
6882
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,8 @@ package object config extends Logging {
497497
.stringConf
498498
.createOptional
499499

500+
private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
501+
500502
private[spark] def resolveK8sMaster(rawMasterString: String): String = {
501503
if (!rawMasterString.startsWith("k8s://")) {
502504
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ private[spark] class BaseDriverConfigurationStep(
7373
s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
7474
s" Spark bookkeeping operations.")
7575
val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
76+
val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
77+
submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX, "node selector")
7678
val driverCpuQuantity = new QuantityBuilder(false)
7779
.withAmount(driverCpuCores)
7880
.build()
@@ -117,6 +119,7 @@ private[spark] class BaseDriverConfigurationStep(
117119
.endMetadata()
118120
.withNewSpec()
119121
.withRestartPolicy("Never")
122+
.withNodeSelector(nodeSelector.asJava)
120123
.endSpec()
121124
.build()
122125
val resolvedSparkConf = driverSpec.driverSparkConf.clone()

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
8484
KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
8585
KUBERNETES_EXECUTOR_ANNOTATIONS,
8686
"executor annotation")
87+
private val nodeSelector =
88+
ConfigurationUtils.parsePrefixedKeyValuePairs(
89+
conf,
90+
KUBERNETES_NODE_SELECTOR_PREFIX,
91+
"node-selector")
8792
private var shufflePodCache: Option[ShufflePodCache] = None
8893
private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
8994
private val dockerImagePullPolicy = conf.get(DOCKER_IMAGE_PULL_POLICY)
@@ -449,6 +454,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
449454
.endMetadata()
450455
.withNewSpec()
451456
.withHostname(hostname)
457+
.withNodeSelector(nodeSelector.asJava)
452458
.endSpec()
453459
.build()
454460

0 commit comments

Comments
 (0)