
Commit ad8154f

Merge branch 'master' into pr/165
Conflicts: job-server/src/main/resources/application.conf
2 parents: 4b72274 + 59b8307

25 files changed (+249 / -61 lines)

.gitignore

Lines changed: 2 additions & 0 deletions
@@ -11,4 +11,6 @@ test-reports/
 # ignore deployment configs
 config/*.conf
 config/*.sh
+job-server/config/*.conf
+job-server/config/*.sh
 metastore_db/

README.md

Lines changed: 10 additions & 1 deletion
@@ -17,11 +17,14 @@ See [Troubleshooting Tips](doc/troubleshooting.md) as well as [Yarn tips](doc/ya
 - Avenida.com
 - GumGum
 - Fuse Elements
+- Frontline Solvers
+- Aruba Networks
+- [Zed Worldwide](www.zed.com)

 ## Features

 - *"Spark as a Service"*: Simple REST interface for all aspects of job, context management
-- Support for Spark SQL and Hive Contexts/jobs and custom job contexts! See [Contexts](doc/contexts.md).
+- Support for Spark SQL, Hive, Streaming Contexts/jobs and custom job contexts! See [Contexts](doc/contexts.md).
 - Supports sub-second low-latency jobs via long-running job contexts
 - Start and stop job contexts for RDD sharing and low-latency jobs; change resources on restart
 - Kill running jobs via stop context
@@ -46,6 +49,8 @@ For release notes, look in the `notes/` directory. They should also be up on [l

 ## Quick start / development mode

+NOTE: This quick start guide uses SBT to run the job server and the included test jar, but the normal development process is to create a separate project for Job Server jobs and to deploy the job server to a Spark cluster. Please see the deployment section below for more details.
+
 You need to have [SBT](http://www.scala-sbt.org/release/docs/Getting-Started/Setup.html) installed.

 To set the current version, do something like this:
@@ -231,6 +236,8 @@ def validate(sc: SparkContext, config: Config): SparkJobValidation = {
    it to the remotes you have configured in `<environment>.sh`
 3. On the remote server, start it in the deployed directory with `server_start.sh` and stop it with `server_stop.sh`

+The `server_start.sh` script uses `spark-submit` under the hood and may be passed any of the standard extra arguments from `spark-submit`.
+
 NOTE: by default the assembly jar from `job-server-extras`, which includes support for SQLContext and HiveContext, is used. If you face issues with all the extra dependencies, consider modifying the install scripts to invoke `sbt job-server/assembly` instead, which doesn't include the extra dependencies.

 Note: to test out the deploy to a local staging dir, or package the job server for Mesos,
@@ -275,6 +282,8 @@ the REST API.
     DELETE /jobs/<jobId>       - Kills the specified job
     GET /jobs/<jobId>/config   - Gets the job configuration

+For details on the Typesafe config format used for input (JSON also works), see the [Typesafe Config docs](https://github.com/typesafehub/config).
+
 ### Context configuration

 A number of context-specific settings can be controlled when creating a context (POST /contexts) or running an
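For a concrete illustration of the Typesafe config input format referenced above: job input can be sent either as JSON or as HOCON text. A minimal hypothetical sketch, assuming a job that reads an `input.string` key (the key name is illustrative only, not something the job server itself defines):

```
# Hypothetical job input in Typesafe (HOCON) syntax; the equivalent
# JSON document is also accepted, per the note above.
input {
  string = "a b c a b"
}
```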

akka-app/src/ooyala.common.akka/AkkaTestUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ object AkkaTestUtils {
   // This is a var for now because we need to let people change it, and we can't pass this in as a param
   // because then we would change the API. If we have it as a default param, we can't have multiple methods
   // with the same name.
-  var timeout = 10 seconds
+  var timeout = 15 seconds

   def shutdownAndWait(actor: ActorRef) {
     if (actor != null) {

bin/server_start.sh

Lines changed: 12 additions & 14 deletions
@@ -1,5 +1,7 @@
 #!/bin/bash
 # Script to start the job server
+# Extra arguments will be spark-submit options, for example
+# ./server_start.sh --jars cassandra-spark-connector.jar
 set -e

 get_abs_script_path() {
@@ -16,7 +18,7 @@ GC_OPTS="-XX:+UseConcMarkSweepGC
          -XX:MaxPermSize=512m
          -XX:+CMSClassUnloadingEnabled "

-JAVA_OPTS="-Xmx5g -XX:MaxDirectMemorySize=512M
+JAVA_OPTS="-XX:MaxDirectMemorySize=512M
            -XX:+HeapDumpOnOutOfMemoryError -Djava.net.preferIPv4Stack=true
            -Dcom.sun.management.jmxremote.port=9999
            -Dcom.sun.management.jmxremote.authenticate=false
@@ -42,13 +44,6 @@ if [ -z "$SPARK_HOME" ]; then
   exit 1
 fi

-if [ -z "$SPARK_CONF_DIR" ]; then
-  SPARK_CONF_DIR=$SPARK_HOME/conf
-fi
-
-# Pull in other env vars in spark config, such as MESOS_NATIVE_LIBRARY
-. $SPARK_CONF_DIR/spark-env.sh
-
 pidFilePath=$appdir/$PIDFILE

 if [ -f "$pidFilePath" ] && kill -0 $(cat "$pidFilePath"); then
@@ -62,7 +57,7 @@ if [ -z "$LOG_DIR" ]; then
 fi
 mkdir -p $LOG_DIR

-LOGGING_OPTS="-Dlog4j.configuration=log4j-server.properties
+LOGGING_OPTS="-Dlog4j.configuration=file:$appdir/log4j-server.properties
               -DLOG_DIR=$LOG_DIR"

 # For Mesos
@@ -75,12 +70,15 @@ if [ "$PORT" != "" ]; then
   CONFIG_OVERRIDES+="-Dspark.jobserver.port=$PORT "
 fi

+if [ -z "$DRIVER_MEMORY" ]; then
+  DRIVER_MEMORY=1G
+fi
+
 # This needs to be exported for standalone mode so drivers can connect to the Spark cluster
 export SPARK_HOME

-# job server jar needs to appear first so its deps take higher priority
-# need to explicitly include app dir in classpath so logging configs can be found
-CLASSPATH="$appdir:$appdir/spark-job-server.jar:$($SPARK_HOME/bin/compute-classpath.sh)"
-
-exec java -cp $CLASSPATH $GC_OPTS $JAVA_OPTS $LOGGING_OPTS $CONFIG_OVERRIDES $MAIN $conffile 2>&1 &
+$SPARK_HOME/bin/spark-submit --class $MAIN --driver-memory $DRIVER_MEMORY \
+  --conf "spark.executor.extraJavaOptions=$LOGGING_OPTS" \
+  --driver-java-options "$GC_OPTS $JAVA_OPTS $LOGGING_OPTS $CONFIG_OVERRIDES" \
+  $@ $appdir/spark-job-server.jar $conffile 2>&1 &
 echo $! > $pidFilePath

doc/contexts.md

Lines changed: 10 additions & 1 deletion
@@ -38,4 +38,13 @@ This can be done easily by extending the `SparkContextFactory` trait, like `SQLC

 ## Jars

-If you wish to use the `SQLContext` or `HiveContext`, be sure to pull down the job-server-extras package.
+If you wish to use the `SQLContext` or `HiveContext`, be sure to pull down the job-server-extras package.
+
+## StreamingContext
+
+`job-server-extras` provides a context to run Spark Streaming jobs. There are a few settings you can change in the job server's .conf file:
+
+* `streaming.batch_interval`: the streaming batch interval in millis
+* `streaming.stopGracefully`: if true, stops gracefully by waiting for the processing of all received data to be completed
+* `streaming.stopSparkContext`: if true, stops the SparkContext together with the StreamingContext; the SparkContext will be stopped regardless of whether the StreamingContext has been started
+
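For illustration, the streaming settings listed above might look like this in the job server's .conf file; the values are placeholders and the enclosing section should be verified against the shipped application.conf rather than taken from this sketch:

```
# Illustrative values only; key names come from doc/contexts.md above.
streaming {
  batch_interval = 1000     # streaming batch interval, in milliseconds
  stopGracefully = true     # wait for in-flight data to finish processing before stopping
  stopSparkContext = true   # also stop the underlying SparkContext
}
```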

doc/troubleshooting.md

Lines changed: 17 additions & 0 deletions
@@ -16,6 +16,19 @@ send timeout param along with your request (in secs). eg below.
 http://devsparkcluster.cloudapp.net/jobs?appName=job-server-tests&classPath=spark.jobserver.WordCountExample&sync=true&timeout=20
 ```

+You may need to adjust Spray's default request timeout and idle timeout, which default to 40 secs and 60 secs respectively. To do this, modify the configuration file in your deployed job server, adding a section like the following:
+
+```
+spray.can.server {
+  idle-timeout = 210 s
+  request-timeout = 200 s
+}
+```
+
+Then simply restart the job server.
+
+Note that the idle-timeout must be higher than the request-timeout, or Spray and the job server won't start.
+
 ## Job server won't start / cannot bind to 0.0.0.0:8090

 Check that another process isn't already using that port. If it is, you may want to start it on another port:
@@ -33,6 +46,10 @@ after this fixed, I can run jobs submitted from a remote job server successfully

 (Thanks to @pcliu)

+## Exception in thread "main" java.lang.NoSuchMethodError: akka.actor.ActorRefFactory.dispatcher()Lscala/concurrent/ExecutionContextExecutor;
+
+If you are running CDH 5.3 or older, you may have an incompatible version of Akka bundled with it. :( Try modifying the version of Akka included with spark-jobserver to match the one in CDH (2.2.4, I think), or upgrade to CDH 5.4. If you are on CDH 5.4, check that `sparkVersion` in `Dependencies.scala` matches CDH. Or see [issue #154](https://github.com/spark-jobserver/spark-jobserver/issues/154).
+
 ## I want to run job-server on Windows

 1. Create directory `C:\Hadoop\bin`
Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+package spark.jobserver
+
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Defines a Job that runs on a [[StreamingContext]]. Note that
+ * these jobs are usually long-running and there is (yet) no way in Spark
+ * Job Server to query the status of these jobs.
+ */
+trait SparkStramingJob extends SparkJobBase {
+  type C = StreamingContext
+}

job-server-extras/src/spark.jobserver/SqlTestJob.scala

Lines changed: 1 addition & 2 deletions
@@ -1,7 +1,6 @@
 package spark.jobserver

-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.spark._
+import com.typesafe.config.Config
 import org.apache.spark.sql.SQLContext

 /**
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+package spark.jobserver
+
+import com.google.common.annotations.VisibleForTesting
+import com.typesafe.config.Config
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.StreamingContext
+
+import scala.collection.mutable
+
+@VisibleForTesting
+object StreamingTestJob extends SparkStramingJob {
+  def validate(ssc: StreamingContext, config: Config): SparkJobValidation = SparkJobValid
+
+
+  def runJob(ssc: StreamingContext, config: Config): Any = {
+    val queue = mutable.Queue[RDD[String]]()
+    queue += ssc.sparkContext.makeRDD(Seq("123", "test", "test2"))
+    val lines = ssc.queueStream(queue)
+    val words = lines.flatMap(_.split(" "))
+    val pairs = words.map(word => (word, 1))
+    val wordCounts = pairs.reduceByKey(_ + _)
+    // do something with the stream; here we just print each micro-batch's count
+    wordCounts.foreachRDD(rdd => println(rdd.count()))
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}

job-server-extras/src/spark.jobserver/context/HiveContextFactory.scala

Lines changed: 2 additions & 5 deletions
@@ -7,13 +7,10 @@ import spark.jobserver.{ContextLike, SparkHiveJob, SparkJobBase}
 import spark.jobserver.util.SparkJobUtils

 class HiveContextFactory extends SparkContextFactory {
-  import SparkJobUtils._
-
   type C = HiveContext with ContextLike

-  def makeContext(config: Config, contextConfig: Config, contextName: String): C = {
-    val conf = configToSparkConf(config, contextConfig, contextName)
-    contextFactory(conf)
+  def makeContext(sparkConf: SparkConf, config: Config, contextName: String): C = {
+    contextFactory(sparkConf)
   }

   protected def contextFactory(conf: SparkConf): C = {
