@@ -20,6 +20,7 @@ package org.apache.spark.deploy
20
20
import java .util .concurrent .atomic .AtomicReference
21
21
22
22
import org .apache .spark .SparkConf
23
+ import org .apache .spark .SparkContext
23
24
import org .apache .spark .api .conda .CondaEnvironment
24
25
import org .apache .spark .api .conda .CondaEnvironmentManager
25
26
import org .apache .spark .internal .Logging
@@ -33,31 +34,46 @@ import org.apache.spark.util.Utils
33
34
/**
 * Base class for entry points that may need a conda environment provisioned before they start.
 * Subclasses implement [[run]]; the conda setup itself is delegated to the companion object.
 */
abstract class CondaRunner extends Logging {

  /**
   * JVM entry point. Builds a fresh [[SparkConf]], provisions a conda environment if one is
   * configured, and hands both the raw arguments and the (optional) environment to [[run]].
   */
  final def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    run(args, CondaRunner.setupCondaEnvironmentAutomatically(conf))
  }

  /**
   * Implemented by concrete runners. `maybeConda` is `Some` iff a conda environment was
   * configured and successfully created for this JVM.
   */
  def run(args: Array[String], maybeConda: Option[CondaEnvironment]): Unit
}
42
+
43
object CondaRunner {

  // Process-wide slot holding the environment created by this runner. SparkContext reads it
  // later, which matters when we shell out to a subprocess that bridges back into this JVM.
  private[spark] val condaEnvironment: AtomicReference[Option[CondaEnvironment]] =
    new AtomicReference(None)

  /**
   * Sets up a conda environment if [[CondaEnvironmentManager.isConfigured]] returns true,
   * registers it as the process-wide environment, and returns it; returns `None` when conda
   * is not configured.
   *
   * Once an environment has been set up, calling this method again (or the [[main]] method)
   * will throw a [[RuntimeException]] from [[setCondaEnvironment]].
   */
  def setupCondaEnvironmentAutomatically(sparkConf: SparkConf): Option[CondaEnvironment] = {
    if (!CondaEnvironmentManager.isConfigured(sparkConf)) {
      None
    } else {
      val bootstrapPackages = sparkConf.get(CONDA_BOOTSTRAP_PACKAGES)
      val channelUrls = sparkConf.get(CONDA_CHANNEL_URLS)
      // Environments are materialized under a per-run temp dir inside the local Spark dir.
      val baseDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), "conda").getAbsolutePath
      val manager = CondaEnvironmentManager.fromConf(sparkConf)
      val environment = manager.create(baseDir, bootstrapPackages, channelUrls)
      setCondaEnvironment(environment)
      Some(environment)
    }
  }

  /**
   * Publishes the given environment as the global environment, which will be accessible by
   * calling [[SparkContext.condaEnvironment]]. This method can only be called once! If an
   * environment has already been set, calling this method again will throw a
   * [[RuntimeException]].
   */
  def setCondaEnvironment(environment: CondaEnvironment): Unit = {
    // Save this as a global in order for SparkContext to be able to access it later, in case we
    // are shelling out, but providing a bridge back into this JVM. compareAndSet guarantees the
    // first caller wins and every later caller fails the require below.
    require(CondaRunner.condaEnvironment.compareAndSet(None, Some(environment)),
      "Couldn't set condaEnvironment to the newly created environment, it was already set to: "
        + CondaRunner.condaEnvironment.get())
  }
}
0 commit comments