Skip to content

Commit 1ea4a35

Browse files
NamedObjects: cache with references to named objects must be a singleton
1 parent 9130ccd commit 1ea4a35

File tree

3 files changed

+162
-5
lines changed

3 files changed

+162
-5
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package spark.jobserver
2+
3+
import com.typesafe.config.Config
4+
import org.apache.spark.SparkContext
5+
import org.apache.spark.rdd.RDD
6+
import org.apache.spark.storage.StorageLevel
7+
import org.apache.spark.sql.types._
8+
import org.apache.spark.sql.{ SQLContext, Row, DataFrame }
9+
10+
/**
 * A test job exercising named-object support. Depending on its config it:
 *  - creates and caches a named DataFrame "df1" (building a SQLContext from
 *    the supplied SparkContext) when `createDF` is true,
 *  - creates and caches a named RDD "rdd1" when `createRDD` is true,
 *  - forgets every named object listed under `delete`,
 * and finally returns the names of all named objects known afterwards.
 *
 * NOTE: despite the earlier comment, this job accepts a regular SparkContext,
 * not a SQLContext — the SQLContext is created internally when needed.
 */
class NamedObjectsTestJob extends SparkJob with NamedObjectSupport {
  import NamedObjectsTestJobConfig._

  // Persisters that tell NamedObjectSupport how to persist/unpersist
  // each supported named-object type.
  implicit def rddPersister: NamedObjectPersister[NamedRDD[Row]] = new RDDPersister[Row]
  implicit def dataFramePersister: NamedObjectPersister[NamedDataFrame] = new DataFramePersister

  /** This test job has no preconditions; validation always succeeds. */
  def validate(sql: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  /** Dummy fixture rows shared by the RDD and DataFrame branches. */
  private def rows(sc: SparkContext): RDD[Row] = {
    sc.parallelize(List(Row(1, true), Row(2, false), Row(55, true)))
  }

  /**
   * Creates/deletes named objects as directed by `config`.
   *
   * @return the names of all named objects known after the updates
   */
  def runJob(sc: SparkContext, config: Config): Array[String] = {
    if (config.hasPath(CREATE_DF) && config.getBoolean(CREATE_DF)) {
      val sqlContext = new SQLContext(sc)
      val struct = StructType(
        StructField("i", IntegerType, true) ::
          StructField("b", BooleanType, false) :: Nil)
      val df = sqlContext.createDataFrame(rows(sc), struct)
      namedObjects.update("df1", NamedDataFrame(df, true, StorageLevel.MEMORY_AND_DISK))
    }
    if (config.hasPath(CREATE_RDD) && config.getBoolean(CREATE_RDD)) {
      namedObjects.update("rdd1", NamedRDD(rows(sc), true, StorageLevel.MEMORY_ONLY))
    }

    if (config.hasPath(DELETE)) {
      // getStringList returns a java.util.List; iterate it directly.
      val iter = config.getStringList(DELETE).iterator
      while (iter.hasNext) {
        namedObjects.forget(iter.next)
      }
    }

    namedObjects.getNames().toArray
  }
}
48+
49+
/** Config path constants understood by NamedObjectsTestJob. */
object NamedObjectsTestJobConfig {
  /** Boolean path: when true the job creates and caches a named DataFrame "df1". */
  final val CREATE_DF = "createDF"
  /** Boolean path: when true the job creates and caches a named RDD "rdd1". */
  final val CREATE_RDD = "createRDD"
  /** String-list path: names of cached objects the job should forget. */
  final val DELETE = "delete"
}
54+
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package spark.jobserver
2+
3+
import akka.actor.{ ActorRef, ActorSystem, Props }
4+
import akka.testkit.{ ImplicitSender, TestKit }
5+
import com.typesafe.config.{ Config, ConfigFactory, ConfigValueFactory }
6+
import akka.testkit.TestProbe
7+
import spark.jobserver.CommonMessages.{ JobErroredOut, JobResult }
8+
import spark.jobserver.io.JobDAOActor
9+
import collection.JavaConversions._
10+
11+
/**
 * Verifies that named objects (RDDs and DataFrames) created by one job are
 * still visible to subsequent jobs run in the same context.
 */
class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {

  // Generous creation timeout so slow CI machines do not time out named-object creation.
  private val emptyConfig = ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s")

  before {
    dao = new InMemoryDAO
    daoActor = system.actorOf(JobDAOActor.props(dao))
    manager = system.actorOf(JobManagerActor.props(JobManagerSpec.getContextConfig(adhoc = false)))
    supervisor = TestProbe().ref
  }

  val jobName = "spark.jobserver.NamedObjectsTestJob"

  /** Initializes the job manager and uploads the test jar (shared setup for every test). */
  private def initManager(): Unit = {
    manager ! JobManagerActor.Initialize(daoActor, None)
    expectMsgClass(classOf[JobManagerActor.Initialized])
    uploadTestJar()
  }

  /** Builds a job config with the given createDF/createRDD flags on top of the base config. */
  private def createConfig(createDF: Boolean, createRDD: Boolean): Config =
    emptyConfig.withValue(NamedObjectsTestJobConfig.CREATE_DF, ConfigValueFactory.fromAnyRef(createDF))
      .withValue(NamedObjectsTestJobConfig.CREATE_RDD, ConfigValueFactory.fromAnyRef(createRDD))

  /** Starts the test job with `config` and returns the named-object names it reports. */
  private def runTestJob(config: Config): Array[String] = {
    manager ! JobManagerActor.StartJob("demo", jobName, config, errorEvents ++ syncEvents)
    val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])
    names
  }

  describe("NamedObjects (RDD)") {
    it("should survive from one job to another one") {
      initManager()

      val names = runTestJob(createConfig(createDF = false, createRDD = true))
      names should equal(Array("rdd1"))

      // A second job that creates nothing must still see the RDD created by the first.
      val names2 = runTestJob(createConfig(createDF = false, createRDD = false))
      names2 should equal(names)

      //clean-up
      val names3 = runTestJob(
        emptyConfig.withValue(NamedObjectsTestJobConfig.DELETE, ConfigValueFactory.fromIterable(names.toList))
          .withValue(NamedObjectsTestJobConfig.CREATE_RDD, ConfigValueFactory.fromAnyRef(false)))
      names3.size should equal(0)
    }
  }

  describe("NamedObjects (DataFrame)") {
    it("should survive from one job to another one") {
      initManager()

      val names = runTestJob(createConfig(createDF = true, createRDD = false))
      names should equal(Array("df1"))

      // A second job that creates nothing must still see the DataFrame.
      val names2 = runTestJob(createConfig(createDF = false, createRDD = false))
      names2 should equal(names)
    }
  }

  describe("NamedObjects (DataFrame + RDD)") {
    it("should survive from one job to another one") {
      initManager()

      val names = runTestJob(createConfig(createDF = true, createRDD = true))
      names should equal(Array("rdd1", "df1"))

      // A second job that creates nothing must still see both named objects.
      val names2 = runTestJob(createConfig(createDF = false, createRDD = false))
      names2 should equal(names)
    }
  }

}

job-server/src/spark.jobserver/JobServerNamedObjects.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import spray.util._
1818
*/
1919
class JobServerNamedObjects(system: ActorSystem) extends NamedObjects {
2020

21+
import JobServerNamedObjects._
22+
2123
val logger = LoggerFactory.getLogger(getClass)
2224

2325
implicit val ec: ExecutionContext = system.dispatcher
@@ -30,11 +32,6 @@ class JobServerNamedObjects(system: ActorSystem) extends NamedObjects {
3032
config.getDuration("spark.jobserver.named-object-creation-timeout",
3133
SECONDS), SECONDS)
3234

33-
// we must store a reference to each NamedObject even though only its ID is used here
34-
// this reference prevents the object from being GCed and cleaned by sparks ContextCleaner
35-
// or some other GC for other types of objects
36-
private val namesToObjects: Cache[NamedObject] = LruCache()
37-
3835
override def getOrElseCreate[O <: NamedObject](name: String, objGen: => O)
3936
(implicit timeout: Timeout = defaultTimeout,
4037
persister: NamedObjectPersister[O]): O = {
@@ -96,5 +93,15 @@ class JobServerNamedObjects(system: ActorSystem) extends NamedObjects {
9693
case answer: Iterable[String] @unchecked => answer
9794
}
9895
}
96+
}
9997

98+
/**
 * Companion object that holds the singleton cache of named objects, so that
 * references to named objects are shared across jobs (and across
 * JobServerNamedObjects instances) within the same JVM.
 */
object JobServerNamedObjects {
  // we must store a reference to each NamedObject even though only its ID is used here
  // this reference prevents the object from being GCed and cleaned by sparks ContextCleaner
  // or some other GC for other types of objects
  // NOTE(review): this is an LruCache — eviction of a live entry would defeat the
  // GC-prevention purpose stated above; confirm the cache is sized/configured so
  // that entries are never evicted while still in use.
  val namesToObjects: Cache[NamedObject] = LruCache()
}

0 commit comments

Comments
 (0)