Skip to content

Commit 53d02f9

Browse files
NamedObjects: create one instance of JobServerNamedObjects instead of a singleton object
1 parent c2b3fa1 commit 53d02f9

File tree

3 files changed

+35
-51
lines changed

3 files changed

+35
-51
lines changed

job-server-extras/test/spark.jobserver/NamedObjectsJobSpec.scala

Lines changed: 28 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -10,42 +10,50 @@ import collection.JavaConversions._
1010

1111
class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
1212

13-
private val emptyConfig = ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s")
13+
//private val emptyConfig = ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s")
1414

1515
before {
1616
dao = new InMemoryDAO
1717
daoActor = system.actorOf(JobDAOActor.props(dao))
1818
manager = system.actorOf(JobManagerActor.props(JobManagerSpec.getContextConfig(adhoc = false)))
1919
supervisor = TestProbe().ref
20+
21+
manager ! JobManagerActor.Initialize(daoActor, None)
22+
23+
expectMsgClass(classOf[JobManagerActor.Initialized])
24+
25+
uploadTestJar()
26+
2027
}
2128

2229
val jobName = "spark.jobserver.NamedObjectsTestJob"
2330

31+
private def getCreateConfig(createDF: Boolean, createRDD: Boolean) : Config = {
32+
ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s, " +
33+
NamedObjectsTestJobConfig.CREATE_DF + " = " + createDF + ", " +
34+
NamedObjectsTestJobConfig.CREATE_RDD + " = " + createRDD)
35+
}
36+
37+
private def getDeleteConfig(names: List[String]) : Config = {
38+
ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s, " +
39+
NamedObjectsTestJobConfig.DELETE+" = [" + names.mkString(", ") + "]")
40+
}
41+
2442
describe("NamedObjects (RDD)") {
2543
it("should survive from one job to another one") {
2644

27-
manager ! JobManagerActor.Initialize(daoActor, None)
28-
expectMsgClass(classOf[JobManagerActor.Initialized])
29-
30-
uploadTestJar()
31-
manager ! JobManagerActor.StartJob("demo", jobName, emptyConfig.withValue(NamedObjectsTestJobConfig.CREATE_DF, ConfigValueFactory.fromAnyRef(false))
32-
.withValue(NamedObjectsTestJobConfig.CREATE_RDD, ConfigValueFactory.fromAnyRef(true)),
33-
errorEvents ++ syncEvents)
45+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, true), errorEvents ++ syncEvents)
3446
val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])
3547
names should contain("rdd1")
3648

37-
manager ! JobManagerActor.StartJob("demo", jobName, emptyConfig.withValue(NamedObjectsTestJobConfig.CREATE_DF, ConfigValueFactory.fromAnyRef(false))
38-
.withValue(NamedObjectsTestJobConfig.CREATE_RDD, ConfigValueFactory.fromAnyRef(false)),
39-
errorEvents ++ syncEvents)
49+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false), errorEvents ++ syncEvents)
4050
val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])
4151

4252
names2 should contain("rdd1")
4353
names2 should not contain("df1")
4454

4555
//clean-up
46-
manager ! JobManagerActor.StartJob("demo", jobName, emptyConfig.withValue(NamedObjectsTestJobConfig.DELETE, ConfigValueFactory.fromIterable(List("rdd1")))
47-
.withValue(NamedObjectsTestJobConfig.CREATE_RDD, ConfigValueFactory.fromAnyRef(false)),
48-
errorEvents ++ syncEvents)
56+
manager ! JobManagerActor.StartJob("demo", jobName, getDeleteConfig(List("rdd1")), errorEvents ++ syncEvents)
4957
val JobResult(_, names3: Array[String]) = expectMsgClass(classOf[JobResult])
5058

5159
names3 should not contain("rdd1")
@@ -55,57 +63,39 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
5563

5664
describe("NamedObjects (DataFrame)") {
5765
it("should survive from one job to another one") {
58-
manager ! JobManagerActor.Initialize(daoActor, None)
59-
expectMsgClass(classOf[JobManagerActor.Initialized])
6066

61-
uploadTestJar()
62-
manager ! JobManagerActor.StartJob("demo", jobName, emptyConfig.withValue(NamedObjectsTestJobConfig.CREATE_DF, ConfigValueFactory.fromAnyRef(true))
63-
.withValue(NamedObjectsTestJobConfig.CREATE_RDD, ConfigValueFactory.fromAnyRef(false)),
64-
errorEvents ++ syncEvents)
67+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, false), errorEvents ++ syncEvents)
6568
val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])
6669

6770
names should contain("df1")
6871

69-
manager ! JobManagerActor.StartJob("demo", jobName, emptyConfig.withValue(NamedObjectsTestJobConfig.CREATE_DF, ConfigValueFactory.fromAnyRef(false))
70-
.withValue(NamedObjectsTestJobConfig.CREATE_RDD, ConfigValueFactory.fromAnyRef(false)),
71-
errorEvents ++ syncEvents)
72+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false), errorEvents ++ syncEvents)
7273
val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])
7374

7475
names2 should equal(names)
7576

7677
//clean-up
77-
manager ! JobManagerActor.StartJob("demo", jobName, emptyConfig.withValue(NamedObjectsTestJobConfig.DELETE,
78-
ConfigValueFactory.fromIterable(List("df1"))),
79-
errorEvents ++ syncEvents)
78+
manager ! JobManagerActor.StartJob("demo", jobName, getDeleteConfig(List("df1")), errorEvents ++ syncEvents)
8079
val JobResult(_, names3: Array[String]) = expectMsgClass(classOf[JobResult])
8180
}
8281
}
8382

8483
describe("NamedObjects (DataFrame + RDD)") {
8584
it("should survive from one job to another one") {
86-
manager ! JobManagerActor.Initialize(daoActor, None)
87-
expectMsgClass(classOf[JobManagerActor.Initialized])
8885

89-
uploadTestJar()
90-
manager ! JobManagerActor.StartJob("demo", jobName, emptyConfig.withValue(NamedObjectsTestJobConfig.CREATE_DF, ConfigValueFactory.fromAnyRef(true))
91-
.withValue(NamedObjectsTestJobConfig.CREATE_RDD, ConfigValueFactory.fromAnyRef(true)),
92-
errorEvents ++ syncEvents)
86+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, true), errorEvents ++ syncEvents)
9387
val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])
9488

9589
names should contain("rdd1")
9690
names should contain("df1")
9791

98-
manager ! JobManagerActor.StartJob("demo", jobName, emptyConfig.withValue(NamedObjectsTestJobConfig.CREATE_DF, ConfigValueFactory.fromAnyRef(false))
99-
.withValue(NamedObjectsTestJobConfig.CREATE_RDD, ConfigValueFactory.fromAnyRef(false)),
100-
errorEvents ++ syncEvents)
92+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false), errorEvents ++ syncEvents)
10193
val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])
10294

10395
names2 should equal(names)
10496

10597
//clean-up
106-
manager ! JobManagerActor.StartJob("demo", jobName, emptyConfig.withValue(NamedObjectsTestJobConfig.DELETE,
107-
ConfigValueFactory.fromIterable(List("rdd1", "df1"))),
108-
errorEvents ++ syncEvents)
98+
manager ! JobManagerActor.StartJob("demo", jobName, getDeleteConfig(List("rdd1", "df1")), errorEvents ++ syncEvents)
10999
val JobResult(_, names3: Array[String]) = expectMsgClass(classOf[JobResult])
110100
}
111101
}

job-server/src/spark.jobserver/JobManagerActor.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class JobManagerActor(contextConfig: Config) extends InstrumentedActor {
9595
protected var resultActor: ActorRef = _
9696
private var daoActor: ActorRef = _
9797

98+
private val jobServerNamedObjects = new JobServerNamedObjects(context.system)
9899

99100
override def postStop() {
100101
logger.info("Shutting down SparkContext {}", contextName)
@@ -257,7 +258,7 @@ class JobManagerActor(contextConfig: Config) extends InstrumentedActor {
257258
if (job.isInstanceOf[NamedObjectSupport]) {
258259
val namedObjects = job.asInstanceOf[NamedObjectSupport].namedObjectsPrivate
259260
if (namedObjects.get() == null) {
260-
namedObjects.compareAndSet(null, new JobServerNamedObjects(context.system))
261+
namedObjects.compareAndSet(null, jobServerNamedObjects)
261262
}
262263
}
263264

job-server/src/spark.jobserver/JobServerNamedObjects.scala

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ import spray.util._
1818
*/
1919
class JobServerNamedObjects(system: ActorSystem) extends NamedObjects {
2020

21-
import JobServerNamedObjects._
22-
2321
val logger = LoggerFactory.getLogger(getClass)
2422

2523
implicit val ec: ExecutionContext = system.dispatcher
@@ -32,6 +30,11 @@ class JobServerNamedObjects(system: ActorSystem) extends NamedObjects {
3230
config.getDuration("spark.jobserver.named-object-creation-timeout",
3331
SECONDS), SECONDS)
3432

33+
// we must store a reference to each NamedObject even though only its ID is used here
34+
// this reference prevents the object from being GCed and cleaned by Spark's ContextCleaner
35+
// or some other GC for other types of objects
36+
private val namesToObjects: Cache[NamedObject] = LruCache()
37+
3538
override def getOrElseCreate[O <: NamedObject](name: String, objGen: => O)
3639
(implicit timeout: Timeout = defaultTimeout,
3740
persister: NamedObjectPersister[O]): O = {
@@ -93,15 +96,5 @@ class JobServerNamedObjects(system: ActorSystem) extends NamedObjects {
9396
case answer: Iterable[String] @unchecked => answer
9497
}
9598
}
96-
}
9799

98-
/**
99-
* companion object that hold reference to cache with named object so that we can reference
100-
* named objects across jobs
101-
*/
102-
object JobServerNamedObjects {
103-
// we must store a reference to each NamedObject even though only its ID is used here
104-
// this reference prevents the object from being GCed and cleaned by sparks ContextCleaner
105-
// or some other GC for other types of objects
106-
val namesToObjects: Cache[NamedObject] = LruCache()
107100
}

0 commit comments

Comments
 (0)