
Commit cfbb7b7

Merge pull request #15 from riptano/DSP-15163-dse-2
DSP-15163 use dse script to start context in separate jvm
2 parents 0a135ae + 3ebce33 commit cfbb7b7

File tree

4 files changed: +44 −27 lines changed


bin/manager_start.sh

Lines changed: 2 additions & 2 deletions

@@ -14,7 +14,7 @@ get_abs_script_path
 . $appdir/setenv.sh
 
 # Override logging options to provide per-context logging
-LOGGING_OPTS="-Dlog4j.configuration=file:$appdir/log4j-server.properties
+LOGGING_OPTS="$LOGGING_OPTS_FILE
   -DLOG_DIR=$1"
 
 GC_OPTS="-XX:+UseConcMarkSweepGC
@@ -40,5 +40,5 @@ else
   $appdir/spark-job-server.jar $1 $2 $conffile'
 fi
 
-eval $cmd > /dev/null 2>&1 &
+eval $cmd > /dev/null 2>&1
 # exec java -cp $CLASSPATH $GC_OPTS $JAVA_OPTS $LOGGING_OPTS $CONFIG_OVERRIDES $MAIN $1 $2 $conffile 2>&1 &

bin/setenv.sh

Lines changed: 4 additions & 2 deletions

@@ -53,8 +53,10 @@ if [ -z "$LOG_DIR" ]; then
 fi
 mkdir -p $LOG_DIR
 
-LOGGING_OPTS="-Dlogback.configurationFile=file:$appdir/logback-server.xml
-              -DLOG_DIR=$LOG_DIR"
+# used in server_start and in manager_start
+LOGGING_OPTS_FILE="-Dlogback.configurationFile=file:$appdir/logback-server.xml"
+
+LOGGING_OPTS="$LOGGING_OPTS_FILE -DLOG_DIR=$LOG_DIR"
 
 if [ -z "$JMX_PORT" ]; then
   JMX_PORT=9999

job-server/config/dse.conf

Lines changed: 5 additions & 0 deletions

@@ -56,3 +56,8 @@ spark {
 
 # Note that you can use this file to define settings not only for job server,
 # but for your Spark jobs as well. Spark job configuration merges with this configuration file as defaults.
+
+
+deploy {
+  manager-start-cmd = "dse spark-jobserver context-per-jvm-managed-start"
+}
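
The new deploy.manager-start-cmd key names the command the supervisor runs when it forks a per-context JVM, so the launch now goes through the DSE wrapper script rather than a bare manager_start.sh. Below is a minimal sketch of how such a HOCON key is read with Typesafe Config (the library already imported in AkkaClusterSupervisorActor); the key and value come from the diff above, while the object and variable names are illustrative only.

// Sketch (not from this commit): reading the deploy.manager-start-cmd key
// added above with Typesafe Config. Object/variable names are illustrative.
import com.typesafe.config.ConfigFactory

object ManagerStartCmdSketch extends App {
  val config = ConfigFactory.parseString(
    """deploy {
      |  manager-start-cmd = "dse spark-jobserver context-per-jvm-managed-start"
      |}""".stripMargin)

  // Presumably the supervisor uses this value as the prefix of the command
  // string it forks for each new context.
  val managerStartCmd = config.getString("deploy.manager-start-cmd")
  println(managerStartCmd) // dse spark-jobserver context-per-jvm-managed-start
}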

job-server/src/main/scala/spark/jobserver/AkkaClusterSupervisorActor.scala

Lines changed: 33 additions & 23 deletions

@@ -3,20 +3,23 @@ package spark.jobserver
 import java.io.IOException
 import java.nio.file.{Files, Paths}
 import java.nio.charset.Charset
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
 import akka.actor._
 import akka.cluster.Cluster
 import akka.cluster.ClusterEvent.{InitialStateAsEvents, MemberEvent, MemberUp}
 import akka.util.Timeout
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
 import spark.jobserver.util.SparkJobUtils
+
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 import scala.sys.process._
-
 import spark.jobserver.common.akka.InstrumentedActor
 
+import scala.collection.concurrent.TrieMap
+
 /**
  * The AkkaClusterSupervisorActor launches Spark Contexts as external processes
  * that connect back with the master node via Akka Cluster.
@@ -51,8 +54,10 @@ class AkkaClusterSupervisorActor(daoActor: ActorRef) extends InstrumentedActor {
   //TODO: try to pass this state to the jobManager at start instead of having to track
   //extra state. What happens if the WebApi process dies before the forked process
   //starts up? Then it never gets initialized, and this state disappears.
-  private val contextInitInfos = mutable.HashMap.empty[String, (Boolean, ActorRef => Unit, Throwable => Unit)]
-
+  private val contextInitInfos = TrieMap.empty[String, (Boolean, ActorRef => Unit, Throwable => Unit)]
+  private val contextInitExecutorService = Executors.newCachedThreadPool(
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("job-server-context-init-thread-%d").build
+  )
   // actor name -> (JobManagerActor ref, ResultActor ref)
   private val contexts = mutable.HashMap.empty[String, (ActorRef, ActorRef)]
 
@@ -214,26 +219,31 @@ class AkkaClusterSupervisorActor(daoActor: ActorRef) extends InstrumentedActor {
       cmdString = cmdString + s" ${contextConfig.getString(SparkJobUtils.SPARK_PROXY_USER_PARAM)}"
     }
 
-    val pb = Process(cmdString)
-    val pio = new ProcessIO(_ => (),
-                            stdout => scala.io.Source.fromInputStream(stdout)
-                              .getLines.foreach(println),
-                            stderr => scala.io.Source.fromInputStream(stderr).getLines().foreach(println))
-    logger.info("Starting to execute sub process {}", pb)
-    val processStart = Try {
-      val process = pb.run(pio)
-      val exitVal = process.exitValue()
-      if (exitVal != 0) {
-        throw new IOException("Failed to launch context process, got exit code " + exitVal)
-      }
-    }
-
-    if (processStart.isSuccess) {
-      contextInitInfos(contextActorName) = (isAdHoc, successFunc, failureFunc)
-    } else {
-      failureFunc(processStart.failed.get)
-    }
+    contextInitInfos(contextActorName) = (isAdHoc, successFunc, failureFunc)
+
+    contextInitExecutorService.submit(new Runnable {
+      override def run(): Unit = {
+        val pb = Process(cmdString)
+        val pio = new ProcessIO(_ => (),
+                                stdout => scala.io.Source.fromInputStream(stdout)
+                                  .getLines.foreach(println),
+                                stderr => scala.io.Source.fromInputStream(stderr).getLines().foreach(println))
+
+        logger.info("Starting to execute sub process {}", pb)
+        val processStart = Try {
+          val process = pb.run(pio)
+          val exitVal = process.exitValue()
+          if (exitVal != 0) {
+            throw new IOException("Failed to launch context process, got exit code " + exitVal)
+          }
+        }
+
+        if (processStart.isFailure) {
+          failureFunc(processStart.failed.get)
+          contextInitInfos.remove(contextActorName)
+        }
+      }
+    })
   }
 
   private def createContextDir(name: String,
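
The core change in this file is that the blocking launch (pb.run(pio) followed by exitValue()) now runs on a cached pool of daemon threads instead of inside the actor's message handler, and contextInitInfos becomes a TrieMap so the callback entry can be registered up front and removed from the worker thread if the launch fails. A condensed, self-contained sketch of that pattern follows; the object name, map value type, and callback bodies are placeholders, not the actual job server code.

// Condensed sketch of the launch pattern introduced above: register the
// failure callback first, then run the external process on a daemon thread
// so the caller never blocks waiting for exitValue().
import java.io.IOException
import java.util.concurrent.Executors
import com.google.common.util.concurrent.ThreadFactoryBuilder
import scala.collection.concurrent.TrieMap
import scala.sys.process._
import scala.util.Try

object ContextLaunchSketch {
  // Thread-safe map: worker threads may remove entries while the caller adds new ones.
  private val initInfos = TrieMap.empty[String, Throwable => Unit]

  // Cached pool of daemon threads, mirroring contextInitExecutorService above.
  private val pool = Executors.newCachedThreadPool(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("context-init-%d").build)

  def launch(contextName: String, cmdString: String): Unit = {
    initInfos(contextName) = err => println(s"context $contextName failed: $err")

    pool.submit(new Runnable {
      override def run(): Unit = {
        val pb = Process(cmdString)
        val pio = new ProcessIO(_ => (),
          stdout => scala.io.Source.fromInputStream(stdout).getLines().foreach(println),
          stderr => scala.io.Source.fromInputStream(stderr).getLines().foreach(println))

        val processStart = Try {
          val exitVal = pb.run(pio).exitValue() // blocks until the launch script exits
          if (exitVal != 0) throw new IOException(s"Failed to launch context, exit code $exitVal")
        }

        // On failure, fire the callback and drop the pending entry.
        if (processStart.isFailure) {
          initInfos.remove(contextName).foreach(cb => cb(processStart.failed.get))
        }
      }
    })
  }
}

As in the patch, a zero exit code only means the launch script returned cleanly; the pending entry presumably stays in the map until the forked context process connects back over Akka Cluster and the success callback is invoked.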
