Commit ca0e237

zromi18 authored and velvia committed
fix(job-server-api,job-server,akka-app): fixed starting a new job does not determine proxy user issue spark-jobserver#827 (spark-jobserver#854)
Issues: Starting a new job does not determine proxy user spark-jobserver#827

Fixes: Added proxy-user determination to adHoc context creation and job starting

Pull Request checklist:
- The commit(s) message(s) follows the contribution guidelines?
- Tests for the changes have been added (for bug fixes / features)?
- [No] Docs have been added / updated (for bug fixes / features)?

Current behavior: After a context is created by a proxy user, starting a job for such a context fails, or the context name has to be prepended with the user name and the separator (~).

New behavior: After a context is created by a proxy user, starting a job for such a context with just its name succeeds; adHoc context names are internally prepended with the user name and the separator (~).

* fix(job-server-api,job-server,akka-app): changes starting SparkJobs with adHoc and explicit context
* Move userNamePrefix, removeProxyUserPrefix, NameContextDelimiter and regRexPart2 from WebApi to SparkJobUtils
* Change the jobRoutes implementation in WebApi so that it checks the proxy user
* Change adHoc context starting in AkkaClusterSupervisorActor and LocalContextSupervisorActor so that they handle the proxy user
* Add tests to LocalContextSupervisorSpec covering adHoc context starting
* Change the WebApiWithAuthenticationSpec test as a consequence of moving class members from WebApi to SparkJobUtils
* Add shiro.basic.ini, WebApiSecuredSpec and WebApiSecuredRoutesSpec to job-server/tests
1 parent 362dfff commit ca0e237
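For illustration, here is a minimal, hypothetical Scala sketch (not code from this commit; the object name and the example user and context names are made up) of the naming convention described above: adHoc context names are stored internally as the proxy user name, the ~ delimiter, and the plain context name, with any ~ inside the user name doubled so the prefix stays unambiguous. The helper mirrors the userNamePrefix logic this commit adds to SparkJobUtils.

```scala
// Hypothetical sketch of the proxy-user naming convention (not part of this commit).
object ProxyUserNamingSketch {
  val NameContextDelimiter = "~"

  // Double any "~" inside the user name, then append a single "~" as the prefix separator.
  def userNamePrefix(userName: String): String =
    userName.replaceAll(NameContextDelimiter, NameContextDelimiter + NameContextDelimiter) +
      NameContextDelimiter

  def main(args: Array[String]): Unit = {
    // A job started by proxy user "alice" against context "my-context" resolves internally to:
    println(userNamePrefix("alice") + "my-context")   // alice~my-context
    // A "~" inside the user name is doubled, so the prefix stays unambiguous:
    println(userNamePrefix("bob~smith") + "adhoc-1")  // bob~~smith~adhoc-1
  }
}
```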

File tree

27 files changed: +266 −180 lines changed


.gitignore

Lines changed: 0 additions & 1 deletion
@@ -15,7 +15,6 @@ config/*.conf
 config/*.sh
 config/*.ini
 job-server/config/*.conf
-job-server/config/*.sh
 job-server/config/*.ini
 job-server-extras/spark-warehouse/
 metastore_db/

README.md

Lines changed: 6 additions & 6 deletions
@@ -89,7 +89,7 @@ Spark Job Server is now included in Datastax Enterprise 4.8!
 
 - *"Spark as a Service"*: Simple REST interface (including HTTPS) for all aspects of job, context management
 - Support for Spark SQL, Hive, Streaming Contexts/jobs and custom job contexts! See [Contexts](doc/contexts.md).
-- [Python](doc/python.md), Scala, and Java (see [TestJob.java](https://github.com/spark-jobserver/spark-jobserver/blob/master/job-server-api/src/main/java/spark/jobserver/api/TestJob.java)) support
+- [Python](doc/python.md), Scala, and [Java](doc/javaapi.md) (see [TestJob.java](https://github.com/spark-jobserver/spark-jobserver/blob/master/job-server-api/src/main/java/spark/jobserver/api/TestJob.java)) support
 - LDAP Auth support via Apache Shiro integration
 - Separate JVM per SparkContext for isolation (EXPERIMENTAL)
 - Supports sub-second low-latency jobs via long-running job contexts
@@ -602,7 +602,7 @@ jdbc {
   user = "secret"
   password = "secret"
 }
-```
+```
 
 ### Chef
 
@@ -629,14 +629,14 @@ Flow diagrams are checked in in the doc/ subdirectory. .diagram files are for w
     GET /binaries - lists all current binaries
     POST /binaries/<appName> - upload a new binary file
     DELETE /binaries/<appName> - delete defined binary
-
+
 When POSTing new binaries, the content-type header must be set to one of the types supported by the subclasses of the `BinaryType` trait. e.g. "application/java-archive" or application/python-archive"
 
 ### Jars (deprecated)
 
     GET /jars - lists all the jars and the last upload timestamp
     POST /jars/<appName> - uploads a new jar under <appName>
-
+
 These routes are kept for legacy purposes but are deprecated in favour of the /binaries routes
 
 ### Contexts
@@ -807,8 +807,8 @@ for instance: `sbt ++2.11.6 job-server/compile`
 - You may need to set `SPARK_LOCAL_IP` to `localhost` to ensure Akka port can bind successfully
 - Note for Windows users: very few tests fail on Windows. Thus, run `testOnly -- -l WindowsIgnore` from SBT shell to ignore them.
 - Logging for tests goes to "job-server-test.log"
-- Run `scoverage:test` to check the code coverage and improve it.
-- Windows users: run `; coverage ; testOnly -- -l WindowsIgnore ; coverageReport` from SBT shell.
+- Run `scoverage:test` to check the code coverage and improve it.
+- Windows users: run `; coverage ; testOnly -- -l WindowsIgnore ; coverageReport` from SBT shell.
 - Please run scalastyle to ensure your code changes don't break the style guide.
 - Do "reStart" from SBT for quick restarts of the job server process
 - Please update the g8 template if you change the SparkJob API

akka-app/src/test/scala/spark/jobserver/common/akka/ActorMetricsSpec.scala

Lines changed: 7 additions & 3 deletions
@@ -1,14 +1,18 @@
 package spark.jobserver.common.akka
 
-import org.scalatest.{Matchers, FunSpec}
-import akka.testkit.TestActorRef
+import org.scalatest.{BeforeAndAfterAll, Matchers, FunSpec}
+import akka.testkit.{TestKit, TestActorRef}
 
 import akka.actor.{Actor, ActorSystem}
 
 
-class ActorMetricsSpec extends FunSpec with Matchers {
+class ActorMetricsSpec extends FunSpec with Matchers with BeforeAndAfterAll {
   implicit val system = ActorSystem("test")
 
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
   describe("actor metrics") {
     it("should increment receive count metric when a message is received") {
       val actorRef = TestActorRef(new DummyActor with ActorMetrics)

akka-app/src/test/scala/spark/jobserver/common/akka/ActorStackSpec.scala

Lines changed: 7 additions & 3 deletions
@@ -1,7 +1,7 @@
 package spark.jobserver.common.akka
 
-import org.scalatest.{Matchers, FunSpec}
-import akka.testkit.TestActorRef
+import org.scalatest.{BeforeAndAfterAll, Matchers, FunSpec}
+import akka.testkit.{TestKit, TestActorRef}
 
 import akka.actor.{Actor, ActorSystem}
 
@@ -22,9 +22,13 @@ trait AddPrefix extends ActorStack {
   }
 }
 
-class ActorStackSpec extends FunSpec with Matchers {
+class ActorStackSpec extends FunSpec with Matchers with BeforeAndAfterAll {
   implicit val system = ActorSystem("test")
 
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
   describe("stacking traits") {
     it("should be able to stack traits and receive messages") {
       val actorRef = TestActorRef(new DummyActor with AddPrefix)

akka-app/src/test/scala/spark/jobserver/common/akka/actor/ReaperSpec.scala

Lines changed: 2 additions & 1 deletion
@@ -3,6 +3,7 @@ package spark.jobserver.common.akka.actor
 import akka.actor.{ActorSystem, Props, ActorRef}
 import akka.testkit.{TestKit, ImplicitSender, TestProbe}
 import org.scalatest.{MustMatchers, FunSpecLike, BeforeAndAfterAll}
+import spark.jobserver.common.akka.AkkaTestUtils
 
 // Our test reaper. Sends the snooper a message when all
 // the souls have been reaped
@@ -19,7 +20,7 @@ class ReaperSpec extends TestKit(ActorSystem("ReaperSpec")) with ImplicitSender
   import scala.concurrent.duration._
 
   override def afterAll() {
-    system.shutdown()
+    AkkaTestUtils.shutdownAndWait(system)
   }
 
   describe("Reaper") {

akka-app/src/test/scala/spark/jobserver/common/akka/web/CommonRoutesSpec.scala

Lines changed: 5 additions & 0 deletions
@@ -2,6 +2,7 @@ package spark.jobserver.common.akka.web
 
 import java.util.concurrent.TimeUnit
 
+import akka.testkit.TestKit
 import org.scalatest.{Matchers, FunSpec}
 import spray.testkit.ScalatestRouteTest
 
@@ -32,6 +33,10 @@ class CommonRoutesSpec extends FunSpec with Matchers with ScalatestRouteTest wit
   val timerMap = Map("type" -> "timer", "rate" -> (meterMap - "type"),
     "duration" -> (histMap ++ Map("units" -> "milliseconds") - "type"))
 
+  override def afterAll():Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
   describe("/metricz route") {
     it("should report all metrics") {
       Get("/metricz") ~> commonRoutes ~> check {

build.sbt

Lines changed: 10 additions & 10 deletions
@@ -150,7 +150,7 @@ lazy val dockerSettings = Seq(
       apt-get -y update && \
       apt-get -y install mesos=${MESOS_VERSION} && \
       apt-get clean
-      """)
+      """)
     env("MAVEN_VERSION","3.3.9")
     runRaw(
       """mkdir -p /usr/share/maven /usr/share/maven/ref \
@@ -160,7 +160,7 @@ lazy val dockerSettings = Seq(
       """)
     env("MAVEN_HOME","/usr/share/maven")
     env("MAVEN_CONFIG", "/.m2")
-
+
     copy(artifact, artifactTargetPath)
     copy(baseDirectory(_ / "bin" / "server_start.sh").value, file("app/server_start.sh"))
     copy(baseDirectory(_ / "bin" / "server_stop.sh").value, file("app/server_stop.sh"))
@@ -192,14 +192,14 @@ lazy val dockerSettings = Seq(
   },
   imageNames in docker := Seq(
     sbtdocker.ImageName(namespace = Some("velvia"),
-      repository = "spark-jobserver",
-      tag = Some(
-        s"${version.value}" +
-        s".mesos-${Versions.mesos.split('-')(0)}" +
-        s".spark-${Versions.spark}" +
-        s".scala-${scalaBinaryVersion.value}" +
-        s".jdk-${Versions.java}")
-    )
+      repository = "spark-jobserver",
+      tag = Some(
+        s"${version.value}" +
+        s".mesos-${Versions.mesos.split('-')(0)}" +
+        s".spark-${Versions.spark}" +
+        s".scala-${scalaBinaryVersion.value}" +
+        s".jdk-${Versions.java}")
+    )
   )
 )
 

job-server-api/src/main/scala/spark/jobserver/context/SparkContextFactory.scala

Lines changed: 31 additions & 31 deletions
@@ -20,24 +20,24 @@ case object JobWrongType extends LoadingError
 case class JobLoadError(ex: Exception) extends LoadingError
 
 /**
- * Factory trait for creating a SparkContext or any derived Contexts,
- * such as SQLContext, StreamingContext, HiveContext, etc.
- * My implementing classes can be dynamically loaded using classloaders to ensure that the entire
- * SparkContext has access to certain dynamically loaded classes, for example, job jars.
- * Also, this is capable of loading jobs which don't necessarily implement SparkJobBase, or even
- * Python jobs etc., so long as a wrapping SparkJobBase is returned.
- */
+ * Factory trait for creating a SparkContext or any derived Contexts,
+ * such as SQLContext, StreamingContext, HiveContext, etc.
+ * My implementing classes can be dynamically loaded using classloaders to ensure that the entire
+ * SparkContext has access to certain dynamically loaded classes, for example, job jars.
+ * Also, this is capable of loading jobs which don't necessarily implement SparkJobBase, or even
+ * Python jobs etc., so long as a wrapping SparkJobBase is returned.
+ */
 trait SparkContextFactory {
   import SparkJobUtils._
 
   type C <: ContextLike
   type J <: JobContainer
 
   /**
-   * Loads the job of the given appName, version, and class path, and validates that it is
-   * the right type of job given the current context type. For example, it may load a JAR
-   * and validate the classpath exists and try to invoke its constructor.
-   */
+   * Loads the job of the given appName, version, and class path, and validates that it is
+   * the right type of job given the current context type. For example, it may load a JAR
+   * and validate the classpath exists and try to invoke its constructor.
+   */
   def loadAndValidateJob(appName: String,
                          uploadTime: DateTime,
                          classPath: String,
@@ -53,12 +53,12 @@ trait SparkContextFactory {
   def makeContext(sparkConf: SparkConf, config: Config, contextName: String): C
 
   /**
-   * Creates a SparkContext or derived context.
-   * @param config the overall system / job server Typesafe Config
-   * @param contextConfig the config specific to this particular context
-   * @param contextName the name of the context to start
-   * @return the newly created context.
-   */
+   * Creates a SparkContext or derived context.
+   * @param config the overall system / job server Typesafe Config
+   * @param contextConfig the config specific to this particular context
+   * @param contextName the name of the context to start
+   * @return the newly created context.
+   */
   def makeContext(config: Config, contextConfig: Config, contextName: String): C = {
     val sparkConf = configToSparkConf(config, contextConfig, contextName)
     makeContext(sparkConf, contextConfig, contextName)
@@ -70,8 +70,8 @@ case class ScalaJobContainer(job: api.SparkJobBase) extends JobContainer {
 }
 
 /**
- * A SparkContextFactory designed for Scala and Java jobs that loads jars
- */
+ * A SparkContextFactory designed for Scala and Java jobs that loads jars
+ */
 trait ScalaContextFactory extends SparkContextFactory {
   type J = ScalaJobContainer
 
@@ -95,12 +95,12 @@ trait ScalaContextFactory extends SparkContextFactory {
   }
 
   /**
-   * Returns true if the job is valid for this context.
-   * At the minimum this should check for if the job can actually take a context of this type;
-   * for example, a SQLContext should only accept jobs that take a SQLContext.
-   * The recommendation is to define a trait for each type of context job; the standard
-   * [[DefaultSparkContextFactory]] checks to see if the job is of type [[SparkJob]].
-   */
+   * Returns true if the job is valid for this context.
+   * At the minimum this should check for if the job can actually take a context of this type;
+   * for example, a SQLContext should only accept jobs that take a SQLContext.
+   * The recommendation is to define a trait for each type of context job; the standard
+   * [[DefaultSparkContextFactory]] checks to see if the job is of type [[SparkJob]].
+   */
   def isValidJob(job: api.SparkJobBase): Boolean
 }

@@ -130,12 +130,12 @@ trait JavaContextFactory extends SparkContextFactory {
 }
 
 /**
- * The default factory creates a standard SparkContext.
- * In the future if we want to add additional methods, etc. then we can have additional factories.
- * For example a specialized SparkContext to manage RDDs in a user-defined way.
- *
- * If you create your own SparkContextFactory, please make sure it has zero constructor args.
- */
+ * The default factory creates a standard SparkContext.
+ * In the future if we want to add additional methods, etc. then we can have additional factories.
+ * For example a specialized SparkContext to manage RDDs in a user-defined way.
+ *
+ * If you create your own SparkContextFactory, please make sure it has zero constructor args.
+ */
 class DefaultSparkContextFactory extends ScalaContextFactory {
 
   type C = SparkContext with ContextLike

job-server-api/src/main/scala/spark/jobserver/util/SparkJobUtils.scala

Lines changed: 29 additions & 0 deletions
@@ -13,12 +13,41 @@ import scala.util.Try
 object SparkJobUtils {
   import collection.JavaConverters._
 
+  val NameContextDelimiter = "~"
+
   /**
    * User impersonation for an already Kerberos authenticated user is supported via the
    * `spark.proxy.user` query param
    */
   val SPARK_PROXY_USER_PARAM = "spark.proxy.user"
 
+  private val regRexPart2 = "([^" + SparkJobUtils.NameContextDelimiter + "]+.*)"
+
+  /**
+   * appends the NameContextDelimiter to the user name and,
+   * if the user name contains the delimiter as well, then it doubles it so that we can be sure
+   * that our prefix is unique
+   */
+  def userNamePrefix(userName: String) : String = {
+    userName.replaceAll(NameContextDelimiter,
+      NameContextDelimiter + NameContextDelimiter) +
+      NameContextDelimiter
+  }
+
+  /**
+   * filter the given context names so that the user may only see his/her own contexts
+   */
+  def removeProxyUserPrefix(userName: => String, contextNames: Seq[String], filter: Boolean): Seq[String] = {
+    if (filter) {
+      val RegExPrefix = ("^" + userNamePrefix(userName) + regRexPart2).r
+      contextNames collect {
+        case RegExPrefix(cName) => cName
+      }
+    } else {
+      contextNames
+    }
+  }
+
   /**
    * Creates a SparkConf for initializing a SparkContext based on various configs.
    * Note that anything in contextConfig with keys beginning with spark. get
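As a rough usage illustration of the helpers added above (a hypothetical, self-contained snippet, not part of the diff; the user and context names are invented):

```scala
// Hypothetical usage of SparkJobUtils.removeProxyUserPrefix (not code from this commit).
import spark.jobserver.util.SparkJobUtils._

object RemoveProxyUserPrefixSketch extends App {
  val contextNames = Seq("alice~ctx1", "bob~ctx2", "shared")

  // With filtering enabled, only contexts prefixed with "alice~" are kept and the prefix is stripped:
  println(removeProxyUserPrefix("alice", contextNames, filter = true))   // List(ctx1)

  // With filtering disabled, the list is returned unchanged:
  println(removeProxyUserPrefix("alice", contextNames, filter = false))  // List(alice~ctx1, bob~ctx2, shared)
}
```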
Binary file (12 Bytes) not shown
