Skip to content

Commit cd6073c

Browse files
mnazbroRobert Kruszewski
authored andcommitted
Improve sharing of conda environments across Python and R
By storing previous environments that are the same within a single Java executor process, we are able to reuse environments that we know are exactly the same. This cache needs to be protected by a lock to prevent concurrent modifications since we do not want extra copies of the environment on disk.
1 parent 125397d commit cd6073c

File tree

3 files changed

+25
-2
lines changed

3 files changed

+25
-2
lines changed

core/src/main/scala/org/apache/spark/api/conda/CondaEnvironmentManager.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ import java.nio.file.Files
2020
import java.nio.file.Path
2121
import java.nio.file.Paths
2222
import java.nio.file.attribute.PosixFilePermission
23+
import java.util.concurrent.locks.{Lock, ReentrantLock}
2324
import java.util.regex.Pattern
2425

2526
import scala.collection.JavaConverters._
27+
import scala.collection.mutable
2628
import scala.sys.process.BasicIO
2729
import scala.sys.process.Process
2830
import scala.sys.process.ProcessBuilder
@@ -211,6 +213,9 @@ object CondaEnvironmentManager extends Logging {
211213
private[this] val httpUrlToken =
212214
Pattern.compile("(\\b\\w+://[^:/@]*:)([^/@]+)(?=@([\\w-.]+)(:\\d+)?\\b)")
213215

216+
private val initializedEnvironmentsLock = new ReentrantLock()
217+
private val initializedEnvironments = mutable.HashMap[CondaSetupInstructions, CondaEnvironment]()
218+
214219
private[conda] def redactCredentials(line: String): String = {
215220
httpUrlToken.matcher(line).replaceAll("$1<password>")
216221
}
@@ -241,4 +246,22 @@ object CondaEnvironmentManager extends Logging {
241246
condaEnvManager.create(envDir, condaPackages, instructions.channels, instructions.extraArgs)
242247
}
243248

249+
/**
250+
* Helper method that will cache precreated conda environments.
251+
*/
252+
def createOrGetCondaEnvironment(instructions: CondaSetupInstructions): CondaEnvironment = {
253+
initializedEnvironmentsLock.lock()
254+
try {
255+
initializedEnvironments.get(instructions) match {
256+
case Some(condaEnv) => condaEnv
257+
case None =>
258+
val condaEnv = createCondaEnvironment(instructions)
259+
initializedEnvironments.put(instructions, condaEnv)
260+
condaEnv
261+
}
262+
} finally {
263+
initializedEnvironmentsLock.unlock()
264+
}
265+
}
266+
244267
}

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ private[spark] class PythonWorkerFactory(requestedPythonExec: Option[String],
9494

9595
private[this] val condaEnv = {
9696
// Set up conda environment if there are any conda packages requested
97-
condaInstructions.map(CondaEnvironmentManager.createCondaEnvironment)
97+
condaInstructions.map(CondaEnvironmentManager.createOrGetCondaEnvironment)
9898
}
9999

100100
private[this] val envVars: Map[String, String] = {

core/src/main/scala/org/apache/spark/api/r/RRunner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ private[r] object RRunner {
348348
val sparkConf = SparkEnv.get.conf
349349
val requestedRCommand = Provenance.fromConf("spark.r.command")
350350
.orElse(Provenance.fromConf("spark.sparkr.r.command"))
351-
val condaEnv = condaSetupInstructions.map(CondaEnvironmentManager.createCondaEnvironment)
351+
val condaEnv = condaSetupInstructions.map(CondaEnvironmentManager.createOrGetCondaEnvironment)
352352
val rCommand = condaEnv.map { conda =>
353353
requestedRCommand.foreach(exec => sys.error(s"It's forbidden to set the r executable " +
354354
s"when using conda, but found: $exec"))

0 commit comments

Comments
 (0)