1
1
package spark .jobserver
2
2
3
- import java .io .IOException
4
3
import java .nio .file .{Files , Paths }
5
4
import java .nio .charset .Charset
6
5
import java .util .concurrent .TimeUnit
@@ -20,6 +19,7 @@ import spark.jobserver.common.akka.InstrumentedActor
20
19
import scala .concurrent .Await
21
20
import akka .pattern .gracefulStop
22
21
import org .joda .time .DateTime
22
+ import org .slf4j .LoggerFactory
23
23
import spark .jobserver .io .JobDAOActor .CleanContextJobInfos
24
24
25
25
/**
@@ -37,7 +37,6 @@ import spark.jobserver.io.JobDAOActor.CleanContextJobInfos
37
37
* {{{
38
38
* deploy {
39
39
* manager-start-cmd = "./manager_start.sh"
40
- * wait-for-manager-start = true
41
40
* }
42
41
* }}}
43
42
*/
@@ -54,7 +53,6 @@ class AkkaClusterSupervisorActor(daoActor: ActorRef, dataManagerActor: ActorRef)
54
53
TimeUnit .SECONDS )
55
54
val contextDeletionTimeout = SparkJobUtils .getContextDeletionTimeout(config)
56
55
val managerStartCommand = config.getString(" deploy.manager-start-cmd" )
57
- val waitForManagerStart = config.getBoolean(" deploy.wait-for-manager-start" )
58
56
import context .dispatcher
59
57
60
58
// actor name -> (context isadhoc, success callback, failure callback)
@@ -238,28 +236,11 @@ class AkkaClusterSupervisorActor(daoActor: ActorRef, dataManagerActor: ActorRef)
238
236
cmdString = cmdString + s " ${contextConfig.getString(SparkJobUtils .SPARK_PROXY_USER_PARAM )}"
239
237
}
240
238
241
- val pb = Process (cmdString)
242
- val pio = new ProcessIO (_ => (),
243
- stdout => scala.io.Source .fromInputStream(stdout)
244
- .getLines.foreach(println),
245
- stderr => scala.io.Source .fromInputStream(stderr).getLines().foreach(println))
246
- logger.info(" Starting to execute sub process {}" , pb)
247
- val processStart = Try {
248
- val process = pb.run(pio)
249
- if (waitForManagerStart) {
250
- val exitVal = process.exitValue()
251
- if (exitVal != 0 ) {
252
- throw new IOException (" Failed to launch context process, got exit code " + exitVal)
253
- }
254
- }
255
- }
256
-
257
- if (processStart.isSuccess) {
258
- contextInitInfos(contextActorName) = (isAdHoc, successFunc, failureFunc)
259
- } else {
260
- failureFunc(processStart.failed.get)
261
- }
239
+ val contextLogger = LoggerFactory .getLogger(" manager_start" )
240
+ val process = Process (cmdString.split(" " ))
241
+ process.run(ProcessLogger (out => contextLogger.info(out), err => contextLogger.warn(err)))
262
242
243
+ contextInitInfos(contextActorName) = (isAdHoc, successFunc, failureFunc)
263
244
}
264
245
265
246
private def createContextDir (name : String ,
0 commit comments