This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit e5623b7

mccheah authored and foxish committed
Allow custom executor labels and annotations (#321)
* Allow custom executor labels and annotations
* Address comments.
* Fix scalastyle.
1 parent 9be8f20 commit e5623b7

4 files changed: +62 -28 lines changed


docs/running-on-kubernetes.md

Lines changed: 17 additions & 0 deletions

@@ -476,6 +476,23 @@ from the other deployment modes. See the [configuration page](configuration.html
     pairs, where each annotation is in the format <code>key=value</code>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.labels</code></td>
+  <td>(none)</td>
+  <td>
+    Custom labels that will be added to the executor pods. This should be a comma-separated list of label key-value
+    pairs, where each label is in the format <code>key=value</code>. Note that Spark also adds its own labels to the
+    executor pods for bookkeeping purposes.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.annotations</code></td>
+  <td>(none)</td>
+  <td>
+    Custom annotations that will be added to the executor pods. This should be a comma-separated list of annotation
+    key-value pairs, where each annotation is in the format <code>key=value</code>.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.driver.pod.name</code></td>
   <td>(none)</td>
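For illustration, the two new properties are set like any other Spark configuration. A minimal sketch using SparkConf; the label and annotation values below are hypothetical, chosen only to show the comma-separated key=value format this commit expects:

import org.apache.spark.SparkConf

// Hypothetical values; any comma-separated key=value pairs work here.
val conf = new SparkConf()
  .set("spark.kubernetes.executor.labels", "team=data-eng,env=staging")
  .set("spark.kubernetes.executor.annotations", "prometheus.io/scrape=true")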

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 16 additions & 0 deletions

@@ -211,6 +211,22 @@ package object config extends Logging {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_EXECUTOR_LABELS =
+    ConfigBuilder("spark.kubernetes.executor.labels")
+      .doc("Custom labels that will be added to the executor pods. This should be a" +
+        " comma-separated list of label key-value pairs, where each label is in the format" +
+        " key=value.")
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_EXECUTOR_ANNOTATIONS =
+    ConfigBuilder("spark.kubernetes.executor.annotations")
+      .doc("Custom annotations that will be added to the executor pods. This should be a" +
+        " comma-separated list of annotation key-value pairs, where each annotation is in the" +
+        " format key=value.")
+      .stringConf
+      .createOptional
+
   private[spark] val KUBERNETES_DRIVER_POD_NAME =
     ConfigBuilder("spark.kubernetes.driver.pod.name")
       .doc("Name of the driver pod.")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala

Lines changed: 6 additions & 25 deletions

@@ -22,7 +22,8 @@ import java.util.Collections
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder}
 import scala.collection.JavaConverters._
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.kubernetes.ConfigurationUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl
@@ -75,18 +76,16 @@ private[spark] class Client(
   def run(): Unit = {
     validateNoDuplicateFileNames(sparkJars)
     validateNoDuplicateFileNames(sparkFiles)
-    val parsedCustomLabels = parseKeyValuePairs(customLabels, KUBERNETES_DRIVER_LABELS.key,
-      "labels")
+    val parsedCustomLabels = ConfigurationUtils.parseKeyValuePairs(
+      customLabels, KUBERNETES_DRIVER_LABELS.key, "labels")
     require(!parsedCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
       s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
     require(!parsedCustomLabels.contains(SPARK_APP_NAME_LABEL), s"Label with key" +
       s" $SPARK_APP_NAME_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
     val allLabels = parsedCustomLabels ++
       Map(SPARK_APP_ID_LABEL -> kubernetesAppId, SPARK_APP_NAME_LABEL -> appName)
-    val parsedCustomAnnotations = parseKeyValuePairs(
-      customAnnotations,
-      KUBERNETES_DRIVER_ANNOTATIONS.key,
-      "annotations")
+    val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
+      customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
     Utils.tryWithResource(kubernetesClientProvider.get) { kubernetesClient =>
       val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
         new EnvVarBuilder()
@@ -237,24 +236,6 @@ private[spark] class Client(
         s" file name $fileName is shared by all of these URIs: $urisWithFileName")
     }
   }
-
-  private def parseKeyValuePairs(
-      maybeKeyValues: Option[String],
-      configKey: String,
-      keyValueType: String): Map[String, String] = {
-    maybeKeyValues.map(keyValues => {
-      keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => {
-        keyValue.split("=", 2).toSeq match {
-          case Seq(k, v) =>
-            (k, v)
-          case _ =>
-            throw new SparkException(s"Custom $keyValueType set by $configKey must be a" +
-              s" comma-separated list of key-value pairs, with format <key>=<value>." +
-              s" Got value: $keyValue. All values: $keyValues")
-        }
-      }).toMap
-    }).getOrElse(Map.empty[String, String])
-  }
 }
 
 private[spark] object Client {
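The parseKeyValuePairs helper removed above is not gone: it is relocated to a shared ConfigurationUtils object (imported at the top of this file) so the executor-side scheduler backend can reuse it. ConfigurationUtils.scala itself is not shown in this diff; the sketch below assumes the method body moved over verbatim:

package org.apache.spark.deploy.kubernetes

import org.apache.spark.SparkException

private[spark] object ConfigurationUtils {
  // Parses "k1=v1,k2=v2" into a Map, failing fast on malformed entries.
  def parseKeyValuePairs(
      maybeKeyValues: Option[String],
      configKey: String,
      keyValueType: String): Map[String, String] = {
    maybeKeyValues.map(keyValues => {
      keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => {
        keyValue.split("=", 2).toSeq match {
          case Seq(k, v) => (k, v)
          case _ =>
            throw new SparkException(s"Custom $keyValueType set by $configKey must be a" +
              s" comma-separated list of key-value pairs, with format <key>=<value>." +
              s" Got value: $keyValue. All values: $keyValues")
        }
      }).toMap
    }).getOrElse(Map.empty[String, String])
  }
}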

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 23 additions & 3 deletions

@@ -55,6 +55,23 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val executorExtraClasspath = conf.get(
     org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
   private val executorJarsDownloadDir = conf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
+
+  private val executorLabels = ConfigurationUtils.parseKeyValuePairs(
+    conf.get(KUBERNETES_EXECUTOR_LABELS),
+    KUBERNETES_EXECUTOR_LABELS.key,
+    "executor labels")
+  require(
+    !executorLabels.contains(SPARK_APP_ID_LABEL),
+    s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
+  require(
+    !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+    s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
+      s" Spark.")
+  private val executorAnnotations = ConfigurationUtils.parseKeyValuePairs(
+    conf.get(KUBERNETES_EXECUTOR_ANNOTATIONS),
+    KUBERNETES_EXECUTOR_ANNOTATIONS.key,
+    "executor annotations")
+
   private var shufflePodCache: Option[ShufflePodCache] = None
   private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
   private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
@@ -250,8 +267,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
     // executorId and applicationId
     val hostname = name.substring(Math.max(0, name.length - 63))
 
-    val selectors = Map(SPARK_EXECUTOR_ID_LABEL -> executorId,
-      SPARK_APP_ID_LABEL -> applicationId()).asJava
+    val resolvedExecutorLabels = Map(
+      SPARK_EXECUTOR_ID_LABEL -> executorId,
+      SPARK_APP_ID_LABEL -> applicationId()) ++
+      executorLabels
     val executorMemoryQuantity = new QuantityBuilder(false)
       .withAmount(s"${executorMemoryMb}M")
       .build()
@@ -300,7 +319,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     val basePodBuilder = new PodBuilder()
      .withNewMetadata()
        .withName(name)
-       .withLabels(selectors)
+       .withLabels(resolvedExecutorLabels.asJava)
+       .withAnnotations(executorAnnotations.asJava)
       .withOwnerReferences()
       .addNewOwnerReference()
         .withController(true)