Skip to content

Commit d172c9c

Browse files
geremih authored and bulldozer-bot[bot] committed
Allow configuring conda env variables (apache-spark-on-k8s#468)
Allow passing env variables for conda so that we can enable instrumentation/other flags when required.
1 parent a26a7a2 commit d172c9c

File tree

4 files changed

+69
-12
lines changed

4 files changed

+69
-12
lines changed

core/src/main/scala/org/apache/spark/api/conda/CondaEnvironment.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ final class CondaEnvironment(val manager: CondaEnvironmentManager,
3838
val envName: String,
3939
bootstrapPackages: Seq[String],
4040
bootstrapChannels: Seq[String],
41-
extraArgs: Seq[String] = Nil) extends Logging {
41+
extraArgs: Seq[String] = Nil,
42+
envVars: Map[String, String] = Map.empty) extends Logging {
4243

4344
import CondaEnvironment._
4445

@@ -75,7 +76,8 @@ final class CondaEnvironment(val manager: CondaEnvironmentManager,
7576
::: extraArgs.toList
7677
::: "--" :: packages.toList,
7778
description = s"install dependencies in conda env $condaEnvDir",
78-
channels = channels.iterator.map(_.url).toList
79+
channels = channels.iterator.map(_.url).toList,
80+
envVars = envVars
7981
)
8082

8183
this.packages ++= packages
@@ -96,7 +98,7 @@ final class CondaEnvironment(val manager: CondaEnvironmentManager,
9698
* This is for sending the instructions to the executors so they can replicate the same steps.
9799
*/
98100
def buildSetupInstructions: CondaSetupInstructions = {
99-
CondaSetupInstructions(packages.toList, channels.toList, extraArgs)
101+
CondaSetupInstructions(packages.toList, channels.toList, extraArgs, envVars)
100102
}
101103
}
102104

@@ -155,7 +157,8 @@ object CondaEnvironment {
155157
case class CondaSetupInstructions(
156158
packages: Seq[String],
157159
unauthenticatedChannels: Seq[UnauthenticatedChannel],
158-
extraArgs: Seq[String])
160+
extraArgs: Seq[String],
161+
envVars: Map[String, String])
159162
(userInfos: Map[UnauthenticatedChannel, String]) {
160163
require(unauthenticatedChannels.nonEmpty)
161164
require(packages.nonEmpty)
@@ -167,10 +170,11 @@ object CondaEnvironment {
167170
}
168171

169172
object CondaSetupInstructions {
170-
def apply(packages: Seq[String], channels: Seq[AuthenticatedChannel], extraArgs: Seq[String])
173+
def apply(packages: Seq[String], channels: Seq[AuthenticatedChannel], extraArgs: Seq[String],
174+
envVars: Map[String, String])
171175
: CondaSetupInstructions = {
172176
val ChannelsWithCreds(unauthed, userInfos) = unauthenticateChannels(channels)
173-
CondaSetupInstructions(packages, unauthed, extraArgs)(userInfos)
177+
CondaSetupInstructions(packages, unauthed, extraArgs, envVars)(userInfos)
174178
}
175179
}
176180
}

core/src/main/scala/org/apache/spark/api/conda/CondaEnvironmentManager.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ final class CondaEnvironmentManager(condaBinaryPath: String,
7373
baseDir: String,
7474
condaPackages: Seq[String],
7575
condaChannelUrls: Seq[String],
76-
condaExtraArgs: Seq[String] = Nil): CondaEnvironment = {
76+
condaExtraArgs: Seq[String] = Nil,
77+
condaEnvVars: Map[String, String] = Map.empty): CondaEnvironment = {
7778
require(condaPackages.nonEmpty, "Expected at least one conda package.")
7879
require(condaChannelUrls.nonEmpty, "Can't have an empty list of conda channel URLs")
7980
val name = "conda-env"
@@ -95,7 +96,8 @@ final class CondaEnvironmentManager(condaBinaryPath: String,
9596
::: verbosityFlags
9697
::: "--" :: condaPackages.toList,
9798
description = "create conda env",
98-
channels = condaChannelUrls.toList
99+
channels = condaChannelUrls.toList,
100+
envVars = condaEnvVars
99101
)
100102

101103
new CondaEnvironment(this, linkedBaseDir, name, condaPackages, condaChannelUrls, condaExtraArgs)
@@ -139,7 +141,8 @@ final class CondaEnvironmentManager(condaBinaryPath: String,
139141
private[conda] def runCondaProcess(baseRoot: Path,
140142
args: List[String],
141143
channels: List[String],
142-
description: String): Unit = {
144+
description: String,
145+
envVars: Map[String, String]): Unit = {
143146
val condarc = generateCondarc(baseRoot, channels)
144147
val fakeHomeDir = baseRoot.resolve("home")
145148
// Attempt to create fake home dir
@@ -148,7 +151,7 @@ final class CondaEnvironmentManager(condaBinaryPath: String,
148151
val extraEnv = List(
149152
"CONDARC" -> condarc.toString,
150153
"HOME" -> fakeHomeDir.toString
151-
)
154+
) ++ envVars
152155

153156
val command = Process(
154157
condaBinaryPath :: args,

core/src/main/scala/org/apache/spark/deploy/CondaRunner.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,35 @@ object CondaRunner {
5454
val condaBootstrapDeps = sparkConf.get(CONDA_BOOTSTRAP_PACKAGES)
5555
val condaChannelUrls = sparkConf.get(CONDA_CHANNEL_URLS)
5656
val condaExtraArgs = sparkConf.get(CONDA_EXTRA_ARGUMENTS)
57+
val condaEnvVariables = extractEnvVariables(sparkConf)
5758
val condaBaseDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), "conda").getAbsolutePath
5859
val condaEnvironmentManager = CondaEnvironmentManager.fromConf(sparkConf)
59-
val environment = condaEnvironmentManager
60-
.create(condaBaseDir, condaBootstrapDeps, condaChannelUrls, condaExtraArgs)
60+
val environment =
61+
condaEnvironmentManager.create(
62+
condaBaseDir,
63+
condaBootstrapDeps,
64+
condaChannelUrls,
65+
condaExtraArgs,
66+
condaEnvVariables)
6167
setCondaEnvironment(environment)
6268
Some(environment)
6369
} else {
6470
None
6571
}
6672
}
6773

74+
/**
75+
* Extracts environment variables specified in the form
76+
* "spark.conda.env.[EnvironmentVariableName]" from the sparkConf.
77+
*/
78+
def extractEnvVariables(sparkConf: SparkConf): Map[String, String] = {
79+
val condaEnvPrefix = "spark.conda.env."
80+
sparkConf.getAll
81+
.filter { case (k, v) => k.startsWith(condaEnvPrefix) }
82+
.map { case (k, v) => (k.substring(condaEnvPrefix.length), v) }
83+
.toMap
84+
}
85+
6886
/**
6987
* Sets the given environment as the global environment, which will be accessible by calling
7088
* [[SparkContext.condaEnvironment]]. This method can only be called once! If an environment
core/src/test/scala/org/apache/spark/deploy/CondaRunnerSuite.scala

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy
19+
20+
import org.apache.spark.{SparkConf, SparkFunSuite}
21+
22+
class CondaRunnerSuite extends SparkFunSuite {
23+
24+
test("correctly reads conda env vars") {
25+
val conf = new SparkConf
26+
conf.set("spark.conda.env.key1", "value1")
27+
conf.set("spark.conda.env.key2", "value2")
28+
val expected = Map("key1" -> "value1", "key2" -> "value2")
29+
assert(CondaRunner.extractEnvVariables(conf) == expected)
30+
}
31+
32+
}

0 commit comments

Comments
 (0)