
Commit 3749a9e

derSascha authored and velvia committed
feat(jobmanager): Add cluster support and transfer context conf via akka (spark-jobserver#927)
context config
The job manager is now initialized with the cluster address and actor name as arguments. After joining the cluster, the context config is transferred via akka and the context is initialized. This fixes spark-jobserver#915.

spark master
The following config options can be set and will be used by spark-submit too:
- spark.master can be local[...], yarn, spark://... or mesos://...
- spark.submit.deployMode can be client or cluster

spark standalone cluster mode
Spark does not transfer your job server config, jar and log4j config. You need to copy them to each host, or add them to e.g. HDFS, and then set the following variable in your settings.sh: REMOTE_JOBSERVER_DIR. If you don't set the remote dir, files are transferred via spark-submit --files ... (this works fine with e.g. YARN).

embedded H2 database
Job server can now host the database instance for you:
- set spark.jobserver.startH2Server to true
- set spark.jobserver.sqldao.jdbc.url to "jdbc:h2:tcp://CLUSTER-IP:9092/h2-db;AUTO_RECONNECT=TRUE"

Akka interface
To run akka in a cluster (to interact with the remote job manager) add the following config:

    akka {
      remote.netty.tcp {
        hostname = "FRONT-END-IP"
      }
    }

This config ensures that akka binds to the right interface. The hostname option will be removed in the job manager config, and the job then detects the right interface.

Remote data files and jars
PR spark-jobserver#924 contains a solution to access files uploaded via the data API (/data). Jars and other binaries are already transferred via the SQL backend.

logging
Same behavior as before in client mode. Cluster mode uses log4j-cluster.properties (same config as the server options, but with log output to stderr instead of a logfile). This works well with YARN and spark standalone containers.

Other information:
Tested on a virtual cluster with YARN and spark standalone. Might work with Mesos too. This PR changes many files (constructor changes...). Code, test and doc changes are in their own commits to reduce the review complexity. Comments are welcome.

* cleanup(config): remove default broadcast factory (TorrentBroadcastFactory is already the default since Spark 1.2)
* feat(h2server): add embedded h2 server
* tests(streaming): handle interrupted exception
  * stop context after each test
  * handle interrupted exception (thrown at context stop)
* feat(deploy): context via akka and cluster mode. Refactor deploy mode configuration:
  * use spark.master and spark.submit.deployMode config everywhere
  * provide context config via akka
  * support cluster deployment
  * add config files to spark-submit
  * add master and deploy mode from config to spark-submit
  * output log to stderr in cluster container
  * support remote job server dir
* tests(deploy): context via akka and cluster mode
* misc(codecov): ignore test jobs in coverage report
* doc(cluster): add cluster deploy doc
* tests(jobmanager): add job manager tests
  * move actor tests from JobManagerSpec into JobManagerActorSpecs
  * add job manager tests in JobManagerSpec
* tests(jobserver): add job server tests
1 parent 8646810 commit 3749a9e

35 files changed: +927 -598 lines
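Before the per-file diffs, here is a minimal job server config sketch that only collects the cluster-mode options described in the commit message above. It is illustrative, not part of the commit: `CLUSTER-IP` and `FRONT-END-IP` are placeholders for your own database host and frontend interface, and whether you enable the embedded H2 server is a deployment-specific choice.

    spark {
      # spark.master may be local[...], yarn, spark://... or mesos://...
      master = yarn
      submit.deployMode = cluster

      jobserver {
        # optional: let the job server host the H2 instance (see "embedded H2 database" above)
        startH2Server = true
        sqldao.jdbc.url = "jdbc:h2:tcp://CLUSTER-IP:9092/h2-db;AUTO_RECONNECT=TRUE"
      }
    }

    # bind akka to an interface reachable from the cluster
    akka {
      remote.netty.tcp {
        hostname = "FRONT-END-IP"
      }
    }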

README.md

Lines changed: 4 additions & 4 deletions
@@ -6,7 +6,7 @@ spark-jobserver provides a RESTful interface for submitting and managing [Apache
 This repo contains the complete Spark job server project, including unit tests and deploy scripts.
 It was originally started at [Ooyala](http://www.ooyala.com), but this is now the main development repo.
 
-Other useful links: [Troubleshooting Tips](doc/troubleshooting.md), [Yarn tips](doc/yarn.md), [Mesos tips](doc/mesos.md), [JMX tips](doc/jmx.md).
+Other useful links: [Troubleshooting](doc/troubleshooting.md), [cluster](doc/cluster.md), [YARN client](doc/yarn.md), [YARN on EMR](doc/EMR.md), [Mesos](doc/mesos.md), [JMX tips](doc/jmx.md).
 
 Also see [Chinese docs / 中文](doc/chinese/job-server.md).
 
@@ -103,7 +103,7 @@ Spark Job Server is now included in Datastax Enterprise 4.8!
 - Kill running jobs via stop context and delete job
 - Separate jar uploading step for faster job startup
 - Asynchronous and synchronous job API. Synchronous API is great for low latency jobs!
-- Works with Standalone Spark as well as Mesos and yarn-client
+- Works with Standalone Spark as well as on [cluster](doc/cluster.md), [Mesos](doc/mesos.md), YARN [client](doc/yarn.md) and [on EMR](doc/EMR.md)
 - Job and jar info is persisted via a pluggable DAO interface
 - Named Objects (such as RDDs or DataFrames) to cache and retrieve RDDs or DataFrames by name, improving object sharing and reuse among jobs.
 - Supports Scala 2.10 and 2.11
@@ -568,6 +568,8 @@ curl -k --basic --user 'user:pw' https://localhost:8090/contexts
 
 ## Deployment
 
+See also running on [cluster](doc/cluster.md), [YARN client](doc/yarn.md), on [EMR](doc/EMR.md) and running on [Mesos](doc/mesos.md).
+
 ### Manual steps
 
 1. Copy `config/local.sh.template` to `<environment>.sh` and edit as appropriate. NOTE: be sure to set SPARK_VERSION if you need to compile against a different version.
@@ -906,8 +908,6 @@ To add to the underlying Hadoop configuration in a Spark context, add the hadoop
 
 For the exact context configuration parameters, see JobManagerActor docs as well as application.conf.
 
-Also see the [yarn doc](doc/yarn.md) for more tips.
-
 ### Other configuration settings
 
 For all of the Spark Job Server configuration settings, see `job-server/src/main/resources/application.conf`.

bin/manager_start.sh

Lines changed: 40 additions & 24 deletions
@@ -1,6 +1,6 @@
 #!/bin/bash
 # Script to start the job manager
-# args: <work dir for context> <cluster address> [proxy_user]
+# args: <master> <deployMode> <akkaAdress> <actorName> <workDir> [<proxyUser>]
 set -e
 
 get_abs_script_path() {
@@ -13,12 +13,8 @@ get_abs_script_path
 
 . $appdir/setenv.sh
 
-# Override logging options to provide per-context logging
-LOGGING_OPTS="-Dlog4j.configuration=file:$appdir/log4j-server.properties
-              -DLOG_DIR=$2"
-
 GC_OPTS="-XX:+UseConcMarkSweepGC
-         -verbose:gc -XX:+PrintGCTimeStamps -Xloggc:$appdir/gc.out
+         -verbose:gc -XX:+PrintGCTimeStamps
          -XX:MaxPermSize=512m
          -XX:+CMSClassUnloadingEnabled "
 
@@ -27,26 +23,46 @@ JAVA_OPTS="-XX:MaxDirectMemorySize=$MAX_DIRECT_MEMORY
 
 MAIN="spark.jobserver.JobManager"
 
-MESOS_OPTS=""
-if [ $1 == "mesos-cluster" ]; then
-  MESOS_OPTS="--master $MESOS_SPARK_DISPATCHER --deploy-mode cluster"
-  appdir=$REMOTE_JOBSERVER_DIR
-fi
+# copy files via spark-submit and read them from current (container) dir
+if [ $2 = "cluster" -a -z "$REMOTE_JOBSERVER_DIR" ]; then
+  SPARK_SUBMIT_OPTIONS="$SPARK_SUBMIT_OPTIONS
+    --master $1 --deploy-mode cluster
+    --conf spark.yarn.submit.waitAppCompletion=false
+    --files $appdir/log4j-cluster.properties,$conffile"
+  JAR_FILE="$appdir/spark-job-server.jar"
+  CONF_FILE=$(basename $conffile)
+  LOGGING_OPTS="-Dlog4j.configuration=log4j-cluster.properties"
 
-if [ ! -z $5 ]; then
-  cmd='$SPARK_HOME/bin/spark-submit --class $MAIN --driver-memory $JOBSERVER_MEMORY
-    --conf "spark.executor.extraJavaOptions=$LOGGING_OPTS"
-    --proxy-user $5
-    $MESOS_OPTS
-    --driver-java-options "$GC_OPTS $JAVA_OPTS $LOGGING_OPTS $CONFIG_OVERRIDES"
-    $appdir/spark-job-server.jar $2 $3 $4 $conffile'
+# use files in REMOTE_JOBSERVER_DIR
+elif [ $2 == "cluster" ]; then
+  SPARK_SUBMIT_OPTIONS="$SPARK_SUBMIT_OPTIONS
+    --master $1 --deploy-mode cluster
+    --conf spark.yarn.submit.waitAppCompletion=false"
+  JAR_FILE="$REMOTE_JOBSERVER_DIR/spark-job-server.jar"
+  CONF_FILE="$REMOTE_JOBSERVER_DIR/$(basename $conffile)"
+  LOGGING_OPTS="-Dlog4j.configuration=$REMOTE_JOBSERVER_DIR/log4j-cluster.properties"
+
+# client mode, use files from app dir
 else
-  cmd='$SPARK_HOME/bin/spark-submit --class $MAIN --driver-memory $JOBSERVER_MEMORY
-    --conf "spark.executor.extraJavaOptions=$LOGGING_OPTS"
-    --driver-java-options "$GC_OPTS $JAVA_OPTS $LOGGING_OPTS $CONFIG_OVERRIDES"
-    $MESOS_OPTS
-    $appdir/spark-job-server.jar $2 $3 $4 $conffile'
+  JAR_FILE="$appdir/spark-job-server.jar"
+  CONF_FILE="$conffile"
+  LOGGING_OPTS="-Dlog4j.configuration=file:$appdir/log4j-server.properties -DLOG_DIR=$5"
+  GC_OPTS="$GC_OPTS -Xloggc:$5/gc.out"
+fi
+
+if [ -n "$6" ]; then
+  SPARK_SUBMIT_OPTIONS="$SPARK_SUBMIT_OPTIONS --proxy-user $6"
+fi
+
+if [ -n "$JOBSERVER_KEYTAB" ]; then
+  SPARK_SUBMIT_OPTIONS="$SPARK_SUBMIT_OPTIONS --keytab $JOBSERVER_KEYTAB"
 fi
 
-eval $cmd
+cmd='$SPARK_HOME/bin/spark-submit --class $MAIN --driver-memory $JOBSERVER_MEMORY
+  --conf "spark.executor.extraJavaOptions=$LOGGING_OPTS"
+  $SPARK_SUBMIT_OPTIONS
+  --driver-java-options "$GC_OPTS $JAVA_OPTS $LOGGING_OPTS $CONFIG_OVERRIDES $SPARK_SUBMIT_JAVA_OPTIONS"
+  $JAR_FILE $3 $4 $CONF_FILE'
+
+eval $cmd 2>&1 > $5/spark-job-server.out
 
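For orientation, a hypothetical invocation of the reworked script, following the argument order documented in its header comment. The master URL, akka address, actor name, work dir and proxy user below are invented placeholders rather than values from this commit, and the akka address format is an assumption.

```
# hypothetical example: <master> <deployMode> <akkaAdress> <actorName> <workDir> [<proxyUser>]
./manager_start.sh yarn cluster \
    akka.tcp://JobServer@FRONT-END-IP:2552 \
    jobManager-example \
    /tmp/job-server/my-context \
    some-proxy-user
```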

bin/server_package.sh

Lines changed: 1 addition & 0 deletions
@@ -51,6 +51,7 @@ pushd "${bin}/.." > /dev/null
         bin/setenv.sh
         ${CONFIG_DIR}/${ENV}.conf
         config/shiro.ini
+        config/log4j-cluster.properties
         config/log4j-server.properties"
 
 rm -rf $WORK_DIR

build.sbt

Lines changed: 2 additions & 1 deletion
@@ -44,6 +44,7 @@ lazy val jobServerTestJar = Project(id = "job-server-tests", base = file("job-se
   .settings(noPublishSettings)
   .dependsOn(jobServerApi)
   .disablePlugins(SbtScalariform)
+  .disablePlugins(ScoverageSbtPlugin) // do not include in coverage report
 
 lazy val jobServerApi = Project(id = "job-server-api", base = file("job-server-api"))
   .settings(commonSettings)
@@ -274,7 +275,7 @@ lazy val commonSettings = Defaults.coreDefaultSettings ++ dirSettings ++ implici
 
 lazy val scoverageSettings = {
   // Semicolon-separated list of regexs matching classes to exclude
-  coverageExcludedPackages := ".+Benchmark.*"
+  coverageExcludedPackages := ".+Benchmark.*;.+Example.*;.+TestJob"
 }
 
 lazy val publishSettings = Seq(

doc/EMR.md

Lines changed: 2 additions & 0 deletions
@@ -1,5 +1,7 @@
 ## Step by step instruction on how to run Spark Job Server on EMR 4.2.0 (Spark 1.6.0)
 
+See also running in [cluster mode](cluster.md), running [YARN in client mode](yarn.md) and running on [Mesos](Mesos.md).
+
 ### Create EMR 4.2.0 cluster
 
 Create EMR cluster using AWS EMR console or aws cli.

doc/cluster.md

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+## Configuring Job Server for YARN cluster mode
+
+See also running [YARN in client mode](yarn.md), running [YARN on EMR](EMR.md) and running on [Mesos](mesos.md).
+
+### Job Server configuration
+
+Add the following properties in your job server config file:
+- set `spark.master` property to `yarn`, `spark://...` or `mesos://...`
+- set `spark.submit.deployMode` property to `cluster`
+- set `spark.jobserver.context-per-jvm` to `true`
+- set `akka.remote.netty.tcp.hostname` to the cluster interface of the host running the frontend
+- set `akka.remote.netty.tcp.maximum-frame-size` to support big remote jars fetch
+
+Optional / required in spark standalone mode:
+- set `REMOTE_JOBSERVER_DIR` to `hdfs://...`, `file://...` or `http://...` in your settings `xxx.sh`
+- copy `spark-job-server.jar`, your job server config and `log4j-cluster.properties` file into this location
+
+Example job server config (replace `CLUSTER-IP` with the internal IP of the host running the job server frontend):
+
+    spark {
+      # deploy in yarn cluster mode
+      master = yarn
+      submit.deployMode = cluster
+
+      jobserver {
+        context-per-jvm = true
+
+        # start a H2 DB server, reachable in your cluster
+        sqldao {
+          jdbc {
+            url = "jdbc:h2:tcp://CLUSTER-IP:9092/h2-db;AUTO_RECONNECT=TRUE"
+          }
+        }
+        startH2Server = false
+      }
+    }
+
+    # start akka on this interface, reachable from your cluster
+    akka {
+      remote.netty.tcp {
+        hostname = "CLUSTER-IP"
+
+        # This controls the maximum message size, including job results, that can be sent
+        maximum-frame-size = 100 MiB
+      }
+    }
+
+Note:
+- YARN transfers the files provided via `--files` submit option into the cluster / container. Spark standalone does not support this in cluster mode and you have to transfer them manually.
+- Instead of running a H2 DB instance you can also run a real DB reachable inside your cluster. You can't use the default (host only) H2 configuration in a cluster setup.
+- Akka binds by [default](../job-server/src/main/resources/application.conf) to the local host interface and is not reachable from the cluster. You need to configure the akka hostname to the cluster internal address.
+
+### Reading files uploaded via frontend
+
+Files uploaded via the data API (`/data`) are stored on your job server frontend host.
+Call the [DataFileCache](../job-server-api/src/main/scala/spark/jobserver/api/SparkJobBase.scala) API implemented by the job environment in your spark jobs to access them:
+
+```scala
+object RemoteDriverExample extends NewSparkJob {
+  def runJob(sc: SparkContext, runtime: JobEnvironment, data: JobData): JobOutput =
+    runtime.getDataFile(...)
+```
+
+The job server transfers the files via akka to the host running your driver and caches them there.
+
+Note: Files uploaded via the JAR or binary API are stored and transferred via the Job DB.

doc/mesos.md

Lines changed: 35 additions & 85 deletions
@@ -1,114 +1,64 @@
 ## Configuring Job Server for Mesos
 
+See also running on [cluster](cluster.md), YARN in [client mode](yarn.md) and running on [EMR](EMR.md).
+
 ### Mesos client mode
 
-Configuring job-server for Mesos cluster mode is straight forward. All you need to change is `spark.master` config to
+Configuring job-server for Mesos client mode is straightforward. All you need to change is `spark.master` config to
 point to Mesos master URL in job-server config file.
 
 Example config file (important settings are marked with # important):
 
     spark {
-      master = <mesos master URL here>  # important, example: mesos://mesos-master:5050
-
-      # Default # of CPUs for jobs to use for Spark standalone cluster
-      job-number-cpus = 4
-
-      jobserver {
-        port = 8090
-        jobdao = spark.jobserver.io.JobSqlDAO
-
-        sqldao {
-          # Directory where default H2 driver stores its data. Only needed for H2.
-          rootdir = /database
-
-          # Full JDBC URL / init string. Sorry, needs to match above.
-          # Substitutions may be used to launch job-server, but leave it out here in the default or tests won't pass
-          jdbc.url = "jdbc:h2:file:/database/h2-db"
-        }
-      }
-
-      # universal context configuration. These settings can be overridden, see README.md
-      context-settings {
-        num-cpu-cores = 2        # Number of cores to allocate. Required.
-        memory-per-node = 512m   # Executor memory per node, -Xmx style eg 512m, #1G, etc.
-      }
+      master = <mesos master URL here>  # example: mesos://mesos-master:5050
    }
 
-### Mesos cluster Mode
+### Mesos cluster mode
 
 Configuring job-server for Mesos cluster mode is a bit tricky as compared to client mode.
 
-Here is the checklist for the changes needed for the same:
-
-- You need to start Mesos dispatcher in your cluster by running `./sbin/start-mesos-dispatcher.sh` available in
+You need to start Mesos dispatcher in your cluster by running `./sbin/start-mesos-dispatcher.sh` available in
 spark package. This step is not specific to job-server and as mentioned in [official spark documentation](https://spark.apache.org/docs/latest/running-on-mesos.html#cluster-mode) this is needed
 to submit spark job in Mesos cluster mode.
 
-- Add following config at the end of job-server's settings.sh file:
-
-```
-REMOTE_JOBSERVER_DIR=<path to job-server directory> # copy job-server directory on this location on all mesos agent nodes
-MESOS_SPARK_DISPATCHER=<mesos dispatcher URL> # example: mesos://mesos-dispatcher:7077
-```
-
-- Set `spark.jobserver.driver-mode` property to `mesos-cluster` in job-server config file.
+Add the following config to your job-server config file:
+- set `spark.master` property to mesos dispatcher URL (example: `mesos://mesos-dispatcher:7077`)
+- set `spark.submit.deployMode` property to `cluster`
+- set `spark.jobserver.context-per-jvm` to `true`
+- set `akka.remote.netty.tcp.hostname` to the cluster interface of the host running the frontend
+- set `akka.remote.netty.tcp.maximum-frame-size` to support big remote jars fetch
 
-- Also override akka default configs in job-server config file to support big remote jars fetch, we have to set frame
-size to some large value, for example:
-
-```
-akka.remote.netty.tcp {
-  # use remote IP address to form akka cluster, not 127.0.0.1. This should be the IP of of the machine where the file
-  # resides. That means for each mesos agents (where job-server directory is copied on REMOTE_JOBSERVER_DIR path),
-  # the hostname should be the remote IP of that node.
-  #
-  hostname = "xxxxx"
-  # This controls the maximum message size, including job results, that can be sent
-  maximum-frame-size = 104857600b
-}
-```
-
-- set `spark.master` to Mesos master URL (and not mesos-dispatcher URL).
-
-- set `spark.jobserver.context-per-jvm` to `true` in job-server config file.
-
-Example config file (important settings are marked with # important):
+Example job server config (replace `CLUSTER-IP` with the internal IP of the host running the job server frontend):
 
     spark {
-      master = <mesos master URL here>  # important, example: mesos://mesos-master:5050
-
-      # Default # of CPUs for jobs to use for Spark standalone cluster
-      job-number-cpus = 4
+      master = <mesos dispatcher URL>  # example: mesos://mesos-dispatcher:7077
+      submit.deployMode = cluster
 
      jobserver {
-        port = 8090
-        driver-mode = mesos-cluster #important
-        context-per-jvm = true #important
-        jobdao = spark.jobserver.io.JobSqlDAO
-
-        sqldao {
-          # Directory where default H2 driver stores its data. Only needed for H2.
-          rootdir = /database
+        context-per-jvm = true
 
-          # Full JDBC URL / init string. Sorry, needs to match above.
-          # Substitutions may be used to launch job-server, but leave it out here in the default or tests won't pass
-          jdbc.url = "jdbc:h2:file:/database/h2-db"
+        # start a H2 DB server, reachable in your cluster
+        sqldao {
+          jdbc {
+            url = "jdbc:h2:tcp://CLUSTER-IP:9092/h2-db;AUTO_RECONNECT=TRUE"
+          }
        }
-      }
-
-      # universal context configuration. These settings can be overridden, see README.md
-      context-settings {
-        num-cpu-cores = 2        # Number of cores to allocate. Required.
-        memory-per-node = 512m   # Executor memory per node, -Xmx style eg 512m, #1G, etc.
+        startH2Server = false
      }
    }
 
-    akka.remote.netty.tcp {
-      # use remote IP address to form akka cluster, not 127.0.0.1. This should be the IP of of the machine where the file
-      # resides. That means for each mesos agents (where job-server directory is copied on REMOTE_JOBSERVER_DIR path),
-      # the hostname should be the remote IP of that node.
-      #
-      hostname = "xxxxx" #important
+    # start akka on this interface, reachable from your cluster
+    akka {
+      remote.netty.tcp {
+        hostname = "CLUSTER-IP"
 
        # This controls the maximum message size, including job results, that can be sent
-      maximum-frame-size = 104857600b #important
+        maximum-frame-size = 100 MiB
+      }
    }
+
+- Optional: Add the following config at the end of job-server's settings.sh file:
+
+```
+REMOTE_JOBSERVER_DIR=<path to job-server directory> # copy of job-server directory on all mesos agent nodes
+```

doc/yarn.md

Lines changed: 4 additions & 9 deletions
@@ -1,10 +1,6 @@
-## Configuring Job Server for YARN
+## Configuring Job Server for YARN in client mode with docker
 
-(Looking for contributors for this page)
-
-(I would like to thank Jon Buffington for sharing the config tips below.... @velvia)
-
-Note: This is for yarn with docker. If you are looking to deploy on a yarn cluster via EMR, then this link would be more useful [EMR](https://github.com/spark-jobserver/spark-jobserver/blob/master/doc/EMR.md)
+See also running in [cluster mode](cluster.md), running [YARN on EMR](EMR.md) and running on [Mesos](mesos.md).
 
 ### Configuring the Spark-Jobserver Docker package to run in Yarn-Client Mode
 
@@ -19,11 +15,10 @@ Files we need:
 - dockerfile
 - cluster-config directory with hdfs-site.xml and yarn-site.xml (You should have these files already)
 
-Example docker.conf (important settings are marked with # important):
+Example docker.conf:
 
    spark {
-      master = "yarn-client" # important
-      master = ${?SPARK_MASTER}
+      master = yarn
 
      # Default # of CPUs for jobs to use for Spark standalone cluster
      job-number-cpus = 4
