Skip to content

Commit bcde111

Browse files
tangzhankun authored and ash211 committed
Allow configuration to set environment variables on driver and executor
* Allow configuration to set environment variables on the driver and executor as below: `--conf spark.executorEnv.[EnvironmentVariableName]` and `--conf spark.driverEnv.[EnvironmentVariableName]`. * Change the driver environment key prefix to `spark.kubernetes.driverEnv.`.
1 parent 1bb713b commit bcde111

File tree

5 files changed

+36
-3
lines changed

5 files changed

+36
-3
lines changed

docs/running-on-kubernetes.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,6 +770,22 @@ from the other deployment modes. See the [configuration page](configuration.html
770770
<code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
771771
</td>
772772
</tr>
773+
<tr>
774+
<td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
775+
<td>(none)</td>
776+
<td>
777+
Add the environment variable specified by <code>EnvironmentVariableName</code> to
778+
the Executor process. The user can specify multiple of these to set multiple environment variables.
779+
</td>
780+
</tr>
781+
<tr>
782+
<td><code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
783+
<td>(none)</td>
784+
<td>
785+
Add the environment variable specified by <code>EnvironmentVariableName</code> to
786+
the Driver process. The user can specify multiple of these to set multiple environment variables.
787+
</td>
788+
</tr>
773789
</table>
774790

775791

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ package object config extends Logging {
126126
.stringConf
127127
.createOptional
128128

129+
private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
130+
129131
private[spark] val KUBERNETES_DRIVER_ANNOTATIONS =
130132
ConfigBuilder("spark.kubernetes.driver.annotations")
131133
.doc("Custom annotations that will be added to the driver pod. This should be a" +

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ private[spark] class BaseDriverConfigurationStep(
7272
require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
7373
s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
7474
s" Spark bookkeeping operations.")
75+
76+
val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
77+
.map(env => new EnvVarBuilder()
78+
.withName(env._1)
79+
.withValue(env._2)
80+
.build())
81+
7582
val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
7683
val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
7784
submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX, "node selector")
@@ -91,6 +98,7 @@ private[spark] class BaseDriverConfigurationStep(
9198
.withName(DRIVER_CONTAINER_NAME)
9299
.withImage(driverDockerImage)
93100
.withImagePullPolicy(dockerImagePullPolicy)
101+
.addAllToEnv(driverCustomEnvs.asJava)
94102
.addToEnv(driverExtraClasspathEnv.toSeq: _*)
95103
.addNewEnv()
96104
.withName(ENV_DRIVER_MEMORY)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,15 +457,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
457457
.withValue(cp)
458458
.build()
459459
}
460-
val requiredEnv = Seq(
460+
val requiredEnv = (Seq(
461461
(ENV_EXECUTOR_PORT, executorPort.toString),
462462
(ENV_DRIVER_URL, driverUrl),
463463
// Executor backend expects integral value for executor cores, so round it up to an int.
464464
(ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
465465
(ENV_EXECUTOR_MEMORY, executorMemoryString),
466466
(ENV_APPLICATION_ID, applicationId()),
467467
(ENV_EXECUTOR_ID, executorId),
468-
(ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*"))
468+
(ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ sc.executorEnvs.toSeq)
469469
.map(env => new EnvVarBuilder()
470470
.withName(env._1)
471471
.withValue(env._2)

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStepSuite.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
3636
private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
3737
private val DEPRECATED_CUSTOM_ANNOTATION_KEY = "customAnnotationDeprecated"
3838
private val DEPRECATED_CUSTOM_ANNOTATION_VALUE = "customAnnotationDeprecatedValue"
39+
private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
40+
private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"
3941

4042
test("Set all possible configurations from the user.") {
4143
val sparkConf = new SparkConf()
@@ -49,6 +51,9 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
4951
.set(s"spark.kubernetes.driver.annotation.$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
5052
.set("spark.kubernetes.driver.annotations",
5153
s"$DEPRECATED_CUSTOM_ANNOTATION_KEY=$DEPRECATED_CUSTOM_ANNOTATION_VALUE")
54+
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
55+
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
56+
5257
val submissionStep = new BaseDriverConfigurationStep(
5358
APP_ID,
5459
RESOURCE_NAME_PREFIX,
@@ -74,11 +79,13 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
7479
.asScala
7580
.map(env => (env.getName, env.getValue))
7681
.toMap
77-
assert(envs.size === 4)
82+
assert(envs.size === 6)
7883
assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-exmaples.jar")
7984
assert(envs(ENV_DRIVER_MEMORY) === "456m")
8085
assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
8186
assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2")
87+
assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
88+
assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
8289
val resourceRequirements = preparedDriverSpec.driverContainer.getResources
8390
val requests = resourceRequirements.getRequests.asScala
8491
assert(requests("cpu").getAmount === "2")

0 commit comments

Comments (0)