Skip to content

Commit 58a47ae

Browse files
committed
Merge pull request spark-jobserver#425 from hntd187/dependencies
Upgraded to sbt 0.13.11, upgraded dependencies to latest minor verson…
2 parents 8fc1edf + e0cc671 commit 58a47ae

19 files changed

+115
-113
lines changed

.travis.yml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,8 @@ env:
33
global:
44
_JAVA_OPTIONS="-Xmx1500m -XX:MaxPermSize=512m -Dakka.test.timefactor=3"
55
scala:
6-
- 2.10.4
7-
- 2.11.6
6+
- 2.10.6
7+
- 2.11.8
8+
jdk:
9+
- oraclejdk8
10+
- oraclejdk7

job-server/src/spark.jobserver/AkkaClusterSupervisorActor.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@ class AkkaClusterSupervisorActor(daoActor: ActorRef) extends InstrumentedActor {
184184
contexts(ctxName) = (ref, resActor)
185185
context.watch(ref)
186186
successFunc(ref)
187+
case _ => logger.info("Failed for unknown reason.")
188+
ref ! PoisonPill
189+
failureFunc(new RuntimeException("Failed for unknown reason."))
187190
}
188191
}
189192

job-server/test/spark.jobserver/DataManagerActorSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ class DataManagerActorSpec extends TestKit(DataManagerActorSpec.system) with Imp
1515
with FunSpecLike with Matchers with BeforeAndAfter with BeforeAndAfterAll {
1616

1717
import com.typesafe.config._
18-
import CommonMessages.NoSuchJobId
1918
import DataManagerActor._
2019

2120
private val bytes = Array[Byte](0, 1, 2)
2221
private val tmpDir = Files.createTempDirectory("ut")
23-
private val config = ConfigFactory.empty().withValue("spark.jobserver.datadao.rootdir", ConfigValueFactory.fromAnyRef(tmpDir.toString))
22+
private val config = ConfigFactory.empty().withValue("spark.jobserver.datadao.rootdir",
23+
ConfigValueFactory.fromAnyRef(tmpDir.toString))
2424

2525
override def afterAll() {
2626
dao.shutdown()

job-server/test/spark.jobserver/InMemoryDAO.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ class InMemoryDAO extends JobDAO {
1616
jars((appName, uploadTime)) = jarBytes
1717
}
1818

19-
def getApps(): Map[String, DateTime] = {
19+
def getApps: Map[String, DateTime] = {
2020
jars.keys
2121
.groupBy(_._1)
2222
.map { case (appName, appUploadTimeTuples) =>
2323
appName -> appUploadTimeTuples.map(_._2).toSeq.head
24-
}.toMap
24+
}
2525
}
2626

2727
def retrieveJarFile(appName: String, uploadTime: DateTime): String = {
@@ -34,7 +34,7 @@ class InMemoryDAO extends JobDAO {
3434
} finally {
3535
bos.close()
3636
}
37-
outFile.getAbsolutePath()
37+
outFile.getAbsolutePath
3838
}
3939

4040
val jobInfos = mutable.HashMap.empty[String, JobInfo]
@@ -49,5 +49,5 @@ class InMemoryDAO extends JobDAO {
4949

5050
def saveJobConfig(jobId: String, jobConfig: Config) { jobConfigs(jobId) = jobConfig }
5151

52-
def getJobConfigs(): Map[String, Config] = jobConfigs.toMap
52+
def getJobConfigs: Map[String, Config] = jobConfigs.toMap
5353
}

job-server/test/spark.jobserver/JobManagerActorSpec.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package spark.jobserver
22

3-
import akka.actor.Props
4-
import akka.testkit.TestProbe
53
import spark.jobserver.CommonMessages.{JobErroredOut, JobResult}
64
import spark.jobserver.io.JobDAOActor
75

job-server/test/spark.jobserver/JobManagerSpec.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
package spark.jobserver
22

3-
import java.nio.file.Paths
3+
import scala.collection.mutable
44

55
import com.typesafe.config.ConfigFactory
66
import spark.jobserver.JobManagerActor.KillJob
7-
import scala.collection.mutable
8-
import spark.jobserver.io.JobDAO
97

108
object JobManagerSpec extends JobSpecConfig
119

1210
abstract class JobManagerSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
1311
import scala.concurrent.duration._
12+
1413
import CommonMessages._
1514
import JobManagerSpec.MaxJobsPerContext
1615
import akka.testkit._
@@ -95,7 +94,7 @@ abstract class JobManagerSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
9594
uploadTestJar()
9695
manager ! JobManagerActor.StartJob("demo", wordCountClass, stringConfig, syncEvents ++ errorEvents)
9796
expectMsgPF(startJobWait, "Did not get JobResult") {
98-
case JobResult(_, result: Map[String, Int]) => println("I got results! " + result)
97+
case JobResult(_, result) => println("I got results! " + result)
9998
}
10099
expectNoMsg()
101100
}
@@ -129,7 +128,7 @@ abstract class JobManagerSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
129128
manager ! JobManagerActor.StartJob("demo", classPrefix + "ConfigCheckerJob", jobConfig,
130129
syncEvents ++ errorEvents)
131130
expectMsgPF(startJobWait, "Did not get JobResult") {
132-
case JobResult(_, keys: Seq[String]) =>
131+
case JobResult(_, keys: Seq[_]) =>
133132
keys should contain ("foo")
134133
}
135134
}
@@ -203,10 +202,10 @@ abstract class JobManagerSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
203202
uploadTestJar()
204203
manager ! JobManagerActor.StartJob("demo", classPrefix + "LongPiJob", stringConfig, allEvents)
205204
expectMsgPF(5.seconds.dilated, "Did not get JobResult") {
206-
case JobStarted(id, _, _) => {
205+
case JobStarted(id, _, _) =>
207206
manager ! KillJob(id)
208207
expectMsgClass(classOf[JobKilled])
209-
}
208+
210209
}
211210
}
212211

@@ -219,12 +218,13 @@ abstract class JobManagerSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
219218
val jobJarDepsConfigs = ConfigFactory.parseString(
220219
s"""
221220
|dependent-jar-uris = ["file://$getExtrasJarPath"]
222-
""".stripMargin)
221+
""".stripMargin
222+
)
223223

224224
manager ! JobManagerActor.StartJob("demo", classPrefix + "jobJarDependenciesJob", jobJarDepsConfigs,
225225
syncEvents ++ errorEvents)
226226
expectMsgPF(startJobWait, "Did not get JobResult") {
227-
case JobResult(_, result: Map[String, Long]) => println("I got results! " + result)
227+
case JobResult(_, result) => println("I got results! " + result)
228228
}
229229
expectNoMsg()
230230
}

job-server/test/spark.jobserver/JobSpecBase.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ trait JobSpecConfig {
1717
import collection.JavaConverters._
1818

1919
val JobResultCacheSize = Integer.valueOf(30)
20-
val NumCpuCores = Integer.valueOf(Runtime.getRuntime.availableProcessors()) // number of cores to allocate. Required.
21-
val MemoryPerNode = "512m" // Executor memory per node, -Xmx style eg 512m, 1G, etc.
20+
// number of cores to allocate. Required.
21+
val NumCpuCores = Integer.valueOf(Runtime.getRuntime.availableProcessors())
22+
// Executor memory per node, -Xmx style eg 512m, 1G, etc.
23+
val MemoryPerNode = "512m"
2224
val MaxJobsPerContext = Integer.valueOf(2)
23-
def contextFactory = classOf[DefaultSparkContextFactory].getName
25+
def contextFactory: String = classOf[DefaultSparkContextFactory].getName
2426
lazy val config = {
2527
val ConfigMap = Map(
2628
"spark.jobserver.job-result-cache-size" -> JobResultCacheSize,
@@ -29,7 +31,7 @@ trait JobSpecConfig {
2931
"spark.jobserver.max-jobs-per-context" -> MaxJobsPerContext,
3032
"spark.jobserver.named-object-creation-timeout" -> "60 s",
3133
"akka.log-dead-letters" -> Integer.valueOf(0),
32-
"spark.master" -> "local[4]",
34+
"spark.master" -> "local[*]",
3335
"context-factory" -> contextFactory,
3436
"spark.context-settings.test" -> ""
3537
)
@@ -44,14 +46,14 @@ trait JobSpecConfig {
4446
lazy val contextConfig = {
4547
val ConfigMap = Map(
4648
"context-factory" -> contextFactory,
47-
"streaming.batch_interval" -> new Integer(40),
48-
"streaming.stopGracefully" -> false,
49-
"streaming.stopSparkContext" -> true
49+
"streaming.batch_interval" -> Integer.valueOf(40),
50+
"streaming.stopGracefully" -> Boolean.box(false),
51+
"streaming.stopSparkContext" -> Boolean.box(true)
5052
)
5153
ConfigFactory.parseMap(ConfigMap.asJava).withFallback(ConfigFactory.defaultOverrides())
5254
}
5355

54-
def getNewSystem = ActorSystem("test", config)
56+
def getNewSystem: ActorSystem = ActorSystem("test", config)
5557
}
5658

5759
abstract class JobSpecBaseBase(system: ActorSystem) extends TestKit(system) with ImplicitSender

job-server/test/spark.jobserver/JobStatusActorSpec.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
11
package spark.jobserver
22

3-
import akka.actor.{Props, PoisonPill, ActorRef, ActorSystem}
3+
import akka.actor.{Props, ActorRef, ActorSystem}
44
import akka.testkit.{TestKit, ImplicitSender}
55
import spark.jobserver.io.{JobDAOActor, JarInfo, JobInfo, JobDAO}
66
import org.joda.time.DateTime
7-
import org.scalatest.matchers.ShouldMatchers
8-
import org.scalatest.{FunSpecLike, FunSpec, BeforeAndAfter, BeforeAndAfterAll}
7+
import org.scalatest.Matchers
8+
import org.scalatest.{FunSpecLike, BeforeAndAfter, BeforeAndAfterAll}
99

1010
object JobStatusActorSpec {
1111
val system = ActorSystem("test")
1212
}
1313

1414
class JobStatusActorSpec extends TestKit(JobStatusActorSpec.system) with ImplicitSender
15-
with FunSpecLike with ShouldMatchers with BeforeAndAfter with BeforeAndAfterAll {
15+
with FunSpecLike with Matchers with BeforeAndAfter with BeforeAndAfterAll {
1616

17-
import com.typesafe.config._
1817
import CommonMessages._
1918
import JobStatusActor._
2019

@@ -24,7 +23,6 @@ with FunSpecLike with ShouldMatchers with BeforeAndAfter with BeforeAndAfterAll
2423
private val jarInfo = JarInfo(appName, DateTime.now)
2524
private val classPath = "classPath"
2625
private val jobInfo = JobInfo(jobId, contextName, jarInfo, classPath, DateTime.now, None, None)
27-
private val jobConfig = ConfigFactory.empty()
2826

2927
override def afterAll() {
3028
ooyala.common.akka.AkkaTestUtils.shutdownAndWait(JobStatusActorSpec.system)

job-server/test/spark.jobserver/JobWithNamedRddsSpec.scala

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
11
package spark.jobserver
22

3-
import akka.actor.{ ActorRef, ActorSystem, Props }
4-
import akka.testkit.{ ImplicitSender, TestKit }
5-
import org.apache.spark.{ SparkContext, SparkConf }
3+
import org.apache.spark.{SparkConf, SparkContext}
64
import org.apache.spark.storage.StorageLevel
7-
import org.scalatest.{ FunSpecLike, FunSpec, BeforeAndAfterAll, BeforeAndAfter }
8-
import com.typesafe.config.Config
9-
import com.typesafe.config.ConfigFactory
10-
import spark.jobserver.CommonMessages.{ JobErroredOut, JobResult }
115
import java.util.concurrent.TimeoutException
126
import scala.concurrent.duration._
137
import org.apache.spark.rdd.RDD
14-
class JobWithNamedRddsSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
158

16-
private val emptyConfig = ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s")
9+
import com.typesafe.config.Config
10+
11+
class JobWithNamedRddsSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
1712

1813
val sc = new SparkContext("local[4]", getClass.getSimpleName, new SparkConf)
1914

@@ -42,7 +37,7 @@ class JobWithNamedRddsSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
4237

4338
describe("NamedRdds") {
4439
it("get() should return None when RDD does not exist") {
45-
namedTestRdds.getNames.foreach { rddName => namedTestRdds.destroy(rddName) }
40+
namedTestRdds.getNames().foreach { rddName => namedTestRdds.destroy(rddName) }
4641
namedTestRdds.get[Int]("No such RDD") should equal(None)
4742
}
4843

@@ -60,7 +55,7 @@ class JobWithNamedRddsSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
6055

6156
var rdd : Option[RDD[Int]] = None
6257
val thread = new Thread {
63-
override def run {
58+
override def run() {
6459
namedTestRdds.getOrElseCreate("rdd-sleep", {
6560
val t1 = System.currentTimeMillis()
6661
var x = 0d
@@ -73,7 +68,7 @@ class JobWithNamedRddsSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
7368
})(1.milliseconds)
7469
}
7570
}
76-
thread.start
71+
thread.start()
7772
Thread.sleep(11)
7873
//don't wait
7974
val err = intercept[TimeoutException] { namedTestRdds.get[Int]("rdd-sleep")(1.milliseconds) }
@@ -147,7 +142,7 @@ class JobWithNamedRddsSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
147142
}
148143

149144
it("should include underlying exception when error occurs") {
150-
def errorFunc = {
145+
def errorFunc: RDD[Int] = {
151146
throw new IllegalArgumentException("boo!")
152147
sc.parallelize(Seq(1, 2))
153148
}

job-server/test/spark.jobserver/LocalContextSupervisorSpec.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package spark.jobserver
33
import akka.actor._
44
import akka.testkit.{ImplicitSender, TestKit}
55
import com.typesafe.config.ConfigFactory
6-
import org.apache.hadoop.conf.Configuration
7-
import org.apache.spark.SparkConf
86
import spark.jobserver.io.{JobDAO, JobDAOActor}
97
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSpecLike, Matchers}
108

@@ -102,21 +100,20 @@ class LocalContextSupervisorSpec extends TestKit(LocalContextSupervisorSpec.syst
102100
expectMsg(Seq("c1", "c2"))
103101
supervisor ! GetResultActor("c1")
104102
val rActor = expectMsgClass(classOf[ActorRef])
105-
rActor.path.toString should not include ("global")
103+
rActor.path.toString should not include "global"
106104
}
107105

108106
it("should be able to get context configs") {
109107
supervisor ! AddContext("c1", contextConfig)
110108
expectMsg(ContextInitialized)
111109
supervisor ! GetContext("c1")
112110
expectMsgPF(5 seconds, "I can't find that context :'-(") {
113-
case (contextActor: ActorRef, resultActor: ActorRef) => {
111+
case (contextActor: ActorRef, resultActor: ActorRef) =>
114112
contextActor ! GetContextConfig
115113
val cc = expectMsgClass(classOf[ContextConfig])
116114
cc.contextName shouldBe "c1"
117115
cc.contextConfig.get("spark.ui.enabled") shouldBe "false"
118116
cc.hadoopConfig.get("mapreduce.framework.name") shouldBe "ayylmao"
119-
}
120117
}
121118
}
122119

0 commit comments

Comments
 (0)