Commit d2721e6

Merge pull request spark-jobserver#392 from koettert/NamedDS

NamedObjects: cache with references to named objects must be a singleton

2 parents (2fb7795 + f8e6c8b)

File tree: 7 files changed, +166 −7 lines
Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
package spark.jobserver

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.types._
import org.apache.spark.sql.{ SQLContext, Row, DataFrame }

/**
 * A test job that creates and/or deletes named RDDs and DataFrames,
 * depending on flags in the job config, and returns the names of all
 * named objects currently in the cache.
 */
class NamedObjectsTestJob extends SparkJob with NamedObjectSupport {
  import NamedObjectsTestJobConfig._
  implicit def rddPersister: NamedObjectPersister[NamedRDD[Row]] = new RDDPersister[Row]
  implicit def dataFramePersister: NamedObjectPersister[NamedDataFrame] = new DataFramePersister

  def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  private def rows(sc: SparkContext): RDD[Row] = {
    sc.parallelize(List(Row(1, true), Row(2, false), Row(55, true)))
  }

  def runJob(sc: SparkContext, config: Config): Array[String] = {
    // register a DataFrame under "df1" when requested
    if (config.hasPath(CREATE_DF) && config.getBoolean(CREATE_DF)) {
      val sqlContext = new SQLContext(sc)
      val struct = StructType(
        StructField("i", IntegerType, true) ::
          StructField("b", BooleanType, false) :: Nil)
      val df = sqlContext.createDataFrame(rows(sc), struct)
      namedObjects.update("df1", NamedDataFrame(df, true, StorageLevel.MEMORY_AND_DISK))
    }
    // register an RDD under "rdd1" when requested
    if (config.hasPath(CREATE_RDD) && config.getBoolean(CREATE_RDD)) {
      namedObjects.update("rdd1", NamedRDD(rows(sc), true, StorageLevel.MEMORY_ONLY))
    }
    // drop every name listed under "delete"
    if (config.hasPath(DELETE)) {
      val iter = config.getStringList(DELETE).iterator
      while (iter.hasNext) {
        namedObjects.forget(iter.next)
      }
    }

    namedObjects.getNames().toArray
  }
}

object NamedObjectsTestJobConfig {
  val CREATE_DF = "createDF"
  val CREATE_RDD = "createRDD"
  val DELETE = "delete"
}
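
For context, a minimal sketch of the consumer side (not part of this commit): a second job in the same Spark context can read back the RDD that NamedObjectsTestJob registered under "rdd1". The class name NamedRddReaderJob is hypothetical; the sketch assumes the same package and imports as the job above, and a NamedObjects.get that returns an Option, as in the trait this job mixes in.

// Hypothetical companion job; same package and imports as NamedObjectsTestJob.
class NamedRddReaderJob extends SparkJob with NamedObjectSupport {
  implicit def rddPersister: NamedObjectPersister[NamedRDD[Row]] = new RDDPersister[Row]

  def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  def runJob(sc: SparkContext, config: Config): Long = {
    // get() consults the shared cache; the RDD is reused, not recomputed.
    namedObjects.get[NamedRDD[Row]]("rdd1") match {
      case Some(NamedRDD(rdd, _, _)) => rdd.count()
      case None                      => -1L // "rdd1" was never created or was forgotten
    }
  }
}
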
Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
package spark.jobserver

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.testkit.{ ImplicitSender, TestKit }
import com.typesafe.config.{ Config, ConfigFactory, ConfigValueFactory }
import akka.testkit.TestProbe
import spark.jobserver.CommonMessages.{ JobErroredOut, JobResult }
import spark.jobserver.io.JobDAOActor
import collection.JavaConversions._

class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {

  //private val emptyConfig = ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s")

  before {
    dao = new InMemoryDAO
    daoActor = system.actorOf(JobDAOActor.props(dao))
    manager = system.actorOf(JobManagerActor.props(JobManagerSpec.getContextConfig(adhoc = false)))
    supervisor = TestProbe().ref

    manager ! JobManagerActor.Initialize(daoActor, None)

    expectMsgClass(classOf[JobManagerActor.Initialized])

    uploadTestJar()
  }

  val jobName = "spark.jobserver.NamedObjectsTestJob"

  private def getCreateConfig(createDF: Boolean, createRDD: Boolean): Config = {
    ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s, " +
      NamedObjectsTestJobConfig.CREATE_DF + " = " + createDF + ", " +
      NamedObjectsTestJobConfig.CREATE_RDD + " = " + createRDD)
  }

  private def getDeleteConfig(names: List[String]): Config = {
    ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s, " +
      NamedObjectsTestJobConfig.DELETE + " = [" + names.mkString(", ") + "]")
  }

  describe("NamedObjects (RDD)") {
    it("should survive from one job to another one") {
      manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, true), errorEvents ++ syncEvents)
      val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])
      names should contain("rdd1")

      manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false), errorEvents ++ syncEvents)
      val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])

      names2 should contain("rdd1")
      names2 should not contain("df1")

      //clean-up
      manager ! JobManagerActor.StartJob("demo", jobName, getDeleteConfig(List("rdd1")), errorEvents ++ syncEvents)
      val JobResult(_, names3: Array[String]) = expectMsgClass(classOf[JobResult])

      names3 should not contain("rdd1")
      names3 should not contain("df1")
    }
  }

  describe("NamedObjects (DataFrame)") {
    it("should survive from one job to another one") {
      manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, false), errorEvents ++ syncEvents)
      val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])

      names should contain("df1")

      manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false), errorEvents ++ syncEvents)
      val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])

      names2 should equal(names)

      //clean-up
      manager ! JobManagerActor.StartJob("demo", jobName, getDeleteConfig(List("df1")), errorEvents ++ syncEvents)
      val JobResult(_, names3: Array[String]) = expectMsgClass(classOf[JobResult])
    }
  }

  describe("NamedObjects (DataFrame + RDD)") {
    it("should survive from one job to another one") {
      manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, true), errorEvents ++ syncEvents)
      val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])

      names should contain("rdd1")
      names should contain("df1")

      manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false), errorEvents ++ syncEvents)
      val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])

      names2 should equal(names)

      //clean-up
      manager ! JobManagerActor.StartJob("demo", jobName, getDeleteConfig(List("rdd1", "df1")), errorEvents ++ syncEvents)
      val JobResult(_, names3: Array[String]) = expectMsgClass(classOf[JobResult])
    }
  }
}

job-server-extras/test/spark.jobserver/NamedObjectsSpec.scala

Lines changed: 2 additions & 2 deletions
@@ -2,7 +2,7 @@ package spark.jobserver
 
 import akka.actor.{ ActorRef, ActorSystem, Props }
 import akka.testkit.{ ImplicitSender, TestKit }
-import org.apache.spark.SparkContext
+import org.apache.spark.{ SparkContext, SparkConf }
 import org.apache.spark.sql.{ SQLContext, Row, DataFrame}
 import org.apache.spark.sql.types._
 import org.apache.spark.rdd.RDD
@@ -16,7 +16,7 @@ import org.scalatest.{ Matchers, FunSpecLike, FunSpec, BeforeAndAfterAll, Before
 class NamedObjectsSpec extends TestKit(ActorSystem("NamedObjectsSpec")) with FunSpecLike
   with ImplicitSender with Matchers with BeforeAndAfter with BeforeAndAfterAll {
 
-  val sc = new SparkContext("local[4]", getClass.getSimpleName)
+  val sc = new SparkContext("local[3]", getClass.getSimpleName, new SparkConf)
   val sqlContext = new SQLContext(sc)
   val namedObjects: NamedObjects = new JobServerNamedObjects(system)

job-server/src/spark.jobserver/JobManagerActor.scala

Lines changed: 2 additions & 1 deletion
@@ -95,6 +95,7 @@ class JobManagerActor(contextConfig: Config) extends InstrumentedActor {
   protected var resultActor: ActorRef = _
   private var daoActor: ActorRef = _
 
+  private val jobServerNamedObjects = new JobServerNamedObjects(context.system)
 
   override def postStop() {
     logger.info("Shutting down SparkContext {}", contextName)
@@ -257,7 +258,7 @@ class JobManagerActor(contextConfig: Config) extends InstrumentedActor {
       if (job.isInstanceOf[NamedObjectSupport]) {
         val namedObjects = job.asInstanceOf[NamedObjectSupport].namedObjectsPrivate
         if (namedObjects.get() == null) {
-          namedObjects.compareAndSet(null, new JobServerNamedObjects(context.system))
+          namedObjects.compareAndSet(null, jobServerNamedObjects)
         }
       }
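
This hunk is the fix the commit title describes. Previously, every job mixing in NamedObjectSupport whose namedObjectsPrivate reference was still null received a brand-new JobServerNamedObjects, so each job started with its own empty cache and named objects could not survive from one job to the next. Holding a single instance in the JobManagerActor (one per context) makes all jobs share the same cache. A simplified, hypothetical sketch of the wiring (JobManagerActorSketch and wireNamedObjects are illustrative stand-ins, not the real actor code):

import akka.actor.ActorSystem

class JobManagerActorSketch(system: ActorSystem) {
  // One cache per context, created once and shared by every job below.
  private val jobServerNamedObjects = new JobServerNamedObjects(system)

  def wireNamedObjects(job: SparkJobBase): Unit = job match {
    case supported: NamedObjectSupport =>
      // Before this commit the second argument was
      // `new JobServerNamedObjects(system)`, handing each job a private,
      // empty cache; names written by one job were invisible to the next.
      if (supported.namedObjectsPrivate.get() == null) {
        supported.namedObjectsPrivate.compareAndSet(null, jobServerNamedObjects)
      }
    case _ => // job does not use named objects
  }
}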

job-server/test/spark.jobserver/JobWithNamedRddsSpec.scala

Lines changed: 2 additions & 2 deletions
@@ -2,7 +2,7 @@ package spark.jobserver
 
 import akka.actor.{ ActorRef, ActorSystem, Props }
 import akka.testkit.{ ImplicitSender, TestKit }
-import org.apache.spark.SparkContext
+import org.apache.spark.{ SparkContext, SparkConf }
 import org.apache.spark.storage.StorageLevel
 import org.scalatest.{ FunSpecLike, FunSpec, BeforeAndAfterAll, BeforeAndAfter }
 import com.typesafe.config.Config
@@ -15,7 +15,7 @@ class JobWithNamedRddsSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
 
   private val emptyConfig = ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s")
 
-  val sc = new SparkContext("local[4]", getClass.getSimpleName)
+  val sc = new SparkContext("local[4]", getClass.getSimpleName, new SparkConf)
 
   class TestJob1 extends SparkJob with NamedRddSupport {
     def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

job-server/test/spark.jobserver/LocalContextSupervisorSpec.scala

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ object LocalContextSupervisorSpec {
     jobserver.job-result-cache-size = 100
     jobserver.context-creation-timeout = 5 s
     jobserver.yarn-context-creation-timeout = 40 s
+    jobserver.named-object-creation-timeout = 60 s
     contexts {
       olap-demo {
         num-cpu-cores = 4
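
The setting added here gives the test config a named-object-creation-timeout, which (judging from its use in the specs above) bounds how long the NamedObjects cache waits for an object that is still being created. A small sketch of how such a "60 s" value can be read back as a Scala duration with Typesafe Config; the config path is the one added above, the surrounding code is illustrative:

import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory

val config = ConfigFactory.parseString(
  "spark.jobserver.named-object-creation-timeout = 60 s")
// Typesafe Config understands "60 s" as a duration; convert it into the
// FiniteDuration form used for timeouts on the Scala side.
val timeout: FiniteDuration = FiniteDuration(
  config.getDuration("spark.jobserver.named-object-creation-timeout", TimeUnit.MILLISECONDS),
  TimeUnit.MILLISECONDS)
assert(timeout == 60.seconds)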

job-server/test/spark.jobserver/NamedObjectsRDDsOnlySpec.scala

Lines changed: 2 additions & 2 deletions
@@ -2,7 +2,7 @@ package spark.jobserver
 
 import akka.actor.{ ActorRef, ActorSystem, Props }
 import akka.testkit.{ ImplicitSender, TestKit }
-import org.apache.spark.SparkContext
+import org.apache.spark.{ SparkContext, SparkConf }
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.scalatest.{ Matchers, FunSpecLike, FunSpec, BeforeAndAfterAll, BeforeAndAfter }
@@ -14,7 +14,7 @@ import org.scalatest.{ Matchers, FunSpecLike, FunSpec, BeforeAndAfterAll, Before
 class NamedObjectsRDDsOnlySpec extends TestKit(ActorSystem("NamedObjectsSpec")) with FunSpecLike
   with ImplicitSender with Matchers with BeforeAndAfter with BeforeAndAfterAll {
 
-  val sc = new SparkContext("local[4]", getClass.getSimpleName)
+  val sc = new SparkContext("local[2]", getClass.getSimpleName, new SparkConf)
   val namedObjects: NamedObjects = new JobServerNamedObjects(system)
 
   implicit def rddPersister[T] = new RDDPersister[T]
