Skip to content

Commit fdaef15

Browse files
committed
Merge pull request spark-jobserver#404 from CBribiescas/master
Adding NamedBroadcasts with persister
2 parents d2721e6 + df9ad00 commit fdaef15

File tree

4 files changed

+63
-3
lines changed

4 files changed

+63
-3
lines changed

doc/yarn.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
(I would like to thank Jon Buffington for sharing the config tips below. — @velvia)
66

7+
Note: This is for yarn with docker. If you are looking to deploy on a yarn cluster via EMR, then this link would be more useful [EMR](https://github.com/spark-jobserver/spark-jobserver/blob/master/doc/EMR.md)
8+
79
### Configuring the Spark-Jobserver Docker package to run in Yarn-Client Mode
810

911
To run the Spark-Jobserver in yarn-client mode you have to do a little bit of extra configuration.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package spark.jobserver

import org.apache.spark.broadcast.Broadcast

/**
 * Wrapper that lets a Spark `Broadcast` participate in the NamedObject
 * machinery so it can be shared by name across jobs.
 *
 * @tparam T type of the value held by the broadcast variable
 * @param broadcast the underlying Spark broadcast handle being named
 */
final case class NamedBroadcast[T](broadcast: Broadcast[T]) extends NamedObject
9+
10+
/**
11+
* implementation of a NamedObjectPersister for Broadcast objects
12+
*/
13+
/**
 * Implementation of a NamedObjectPersister for Broadcast objects.
 *
 * @tparam T type of the value held by the broadcast variable
 */
class BroadcastPersister[T] extends NamedObjectPersister[NamedBroadcast[T]] {
  // No-op: there is no extra persistence step performed for a broadcast here
  // (the original implementation intentionally left this empty).
  override def persist(namedObj: NamedBroadcast[T], name: String): Unit = {}

  // Release the broadcast's cached copies; non-blocking so the caller is not
  // stalled waiting for the cleanup to finish.
  override def unpersist(namedObj: NamedBroadcast[T]): Unit =
    namedObj.broadcast.unpersist(blocking = false)

  /**
   * @param namedBroadcast the NamedBroadcast to refresh
   * @return the same instance unchanged — no refresh action is performed
   */
  override def refresh(namedBroadcast: NamedBroadcast[T]): NamedBroadcast[T] =
    namedBroadcast
}

job-server-extras/src/spark.jobserver/NamedObjectsTestJob.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class NamedObjectsTestJob extends SparkJob with NamedObjectSupport {
1515
import NamedObjectsTestJobConfig._
1616
implicit def rddPersister: NamedObjectPersister[NamedRDD[Row]] = new RDDPersister[Row]
1717
implicit def dataFramePersister: NamedObjectPersister[NamedDataFrame] = new DataFramePersister
18+
implicit def broadcastPersister[U]: NamedObjectPersister[NamedBroadcast[U]] = new BroadcastPersister[U]
1819

1920
def validate(sql: SparkContext, config: Config): SparkJobValidation = SparkJobValid
2021

@@ -35,6 +36,11 @@ class NamedObjectsTestJob extends SparkJob with NamedObjectSupport {
3536
namedObjects.update("rdd1", NamedRDD(rows(sc), true, StorageLevel.MEMORY_ONLY))
3637
}
3738

39+
if (config.hasPath(CREATE_BROADCAST)){
40+
val broadcast = sc.broadcast(Set(1,2,3,4,5))
41+
namedObjects.update("broadcast1", NamedBroadcast(broadcast))
42+
}
43+
3844
if (config.hasPath(DELETE)) {
3945
val iter = config.getStringList(DELETE).iterator
4046
while (iter.hasNext) {
@@ -49,6 +55,7 @@ class NamedObjectsTestJob extends SparkJob with NamedObjectSupport {
4955
/**
 * Config path constants recognised by NamedObjectsTestJob: each flag tells the
 * job which kind of named object to create (or which names to delete).
 */
object NamedObjectsTestJobConfig {
  val CREATE_DF: String = "createDF"
  val CREATE_RDD: String = "createRDD"
  val CREATE_BROADCAST: String = "createBroadcast"
  val DELETE: String = "delete"
}
5461

job-server-extras/test/spark.jobserver/NamedObjectsJobSpec.scala

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
2828

2929
val jobName = "spark.jobserver.NamedObjectsTestJob"
3030

/**
 * Builds the job config telling NamedObjectsTestJob which named objects to create.
 *
 * @param createDF        whether the job should create the named DataFrame
 * @param createRDD       whether the job should create the named RDD
 * @param createBroadcast whether the job should create the named Broadcast (default: false)
 */
private def getCreateConfig(createDF: Boolean, createRDD: Boolean, createBroadcast: Boolean = false) : Config = {
  val settings = Seq(
    "spark.jobserver.named-object-creation-timeout = 60 s",
    s"${NamedObjectsTestJobConfig.CREATE_DF} = $createDF",
    s"${NamedObjectsTestJobConfig.CREATE_RDD} = $createRDD",
    s"${NamedObjectsTestJobConfig.CREATE_BROADCAST} = $createBroadcast")
  ConfigFactory.parseString(settings.mkString(", "))
}
3637

3738
private def getDeleteConfig(names: List[String]) : Config = {
@@ -100,4 +101,25 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
100101
}
101102
}
102103

104+
describe("NamedObjects (Broadcast)") {
  it("should survive from one job to another one") {

    // First job creates all three kinds of named objects.
    manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, true, true), errorEvents ++ syncEvents)
    val JobResult(_, firstNames: Array[String]) = expectMsgClass(classOf[JobResult])

    Seq("rdd1", "df1", "broadcast1").foreach { objName =>
      firstNames should contain(objName)
    }

    // Second job creates nothing; the previously created objects must still be listed.
    manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false, false), errorEvents ++ syncEvents)
    val JobResult(_, secondNames: Array[String]) = expectMsgClass(classOf[JobResult])

    secondNames should equal(firstNames)

    // Clean-up: delete everything the first job created.
    manager ! JobManagerActor.StartJob("demo", jobName,
      getDeleteConfig(List("rdd1", "df1", "broadcast1")), errorEvents ++ syncEvents)
    val JobResult(_, remainingNames: Array[String]) = expectMsgClass(classOf[JobResult])
  }
}
103125
}

0 commit comments

Comments
 (0)