@@ -3,20 +3,23 @@ package spark.jobserver
 import java.io.IOException
 import java.nio.file.{Files, Paths}
 import java.nio.charset.Charset
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
 import akka.actor._
 import akka.cluster.Cluster
 import akka.cluster.ClusterEvent.{InitialStateAsEvents, MemberEvent, MemberUp}
 import akka.util.Timeout
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
 import spark.jobserver.util.SparkJobUtils
+
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 import scala.sys.process._
-
 import spark.jobserver.common.akka.InstrumentedActor
 
+import scala.collection.concurrent.TrieMap
+
 /**
  * The AkkaClusterSupervisorActor launches Spark Contexts as external processes
  * that connect back with the master node via Akka Cluster.
@@ -51,8 +54,10 @@ class AkkaClusterSupervisorActor(daoActor: ActorRef) extends InstrumentedActor {
   // TODO: try to pass this state to the jobManager at start instead of having to track
   // extra state. What happens if the WebApi process dies before the forked process
   // starts up? Then it never gets initialized, and this state disappears.
-  private val contextInitInfos = mutable.HashMap.empty[String, (Boolean, ActorRef => Unit, Throwable => Unit)]
-
+  private val contextInitInfos = TrieMap.empty[String, (Boolean, ActorRef => Unit, Throwable => Unit)]
+  private val contextInitExecutorService = Executors.newCachedThreadPool(
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("job-server-context-init-thread-%d").build
+  )
   // actor name -> (JobManagerActor ref, ResultActor ref)
   private val contexts = mutable.HashMap.empty[String, (ActorRef, ActorRef)]
 
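
The declarations above set up the concurrency model for context initialization: a lock-free concurrent map (TrieMap) replaces the plain mutable.HashMap because the map is now touched from more than one thread, and a cached pool of daemon threads hosts the blocking launch work. Below is a minimal standalone sketch of that pattern, assuming only Guava on the classpath; all names in it are illustrative, not part of this patch.

    // Sketch only: TrieMap operations are atomic, so the caller thread can
    // register an entry while a pool thread removes it, with no locking.
    import java.util.concurrent.Executors
    import com.google.common.util.concurrent.ThreadFactoryBuilder
    import scala.collection.concurrent.TrieMap

    object InitStateSketch extends App {
      val infos = TrieMap.empty[String, String]
      // Daemon threads will not keep the JVM alive on shutdown; a cached
      // pool grows on demand and reclaims threads that sit idle.
      val pool = Executors.newCachedThreadPool(
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("sketch-%d").build)

      infos("ctx-1") = "callbacks"          // registered by the caller thread
      pool.submit(new Runnable {
        override def run(): Unit = { infos.remove("ctx-1") }  // removed by a pool thread
      })
      pool.shutdown()
    }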
@@ -214,26 +219,31 @@ class AkkaClusterSupervisorActor(daoActor: ActorRef) extends InstrumentedActor {
       cmdString = cmdString + s" ${contextConfig.getString(SparkJobUtils.SPARK_PROXY_USER_PARAM)}"
     }
 
-    val pb = Process(cmdString)
-    val pio = new ProcessIO(_ => (),
-                            stdout => scala.io.Source.fromInputStream(stdout)
-                              .getLines.foreach(println),
-                            stderr => scala.io.Source.fromInputStream(stderr).getLines().foreach(println))
-    logger.info("Starting to execute sub process {}", pb)
-    val processStart = Try {
-      val process = pb.run(pio)
-      val exitVal = process.exitValue()
-      if (exitVal != 0) {
-        throw new IOException("Failed to launch context process, got exit code " + exitVal)
-      }
-    }
-
-    if (processStart.isSuccess) {
-      contextInitInfos(contextActorName) = (isAdHoc, successFunc, failureFunc)
-    } else {
-      failureFunc(processStart.failed.get)
-    }
+    contextInitInfos(contextActorName) = (isAdHoc, successFunc, failureFunc)
+
+    contextInitExecutorService.submit(new Runnable {
+      override def run(): Unit = {
+        val pb = Process(cmdString)
+        val pio = new ProcessIO(_ => (),
+                                stdout => scala.io.Source.fromInputStream(stdout)
+                                  .getLines.foreach(println),
+                                stderr => scala.io.Source.fromInputStream(stderr).getLines().foreach(println))
+
+        logger.info("Starting to execute sub process {}", pb)
+        val processStart = Try {
+          val process = pb.run(pio)
+          val exitVal = process.exitValue()
+          if (exitVal != 0) {
+            throw new IOException("Failed to launch context process, got exit code " + exitVal)
+          }
+        }
 
+        if (processStart.isFailure) {
+          failureFunc(processStart.failed.get)
+          contextInitInfos.remove(contextActorName)
+        }
+      }
+    })
   }
 
   private def createContextDir(name: String,
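
This hunk carries the behavioral change of the patch: the wait on process.exitValue(), which blocks until the forked JVM exits, moves off the supervisor actor's thread onto the cached pool, and the init callbacks are registered in contextInitInfos before the fork (and removed again if the launch fails) so a connecting JobManagerActor can always find them. Below is a hedged sketch of the same launch pattern reduced to its essentials; launch and onFailure are illustrative names, not part of the job server API.

    // Sketch: run the blocking exitValue() wait on a pool thread and
    // report failure through a callback instead of blocking the caller.
    import java.io.IOException
    import java.util.concurrent.Executors
    import scala.sys.process._
    import scala.util.Try

    object AsyncLaunchSketch extends App {
      val pool = Executors.newCachedThreadPool()

      def launch(cmd: String)(onFailure: Throwable => Unit): Unit = {
        pool.submit(new Runnable {
          override def run(): Unit = {
            val result = Try {
              val exitVal = Process(cmd).run().exitValue()  // blocks, but on a pool thread
              if (exitVal != 0) throw new IOException(s"exit code $exitVal")
            }
            result.failed.foreach(onFailure)  // no-op when the process started cleanly
          }
        })
      }

      launch("ls")(t => println(s"launch failed: ${t.getMessage}"))
      pool.shutdown()
    }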