Skip to content

Commit 6b871e8

Browse files
rfrohocknoorul
authored andcommitted
Support Adding Application Egg to PySpark (spark-jobserver#941)
* add submitted egg to spark context on subprocess start * slight revision * test for empty and nonexistent strings
1 parent 9f7ae32 commit 6b871e8

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

job-server-extras/src/main/scala/spark/jobserver/python/PythonJob.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ case class PythonJob[X <: PythonContextLike](eggPath: String,
6464
Process(
6565
Seq(sc.pythonExecutable, "-m", "sparkjobserver.subprocess", server.getListeningPort.toString),
6666
None,
67+
"EGGPATH" -> eggPath,
6768
"PYTHONPATH" -> pythonPath,
6869
"PYSPARK_PYTHON" -> sc.pythonExecutable)
6970
val err = new StringBuffer

job-server-python/src/python/sparkjobserver/subprocess.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from __future__ import print_function
1818
import sys
19+
import os
1920
from importlib import import_module
2021
from py4j.java_gateway import JavaGateway, java_import, GatewayClient
2122
from pyhocon import ConfigFactory
@@ -71,6 +72,7 @@ def import_class(cls):
7172
spark_conf = SparkConf(_jconf=jspark_conf)
7273
context_class = jcontext.contextType()
7374
context = None
75+
sc = None
7476
if context_class == 'org.apache.spark.api.java.JavaSparkContext':
7577
context = SparkContext(
7678
gateway=gateway, jsc=jcontext, conf=spark_conf)
@@ -98,6 +100,15 @@ def import_class(cls):
98100
exit_with_failure(
99101
"Expected JavaSparkContext, SQLContext "
100102
"or HiveContext but received %s" % repr(context_class), 2)
103+
104+
egg_path = os.environ.get("EGGPATH", None)
105+
if egg_path and sc:
106+
try:
107+
sc.addPyFile(egg_path)
108+
except Exception as error:
109+
exit_with_failure(
110+
"Error while adding Python Egg to Spark Context: %s\n%s" %
111+
(repr(error), traceback.format_exc()), 5)
101112
try:
102113
job_data = job.validate(context, None, job_config)
103114
except Exception as error:

0 commit comments

Comments
 (0)