Commit 002a05c

Update Versions for Spark 2.4
Upgrade Py4j to the version required for Spark 2.4 PySpark compatibility, along with several version bumps of associated libraries. Also converts the current subprocess.py and the associated PythonJob class to use an authentication token when starting PySpark: the token is generated the same way as inside native Spark and is passed via an environment variable from the PythonJob class to the subprocess.py process, which starts up the Python interpreter.

Fix tests: switch warehouse directories per test job, since "DROP TABLE" doesn't seem to work in Spark 2.4 like it did in Spark 2.2, and extend the timeout on the Python job spec.
1 parent 0b9d55c commit 002a05c
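
For context, a minimal sketch of the token handoff described in the commit message. It only mirrors the diffs below; the simplified createSecret signature, the pythonProcess name, and the hard-coded port are illustrative assumptions, not code from this commit:

    import java.security.SecureRandom
    import org.spark_project.guava.hash.HashCodes
    import scala.sys.process.Process

    // Generate a random secret the same way native Spark 2.4 does
    // (simplified here to take the bit length directly instead of a SparkConf).
    def createSecret(bitLength: Int = 256): String = {
      val secretBytes = new Array[Byte](bitLength / java.lang.Byte.SIZE)
      new SecureRandom().nextBytes(secretBytes)
      HashCodes.fromBytes(secretBytes).toString
    }

    // The same secret is registered with the py4j GatewayServer (authToken) and
    // handed to the Python interpreter through an environment variable;
    // subprocess.py reads PYSPARK_GATEWAY_SECRET and passes it to
    // GatewayParameters as auth_token before connecting.
    val secret = createSecret()
    val pythonProcess = Process(
      Seq("python", "-m", "sparkjobserver.subprocess", "25333"), // port is illustrative
      None,
      "PYSPARK_GATEWAY_SECRET" -> secret)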

13 files changed: 81 additions, 34 deletions


build.sbt

Lines changed: 2 additions & 1 deletion
@@ -297,7 +297,8 @@ lazy val publishSettings = Seq(
   autoScalaLibrary := false,
   credentials += Credentials(Path.userHome / ".sbt" / ".credentials"),
   publishMavenStyle := true,
-  publishTo := Some(sys.env("MVN_PUBLISH_REPO") at sys.env("MVN_PUBLISH_URL")),
+  publishTo := Some(sys.env.getOrElse("MVN_PUBLISH_REPO", "NONE")
+    at sys.env.getOrElse("MVN_PUBLISH_URL", "NONE")),
   licenses += ("Apache-2.0", url("http://choosealicense.com/licenses/apache/")),
   pomIncludeRepository := { _ => false },
   /** Since users are encouraged to use dse-spark-dependencies, which provides most of the needed

job-server-extras/src/main/scala/spark/jobserver/context/SessionContextFactory.scala

Lines changed: 10 additions & 0 deletions
@@ -1,5 +1,8 @@
 package spark.jobserver.context
 
+import java.io.File
+import java.nio.file.Files
+
 import com.typesafe.config.Config
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SparkSession
@@ -19,6 +22,13 @@ class SessionContextFactory extends ScalaContextFactory {
 
   def isValidJob(job: SparkJobBase): Boolean = job.isInstanceOf[SparkSessionJob]
 
+  // creates a stub warehouse dir for derby/hive metastore
+  def makeWarehouseDir(): File = {
+    val warehouseDir = Files.createTempDirectory("warehouse").toFile()
+    warehouseDir.delete()
+    warehouseDir
+  }
+
   def makeContext(sparkConf: SparkConf, config: Config, contextName: String): C = {
     val builder = SparkSession.builder()
     builder.config(sparkConf).appName(contextName)

job-server-extras/src/main/scala/spark/jobserver/python/PythonJob.scala

Lines changed: 26 additions & 6 deletions
@@ -1,12 +1,15 @@
 package spark.jobserver.python
 
+import java.security.SecureRandom
+
 import com.typesafe.config.Config
-import org.scalactic.{Good, Every, Or}
+import org.apache.spark.SparkConf
+import org.scalactic.{Every, Good, Or}
 import org.slf4j.LoggerFactory
-import py4j.GatewayServer
-import spark.jobserver.api.{SparkJobBase, ValidationProblem, JobEnvironment}
+import spark.jobserver.api.{JobEnvironment, SparkJobBase, ValidationProblem}
+import org.spark_project.guava.hash.HashCodes
 
-import scala.sys.process.{ProcessLogger, Process}
+import scala.sys.process.{Process, ProcessLogger}
 import scala.util.{Failure, Success, Try}
 
 case class PythonJob[X <: PythonContextLike](eggPath: String,
@@ -41,6 +44,17 @@ case class PythonJob[X <: PythonContextLike](eggPath: String,
                 runtime: JobEnvironment,
                 config: Config): Or[Config, Every[ValidationProblem]] = Good(config)
 
+  /**
+    * Copied from Spark private method in 2.4.0
+    */
+  def createSecret(conf: SparkConf): String = {
+    val bits = conf.getInt("spark.authenticate.secretBitLength", 256)
+    val rnd = new SecureRandom()
+    val secretBytes = new Array[Byte](bits / java.lang.Byte.SIZE)
+    rnd.nextBytes(secretBytes)
+    HashCodes.fromBytes(secretBytes).toString()
+  }
+
   /**
    * This is the entry point for a Spark Job Server to execute Python jobs.
    * It calls a Python subprocess to execute the relevant Python Job class.
@@ -53,7 +67,12 @@ case class PythonJob[X <: PythonContextLike](eggPath: String,
   override def runJob(sc: X, runtime: JobEnvironment, data: Config): Any = {
     logger.info(s"Running $modulePath from $eggPath")
     val ep = endpoint(sc, runtime.contextConfig, runtime.jobId, data)
-    val server = new GatewayServer(ep, 0)
+    val secret = createSecret(sc.sparkContext.getConf)
+    val server = new py4j.GatewayServer.GatewayServerBuilder()
+      .entryPoint(ep)
+      .javaPort(0)
+      .authToken(secret)
+      .build()
     val pythonPathDelimiter : String = if (System.getProperty("os.name").indexOf("Win") >= 0) ";" else ":"
     val pythonPath = (eggPath +: sc.pythonPath).mkString(pythonPathDelimiter)
     logger.info(s"Using Python path of ${pythonPath}")
@@ -66,7 +85,8 @@ case class PythonJob[X <: PythonContextLike](eggPath: String,
       None,
       "EGGPATH" -> eggPath,
       "PYTHONPATH" -> pythonPath,
-      "PYSPARK_PYTHON" -> sc.pythonExecutable)
+      "PYSPARK_PYTHON" -> sc.pythonExecutable,
+      "PYSPARK_GATEWAY_SECRET" -> secret)
     val err = new StringBuffer
     val procLogger =
       ProcessLogger(

job-server-extras/src/test/scala/spark/jobserver/SessionJobSpec.scala

Lines changed: 2 additions & 1 deletion
@@ -12,11 +12,12 @@ import scala.concurrent.duration._
 
 class TestSessionContextFactory extends SessionContextFactory {
 
-  override def makeContext(sparkConf: SparkConf, config: Config, contextName: String): C = {
+  override def makeContext(sparkConf: SparkConf, config: Config, contextName: String): C = {
     val builder = SparkSession.builder()
     builder.config(sparkConf).appName(contextName).master("local")
     builder.config("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:myDB;create=true")
     builder.config("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
+    builder.config("spark.sql.warehouse.dir", makeWarehouseDir().getAbsolutePath)
     try {
       builder.enableHiveSupport()
     } catch {

job-server-extras/src/test/scala/spark/jobserver/python/PythonJobManagerSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ class PythonJobManagerSpec extends ExtrasJobSpecBase(PythonJobManagerSpec.getNew
         "example_jobs.word_count.WordCountSparkSessionJob",
         ConfigFactory.parseString("""input.strings = ["a", "b", "a"]"""),
         errorEvents ++ syncEvents)
-      expectMsgPF(3 seconds, "Expected a JobResult or JobErroredOut message!") {
+      expectMsgPF(10 seconds, "Expected a JobResult or JobErroredOut message!") {
        case JobResult(_, x) => x should matchPattern {
          case m: java.util.Map[_, _] if m.asScala == Map("b" -> 1, "a" -> 2) =>
        }

job-server-python/src/python/run-tests.sh

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 #!/usr/bin/env bash
-PYTHONPATH=.:$SPARK_HOME/python/lib/pyspark.zip:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH python test/apitests.py
+PYTHONPATH=.:$SPARK_HOME/python/lib/pyspark.zip:$SPARK_HOME/python/lib/py4j-0.10.7-src.zip:$PYTHONPATH python test/apitests.py
 exitCode=$?
 #This sleep is here so that all of Spark's shutdown stdout if written before we exit,
 #so that we return cleanly to the command prompt.

job-server-python/src/python/setup.py

Lines changed: 1 addition & 1 deletion
@@ -9,5 +9,5 @@
     url="https://github.com/spark-jobserver/spark-jobserver",
     license="Apache License 2.0",
     packages=find_packages(exclude=["test*", "example*"]),
-    install_requires=["pyhocon", "py4j"]
+    install_requires=["pyhocon", "py4j==0.10.7"]
 )

job-server-python/src/python/sparkjobserver/subprocess.py

Lines changed: 5 additions & 2 deletions
@@ -18,7 +18,7 @@
 import sys
 import os
 from importlib import import_module
-from py4j.java_gateway import JavaGateway, java_import, GatewayClient
+from py4j.java_gateway import JavaGateway, java_import, GatewayClient, GatewayParameters
 from pyhocon import ConfigFactory
 from pyspark.context import SparkContext, SparkConf
 from pyspark.sql import SQLContext, HiveContext, SparkSession
@@ -53,7 +53,10 @@ def import_class(cls):
 
 if __name__ == "__main__":
     port = int(sys.argv[1])
-    gateway = JavaGateway(GatewayClient(port=port), auto_convert=True)
+
+    gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
+    parameters = GatewayParameters(port=port, auto_convert=True, auth_token=gateway_secret)
+    gateway = JavaGateway(gateway_parameters=parameters)
     entry_point = gateway.entry_point
     imports = entry_point.getPy4JImports()
     for i in imports:

job-server-python/src/test/scala/spark/jobserver/python/SubprocessSpec.scala

Lines changed: 27 additions & 16 deletions
@@ -3,14 +3,15 @@ package spark.jobserver.python
 import java.io.File
 import java.nio.file.Files
 
-import com.typesafe.config.{ConfigRenderOptions, Config, ConfigFactory}
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
 import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfterAll, Matchers, FunSpec}
+import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
 import py4j.GatewayServer
+import py4j.GatewayServer.GatewayServerBuilder
 
 import scala.collection.JavaConverters._
 import scala.sys.process.Process
@@ -34,10 +35,10 @@ object SubprocessSpec {
   lazy val jobServerPath = getPythonDir("src/python")
 
   lazy val pysparkPath = sys.env.get("SPARK_HOME").map(d => s"$d/python/lib/pyspark.zip")
-  lazy val py4jPath = sys.env.get("SPARK_HOME").map(d => s"$d/python/lib/py4j-0.10.4-src.zip")
+  lazy val py4jPath = sys.env.get("SPARK_HOME").map(d => s"$d/python/lib/py4j-0.10.7-src.zip")
   lazy val sparkPaths = sys.env.get("SPARK_HOME").map{sh =>
     val pysparkPath = s"$sh/python/lib/pyspark.zip"
-    val py4jPath = s"$sh/python/lib/py4j-0.10.4-src.zip"
+    val py4jPath = s"$sh/python/lib/py4j-0.10.7-src.zip"
     Seq(pysparkPath, py4jPath)
   }.getOrElse(Seq())
   lazy val originalPythonPath = sys.env.get("PYTHONPATH")
@@ -93,18 +94,28 @@ class SubprocessSpec extends FunSpec with Matchers with BeforeAndAfterAll {
     val pathList = Seq(jobServerPath) ++ sparkPaths ++ originalPythonPath.toSeq
     val p = pathList.mkString(pythonPathDelimiter)
     // Scarman 10-13-2016
-    //println(p)
+    println(p)
     p
   }
 
+  lazy val env = Seq (
+    "PYTHONPATH" -> pythonPath,
+    "PYSPARK_GATEWAY_SECRET" -> secret
+  )
+
   // creates a stub warehouse dir for derby/hive metastore
   def makeWarehouseDir(): File = {
     val warehouseDir = Files.createTempDirectory("warehouse").toFile()
     warehouseDir.delete()
     warehouseDir
   }
+  val secret = "Test"
   def buildGateway(endpoint: TestEndpoint): GatewayServer = {
-    val server = new GatewayServer(endpoint, 0)
+    val server = new GatewayServerBuilder()
+      .entryPoint(endpoint)
+      .javaPort(0)
+      .authToken(secret)
+      .build()
     //Server runs asynchronously on a dedicated thread. See Py4J source for more detail
     server.start()
     server
@@ -162,7 +173,7 @@ class SubprocessSpec extends FunSpec with Matchers with BeforeAndAfterAll {
       Process(
         Seq("python", "-m", "sparkjobserver.subprocess", gw.getListeningPort.toString),
        None,
-        "PYTHONPATH" -> pythonPath)
+        env: _*)
      val pythonExitCode = process.!
      pythonExitCode should be (0)
      endpoint.result should matchPattern {
@@ -181,7 +192,7 @@ class SubprocessSpec extends FunSpec with Matchers with BeforeAndAfterAll {
      Process(
        Seq("python", "-m", "sparkjobserver.subprocess", gw.getListeningPort.toString),
        None,
-        "PYTHONPATH" -> pythonPath)
+        env: _*)
      val pythonExitCode = process.!
      pythonExitCode should be (1)
      endpoint.validationProblems should be (Some(Seq("config input.strings not found")))
@@ -205,7 +216,7 @@ class SubprocessSpec extends FunSpec with Matchers with BeforeAndAfterAll {
      Process(
        Seq("python", "-m", "sparkjobserver.subprocess", gw.getListeningPort.toString),
        None,
-        "PYTHONPATH" -> pythonPath)
+        env: _*)
      val pythonExitCode = process.!
      pythonExitCode should be (0)
      endpoint.result should matchPattern {
@@ -235,7 +246,7 @@ class SubprocessSpec extends FunSpec with Matchers with BeforeAndAfterAll {
      Process(
        Seq("python", "-m", "sparkjobserver.subprocess", gw.getListeningPort.toString),
        None,
-        "PYTHONPATH" -> pythonPath)
+        env: _*)
      val pythonExitCode = process.!
      pythonExitCode should be (0)
      endpoint.result should matchPattern {
@@ -268,7 +279,7 @@ class SubprocessSpec extends FunSpec with Matchers with BeforeAndAfterAll {
      Process(
        Seq("python", "-m", "sparkjobserver.subprocess", gw.getListeningPort.toString),
        None,
-        "PYTHONPATH" -> pythonPath)
+        env: _*)
      val pythonExitCode = process.!
      pythonExitCode should be (0)
      endpoint.result should be ("done")
@@ -283,7 +294,7 @@ class SubprocessSpec extends FunSpec with Matchers with BeforeAndAfterAll {
      Process(
        Seq("python", "-m", "sparkjobserver.subprocess", gw2.getListeningPort.toString),
        None,
-        "PYTHONPATH" -> pythonPath)
+        env: _*)
      val pythonExitCode2 = process2.!
      pythonExitCode2 should be (0)
      endpoint2.result should matchPattern {
@@ -308,7 +319,7 @@ class SubprocessSpec extends FunSpec with Matchers with BeforeAndAfterAll {
      Process(
        Seq("python", "-m", "sparkjobserver.subprocess", gw.getListeningPort.toString),
        None,
-        "PYTHONPATH" -> pythonPath)
+        env: _*)
      val pythonExitCode = process.!
      pythonExitCode should be (2)
      stopGateway(gw)
@@ -323,7 +334,7 @@ class SubprocessSpec extends FunSpec with Matchers with BeforeAndAfterAll {
      Process(
        Seq("python", "-m", "sparkjobserver.subprocess", gw.getListeningPort.toString),
        None,
-        "PYTHONPATH" -> pythonPath)
+        env: _*)
      val pythonExitCode = process.!
      pythonExitCode should be (4)
      stopGateway(gw)
@@ -338,7 +349,7 @@ class SubprocessSpec extends FunSpec with Matchers with BeforeAndAfterAll {
      Process(
        Seq("python", "-m", "sparkjobserver.subprocess", gw.getListeningPort.toString),
        None,
-        "PYTHONPATH" -> pythonPath)
+        env: _*)
      val pythonExitCode = process.!
      pythonExitCode should be (3)
      stopGateway(gw)
@@ -356,7 +367,7 @@ class SubprocessSpec extends FunSpec with Matchers with BeforeAndAfterAll {
      Process(
        Seq("python", "-m", "sparkjobserver.subprocess", gw.getListeningPort.toString),
        None,
-        "PYTHONPATH" -> pythonPath)
+        env: _*)
      val pythonExitCode = process.!
      pythonExitCode should be (0)
      endpoint.result should be ("Hello World 3")

project/Assembly.scala

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ object Assembly {
   assemblyExcludedJars in assembly <<= (fullClasspath in assembly) map { _ filter { cp =>
     List("servlet-api", "guice-all", "junit", "uuid",
       "jetty", "jsp-api-2.0", "antlr", "avro", "slf4j-log4j", "log4j-1.2",
-      "scala-actors", "commons-cli", "stax-api", "mockito",
+      "scala-actors", "commons-cli", "stax-api", "mockito", "lz4",
       // we rely on whatever version DSE has:
       "spark", "netty", "dse-java-driver").exists(cp.data.getName.startsWith(_))
   } },
