Commit 2064947

Merge pull request spark-jobserver#415 from hntd187/master
Added ability to add to Spark's Hadoop configuration
2 parents: ec32d8b + 65ef545

6 files changed: +78 -9 lines
README.md

Lines changed: 8 additions & 0 deletions

@@ -600,6 +600,14 @@ To pass settings directly to the sparkConf that do not use the "spark." prefix "
     }
   }
 
+To add to the underlying Hadoop configuration in a Spark context, add the hadoop section to the context settings:
+
+    spark.context-settings {
+      hadoop {
+        mapreduce.framework.name = "Foo"
+      }
+    }
+
 For the exact context configuration parameters, see JobManagerActor docs as well as application.conf.
 
 Also see the [yarn doc](doc/yarn.md) for more tips.
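
For illustration only, the new hadoop block sits alongside the other per-context settings; the keys and values below are placeholders, not defaults shipped with the job server:

    spark.context-settings {
      num-cpu-cores = 2
      memory-per-node = 512m
      hadoop {
        mapreduce.framework.name = "yarn"
        fs.defaultFS = "hdfs://namenode:8020"
      }
    }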

job-server/src/main/resources/application.conf

Lines changed: 5 additions & 0 deletions

@@ -127,6 +127,11 @@ spark {
     passthrough {
       spark.driver.allowMultipleContexts = true  # Ignore the Multiple context exception related with SPARK-2243
     }
+
+    #This adds configuration to the underlying Hadoop configuration in the Spark Context
+    #hadoop {
+    #  mapreduce.framework.name = "FooFramework"
+    #}
   }
 }

job-server/src/spark.jobserver/JobManagerActor.scala

Lines changed: 27 additions & 5 deletions

@@ -1,17 +1,21 @@
 package spark.jobserver
 
 import java.util.concurrent.Executors._
-import akka.actor.{ActorRef, Props, PoisonPill}
+
+import akka.actor.{ActorRef, PoisonPill, Props}
 import com.typesafe.config.Config
 import java.net.{URI, URL}
 import java.util.concurrent.atomic.AtomicInteger
+
 import ooyala.common.akka.InstrumentedActor
-import org.apache.spark.{ SparkEnv, SparkContext }
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
 import org.joda.time.DateTime
-import scala.concurrent.{ Future, ExecutionContext }
+
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success, Try}
 import spark.jobserver.ContextSupervisor.StopContext
-import spark.jobserver.io.{JobDAOActor, JobDAO, JobInfo, JarInfo}
+import spark.jobserver.io.{JarInfo, JobDAO, JobDAOActor, JobInfo}
 import spark.jobserver.util.{ContextURLClassLoader, SparkJobUtils}
 
 object JobManagerActor {

@@ -20,9 +24,11 @@ object JobManagerActor {
   case class StartJob(appName: String, classPath: String, config: Config,
                       subscribedEvents: Set[Class[_]])
   case class KillJob(jobId: String)
+  case object GetContextConfig
   case object SparkContextStatus
 
   // Results/Data
+  case class ContextConfig(contextName: String, contextConfig: SparkConf, hadoopConfig: Configuration)
   case class Initialized(contextName: String, resultActor: ActorRef)
   case class InitError(t: Throwable)
   case class JobLoadingError(err: Throwable)

@@ -152,7 +158,23 @@ class JobManagerActor(contextConfig: Config) extends InstrumentedActor {
           sender ! SparkContextAlive
         } catch {
           case e: Exception => {
-            logger.error("SparkContext is not exist!")
+            logger.error("SparkContext does not exist!")
+            sender ! SparkContextDead
+          }
+        }
+      }
+    }
+    case GetContextConfig => {
+      if (jobContext.sparkContext == null) {
+        sender ! SparkContextDead
+      } else {
+        try {
+          val conf: SparkConf = jobContext.sparkContext.getConf
+          val hadoopConf: Configuration = jobContext.sparkContext.hadoopConfiguration
+          sender ! ContextConfig(jobContext.sparkContext.appName, conf, hadoopConf)
+        } catch {
+          case e: Exception => {
+            logger.error("SparkContext does not exist!")
             sender ! SparkContextDead
           }
         }
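
A minimal sketch of how a client might use the new GetContextConfig message, assuming contextActor is the ActorRef of a running context obtained from the supervisor; this helper is illustrative and not part of the commit:

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import spark.jobserver.JobManagerActor.{ContextConfig, GetContextConfig}

    def printHadoopSetting(contextActor: ActorRef): Unit = {
      implicit val timeout: Timeout = Timeout(5.seconds)
      // Ask the context's JobManagerActor for its Spark and Hadoop configuration
      (contextActor ? GetContextConfig).mapTo[ContextConfig].foreach { cc =>
        // cc.hadoopConfig is an org.apache.hadoop.conf.Configuration
        println(s"${cc.contextName}: " + cc.hadoopConfig.get("mapreduce.framework.name"))
      }
    }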

job-server/src/spark.jobserver/context/SparkContextFactory.scala

Lines changed: 3 additions & 1 deletion

@@ -50,9 +50,11 @@ class DefaultSparkContextFactory extends SparkContextFactory {
   type C = SparkContext with ContextLike
 
   def makeContext(sparkConf: SparkConf, config: Config, contextName: String): C = {
-    new SparkContext(sparkConf) with ContextLike {
+    val sc = new SparkContext(sparkConf) with ContextLike {
       def sparkContext: SparkContext = this
       def isValidJob(job: SparkJobBase): Boolean = job.isInstanceOf[SparkJob]
     }
+    for ((k, v) <- SparkJobUtils.getHadoopConfig(config)) sc.hadoopConfiguration.set(k, v)
+    sc
   }
 }
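
As a hedged end-to-end sketch (local master, placeholder framework name, and the spark.jobserver.context package assumed from the file path), the factory can be exercised directly to confirm that the hadoop section is copied into the context's Hadoop configuration:

    import com.typesafe.config.ConfigFactory
    import org.apache.spark.SparkConf
    import spark.jobserver.context.DefaultSparkContextFactory

    object HadoopConfigFactoryDemo extends App {
      val factory = new DefaultSparkContextFactory
      val contextConfig = ConfigFactory.parseString(
        """hadoop { mapreduce.framework.name = "FooFramework" }""")
      val sparkConf = new SparkConf().setMaster("local[1]").setAppName("hadoop-config-demo")
      val sc = factory.makeContext(sparkConf, contextConfig, "hadoop-config-demo")
      // makeContext applies every key under "hadoop" via sc.hadoopConfiguration.set(k, v)
      println(sc.hadoopConfiguration.get("mapreduce.framework.name"))  // FooFramework
      sc.stop()
    }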

job-server/src/spark.jobserver/util/SparkJobUtils.scala

Lines changed: 11 additions & 0 deletions

@@ -73,6 +73,17 @@ object SparkJobUtils {
     conf
   }
 
+  /**
+   *
+   * @param config the specific context configuration
+   * @return a map of the hadoop configuration values or an empty Map
+   */
+  def getHadoopConfig(config: Config): Map[String, String] = {
+    Try(config.getConfig("hadoop").entrySet().asScala.map { e =>
+      e.getKey -> e.getValue.unwrapped().toString
+    }.toMap).getOrElse(Map())
+  }
+
   /**
    * Returns the maximum number of jobs that can run at the same time
    */
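
A REPL-style sketch of what getHadoopConfig returns; the hadoop keys and values here are placeholders:

    import com.typesafe.config.ConfigFactory
    import spark.jobserver.util.SparkJobUtils

    val contextConfig = ConfigFactory.parseString(
      """hadoop {
        |  mapreduce.framework.name = "yarn"
        |  fs.defaultFS = "hdfs://namenode:8020"
        |}""".stripMargin)

    // Map("mapreduce.framework.name" -> "yarn", "fs.defaultFS" -> "hdfs://namenode:8020")
    SparkJobUtils.getHadoopConfig(contextConfig)

    // A config with no hadoop section yields an empty Map rather than throwing
    SparkJobUtils.getHadoopConfig(ConfigFactory.empty())  // Map()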

job-server/test/spark.jobserver/LocalContextSupervisorSpec.scala

Lines changed: 24 additions & 3 deletions

@@ -1,10 +1,12 @@
 package spark.jobserver
 
 import akka.actor._
-import akka.testkit.{TestKit, ImplicitSender}
+import akka.testkit.{ImplicitSender, TestKit}
 import com.typesafe.config.ConfigFactory
-import spark.jobserver.io.{JobDAOActor, JobDAO}
-import org.scalatest.{Matchers, FunSpecLike, BeforeAndAfterAll, BeforeAndAfter}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkConf
+import spark.jobserver.io.{JobDAO, JobDAOActor}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSpecLike, Matchers}
 
 import scala.concurrent.duration._
 

@@ -36,6 +38,9 @@ object LocalContextSupervisorSpec {
       spark.driver.allowMultipleContexts = true
       spark.ui.enabled = false
     }
+    hadoop {
+      mapreduce.framework.name = "ayylmao"
+    }
   }
 }
 akka.log-dead-letters = 0

@@ -71,6 +76,7 @@ class LocalContextSupervisorSpec extends TestKit(LocalContextSupervisorSpec.syst
   }
 
   import ContextSupervisor._
+  import JobManagerActor._
 
   describe("context management") {
     it("should list empty contexts at startup") {

@@ -99,6 +105,21 @@ class LocalContextSupervisorSpec extends TestKit(LocalContextSupervisorSpec.syst
       rActor.path.toString should not include ("global")
     }
 
+    it("should be able to get context configs") {
+      supervisor ! AddContext("c1", contextConfig)
+      expectMsg(ContextInitialized)
+      supervisor ! GetContext("c1")
+      expectMsgPF(5 seconds, "I can't find that context :'-(") {
+        case (contextActor: ActorRef, resultActor: ActorRef) => {
+          contextActor ! GetContextConfig
+          val cc = expectMsgClass(classOf[ContextConfig])
+          cc.contextName shouldBe "c1"
+          cc.contextConfig.get("spark.ui.enabled") shouldBe "false"
+          cc.hadoopConfig.get("mapreduce.framework.name") shouldBe "ayylmao"
+        }
+      }
+    }
+
     it("should be able to stop contexts already running") {
       supervisor ! AddContext("c1", contextConfig)
       expectMsg(ContextInitialized)
