
Commit 372ae41
tangzhankun authored and erikerlandson committed
Allow configuration to set environment variables on driver and executor (#424)
* Allow configuration to set environment variables on the driver and executor as below:
  --conf spark.executorEnv.[EnvironmentVariableName]
  --conf spark.driverEnv.[EnvironmentVariableName]
* Change the driver environment key prefix to spark.kubernetes.driverEnv.
1 parent 24cd9ee commit 372ae41

File tree: 5 files changed, +36 -3 lines changed


docs/running-on-kubernetes.md

Lines changed: 16 additions & 0 deletions
@@ -768,6 +768,22 @@ from the other deployment modes. See the [configuration page](configuration.html
     <code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
   </td>
 </tr>
+<tr>
+  <td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
+  <td>(none)</td>
+  <td>
+    Add the environment variable specified by <code>EnvironmentVariableName</code> to
+    the Executor process. The user can specify multiple of these to set multiple environment variables.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
+  <td>(none)</td>
+  <td>
+    Add the environment variable specified by <code>EnvironmentVariableName</code> to
+    the Driver process. The user can specify multiple of these to set multiple environment variables.
+  </td>
+</tr>
 </table>
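Both properties are ordinary SparkConf entries, so they can be set either with --conf on the spark-submit command line (as in the commit message above) or programmatically. A minimal sketch of the programmatic form, using a hypothetical variable name ENV_EXAMPLE and hypothetical values:

    import org.apache.spark.SparkConf

    // Hypothetical variable name and values, purely for illustration.
    val conf = new SparkConf()
      // Injected into every executor container:
      .set("spark.executorEnv.ENV_EXAMPLE", "executor-value")
      // Injected into the driver container (note the kubernetes-specific prefix):
      .set("spark.kubernetes.driverEnv.ENV_EXAMPLE", "driver-value")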
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 2 additions & 0 deletions
@@ -126,6 +126,8 @@ package object config extends Logging {
     .stringConf
     .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
+
   private[spark] val KUBERNETES_DRIVER_ANNOTATIONS =
     ConfigBuilder("spark.kubernetes.driver.annotations")
       .doc("Custom annotations that will be added to the driver pod. This should be a" +
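KUBERNETES_DRIVER_ENV_KEY is a raw prefix string rather than a ConfigBuilder entry because the variable-name suffix is user-defined. The submission step reads it with SparkConf.getAllWithPrefix, which returns each matching key with the prefix stripped off. A minimal sketch of that behavior, with a hypothetical key and value:

    import org.apache.spark.SparkConf

    // Hypothetical entry purely for illustration.
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.kubernetes.driverEnv.MY_VAR", "some-value")

    // The prefix is stripped, leaving the bare environment variable name.
    val driverEnvs = conf.getAllWithPrefix("spark.kubernetes.driverEnv.")
    // driverEnvs: Array((MY_VAR, some-value))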

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala

Lines changed: 8 additions & 0 deletions
@@ -72,6 +72,13 @@ private[spark] class BaseDriverConfigurationStep(
     require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
       s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
         s" Spark bookkeeping operations.")
+
+    val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+      .map(env => new EnvVarBuilder()
+        .withName(env._1)
+        .withValue(env._2)
+        .build())
+
     val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
     val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
       submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX, "node selector")
@@ -91,6 +98,7 @@
       .withName(DRIVER_CONTAINER_NAME)
       .withImage(driverDockerImage)
       .withImagePullPolicy(dockerImagePullPolicy)
+      .addAllToEnv(driverCustomEnvs.asJava)
       .addToEnv(driverExtraClasspathEnv.toSeq: _*)
       .addNewEnv()
         .withName(ENV_DRIVER_MEMORY)
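The step maps each prefix-stripped (name, value) pair to a fabric8 EnvVar and bulk-adds the result to the driver container with addAllToEnv. A standalone sketch of the same pattern, with hypothetical values and a hypothetical container name:

    import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder}
    import scala.collection.JavaConverters._

    // Hypothetical (name, value) pairs standing in for the prefix-stripped conf entries.
    val customEnvs = Seq(("MY_VAR", "some-value"))
      .map(env => new EnvVarBuilder()
        .withName(env._1)
        .withValue(env._2)
        .build())

    val container = new ContainerBuilder()
      .withName("driver")              // hypothetical container name
      .addAllToEnv(customEnvs.asJava)  // bulk-add the custom variables
      .build()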
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -455,15 +455,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .withValue(cp)
         .build()
     }
-    val requiredEnv = Seq(
+    val requiredEnv = (Seq(
       (ENV_EXECUTOR_PORT, executorPort.toString),
       (ENV_DRIVER_URL, driverUrl),
       // Executor backend expects integral value for executor cores, so round it up to an int.
       (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
       (ENV_EXECUTOR_MEMORY, executorMemoryString),
       (ENV_APPLICATION_ID, applicationId()),
       (ENV_EXECUTOR_ID, executorId),
-      (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*"))
+      (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ sc.executorEnvs.toSeq)
       .map(env => new EnvVarBuilder()
         .withName(env._1)
         .withValue(env._2)
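No new prefix parsing is needed on the executor side: SparkContext already collects spark.executorEnv.* entries into sc.executorEnvs (via SparkConf.getExecutorEnv), so appending them to the required variables forwards them into each executor pod. A minimal sketch of that existing SparkConf behavior, with a hypothetical key:

    import org.apache.spark.SparkConf

    // Hypothetical entry purely for illustration.
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.executorEnv.MY_VAR", "executor-value")

    // getExecutorEnv is what SparkContext uses to populate sc.executorEnvs.
    val executorEnv = conf.getExecutorEnv
    // executorEnv: Seq((MY_VAR, executor-value))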

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStepSuite.scala

Lines changed: 8 additions & 1 deletion
@@ -36,6 +36,8 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
   private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
   private val DEPRECATED_CUSTOM_ANNOTATION_KEY = "customAnnotationDeprecated"
   private val DEPRECATED_CUSTOM_ANNOTATION_VALUE = "customAnnotationDeprecatedValue"
+  private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
+  private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"
 
   test("Set all possible configurations from the user.") {
     val sparkConf = new SparkConf()
@@ -49,6 +51,9 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
       .set(s"spark.kubernetes.driver.annotation.$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
       .set("spark.kubernetes.driver.annotations",
         s"$DEPRECATED_CUSTOM_ANNOTATION_KEY=$DEPRECATED_CUSTOM_ANNOTATION_VALUE")
+      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
+      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
+
     val submissionStep = new BaseDriverConfigurationStep(
       APP_ID,
       RESOURCE_NAME_PREFIX,
@@ -74,11 +79,13 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
       .asScala
       .map(env => (env.getName, env.getValue))
       .toMap
-    assert(envs.size === 4)
+    assert(envs.size === 6)
     assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-exmaples.jar")
     assert(envs(ENV_DRIVER_MEMORY) === "456m")
     assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
     assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2")
+    assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
+    assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
     val resourceRequirements = preparedDriverSpec.driverContainer.getResources
     val requests = resourceRequirements.getRequests.asScala
     assert(requests("cpu").getAmount === "2")
