Skip to content

Commit 5d7a35c

Browse files
carbribicarbribi
authored andcommitted
Adding NamedBroadcasts with persister
1 parent d2721e6 commit 5d7a35c

File tree

3 files changed

+69
-8
lines changed

3 files changed

+69
-8
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package spark.jobserver
2+
3+
import org.apache.spark.broadcast.Broadcast
4+
5+
/**
6+
* wrapper for named objects of type Broadcast
7+
*/
8+
case class NamedBroadcast[T](broadcast: Broadcast[T]) extends NamedObject
9+
10+
/**
11+
* implementation of a NamedObjectPersister for Broadcast objects
12+
*/
13+
class BroadcastPersister[T] extends NamedObjectPersister[NamedBroadcast[T]] {
14+
override def persist(namedObj: NamedBroadcast[T], name: String) {
15+
namedObj match {
16+
case NamedBroadcast(broadcast) =>{}
17+
}
18+
}
19+
override def unpersist(namedObj: NamedBroadcast[T]) {
20+
namedObj match {
21+
case NamedBroadcast(broadcast) =>
22+
broadcast.unpersist(blocking = false)
23+
}
24+
}
25+
/**
26+
* @param namedBroadcast the NamedBroadcast to refresh
27+
*/
28+
override def refresh(namedBroadcast: NamedBroadcast[T]): NamedBroadcast[T] = namedBroadcast match {
29+
case NamedBroadcast(broadcast) =>
30+
namedBroadcast
31+
}
32+
}

job-server-extras/src/spark.jobserver/NamedObjectsTestJob.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class NamedObjectsTestJob extends SparkJob with NamedObjectSupport {
1515
import NamedObjectsTestJobConfig._
1616
implicit def rddPersister: NamedObjectPersister[NamedRDD[Row]] = new RDDPersister[Row]
1717
implicit def dataFramePersister: NamedObjectPersister[NamedDataFrame] = new DataFramePersister
18+
implicit def broadcastPersister[U]: NamedObjectPersister[NamedBroadcast[U]] = new BroadcastPersister[U]
1819

1920
def validate(sql: SparkContext, config: Config): SparkJobValidation = SparkJobValid
2021

@@ -35,6 +36,11 @@ class NamedObjectsTestJob extends SparkJob with NamedObjectSupport {
3536
namedObjects.update("rdd1", NamedRDD(rows(sc), true, StorageLevel.MEMORY_ONLY))
3637
}
3738

39+
if (config.hasPath(CREATE_BROADCAST)){
40+
val broadcast = sc.broadcast(Set(1,2,3,4,5))
41+
namedObjects.update("broadcast1", NamedBroadcast(broadcast))
42+
}
43+
3844
if (config.hasPath(DELETE)) {
3945
val iter = config.getStringList(DELETE).iterator
4046
while (iter.hasNext) {
@@ -49,6 +55,7 @@ class NamedObjectsTestJob extends SparkJob with NamedObjectSupport {
4955
object NamedObjectsTestJobConfig {
5056
val CREATE_DF = "createDF"
5157
val CREATE_RDD = "createRDD"
58+
val CREATE_BROADCAST = "createBroadcast"
5259
val DELETE = "delete"
5360
}
5461

job-server-extras/test/spark.jobserver/NamedObjectsJobSpec.scala

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
2828

2929
val jobName = "spark.jobserver.NamedObjectsTestJob"
3030

31-
private def getCreateConfig(createDF: Boolean, createRDD: Boolean) : Config = {
31+
private def getCreateConfig(createDF: Boolean, createRDD: Boolean, createBroadcast: Boolean) : Config = {
3232
ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s, " +
3333
NamedObjectsTestJobConfig.CREATE_DF + " = " + createDF + ", " +
34-
NamedObjectsTestJobConfig.CREATE_RDD + " = " + createRDD)
34+
NamedObjectsTestJobConfig.CREATE_RDD + " = " + createRDD + ", " +
35+
NamedObjectsTestJobConfig.CREATE_BROADCAST + " = " + createBroadcast)
3536
}
3637

3738
private def getDeleteConfig(names: List[String]) : Config = {
@@ -42,11 +43,11 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
4243
describe("NamedObjects (RDD)") {
4344
it("should survive from one job to another one") {
4445

45-
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, true), errorEvents ++ syncEvents)
46+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, true, false), errorEvents ++ syncEvents)
4647
val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])
4748
names should contain("rdd1")
4849

49-
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false), errorEvents ++ syncEvents)
50+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false, false), errorEvents ++ syncEvents)
5051
val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])
5152

5253
names2 should contain("rdd1")
@@ -64,12 +65,12 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
6465
describe("NamedObjects (DataFrame)") {
6566
it("should survive from one job to another one") {
6667

67-
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, false), errorEvents ++ syncEvents)
68+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, false, false), errorEvents ++ syncEvents)
6869
val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])
6970

7071
names should contain("df1")
7172

72-
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false), errorEvents ++ syncEvents)
73+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false, false), errorEvents ++ syncEvents)
7374
val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])
7475

7576
names2 should equal(names)
@@ -83,13 +84,13 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
8384
describe("NamedObjects (DataFrame + RDD)") {
8485
it("should survive from one job to another one") {
8586

86-
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, true), errorEvents ++ syncEvents)
87+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, true, false), errorEvents ++ syncEvents)
8788
val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])
8889

8990
names should contain("rdd1")
9091
names should contain("df1")
9192

92-
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false), errorEvents ++ syncEvents)
93+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false, false), errorEvents ++ syncEvents)
9394
val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])
9495

9596
names2 should equal(names)
@@ -100,4 +101,25 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
100101
}
101102
}
102103

104+
describe("NamedObjects (Broadcast)") {
105+
it("should survive from one job to another one") {
106+
107+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, true, true), errorEvents ++ syncEvents)
108+
val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])
109+
110+
names should contain("rdd1")
111+
names should contain("df1")
112+
names should contain("broadcast1")
113+
114+
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false, false), errorEvents ++ syncEvents)
115+
val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])
116+
117+
names2 should equal(names)
118+
119+
//clean-up
120+
manager ! JobManagerActor.StartJob("demo", jobName, getDeleteConfig(List("rdd1", "df1", "broadcast1"))
121+
, errorEvents ++ syncEvents)
122+
val JobResult(_, names3: Array[String]) = expectMsgClass(classOf[JobResult])
123+
}
124+
}
103125
}

0 commit comments

Comments
 (0)