
Commit faa1aaa

anishm-db authored and dongjoon-hyun committed
[SPARK-53498][SDP] Correctly Reference pyspark/pipelines/cli.py from spark-pipelines Binary
### What changes were proposed in this pull request?

When the `spark-pipelines` binary is run, SDP's `cli.py` is submitted to the configured Spark cluster via a `spark-submit` request. Previously, the file path used to retrieve `cli.py` was hard-coded relative to the resolved `SPARK_HOME`. This is incorrect because the package layout differs depending on how Spark was installed (e.g. a Spark tarball installation vs. a PyPI pyspark installation). The proposed change resolves the file path relative to the location of the pyspark module's source code rather than `SPARK_HOME`.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

Closes #52254 from AnishMahto/fix-sdp-cli-pyspark.

Authored-by: anishm-db <anish.mahto@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
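For context, a minimal sketch of the module-relative lookup this change adopts (not part of the diff below); the install locations in the comments are hypothetical examples:

```python
# Sketch of the layout-independent lookup; the example paths are hypothetical.
import os
from pathlib import Path

import pyspark

# Tarball install:  <SPARK_HOME>/python/pyspark/__init__.py
# PyPI install:     <site-packages>/pyspark/__init__.py
# pipelines/cli.py ships inside the pyspark package in both layouts, so
# resolving against pyspark.__file__ works regardless of install method.
cli_path = Path(os.path.dirname(pyspark.__file__)) / "pipelines" / "cli.py"
print(cli_path)
```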
1 parent 2962650 commit faa1aaa

File tree

4 files changed: +25 -11 lines changed


bin/spark-pipelines

Lines changed: 8 additions & 1 deletion
@@ -30,4 +30,11 @@ fi
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
 export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.9-src.zip:$PYTHONPATH"

-exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkPipelines "$@"
+SDP_CLI_PY_FILE_PATH=$("${PYSPARK_PYTHON}" - <<'EOF'
+import pyspark, os
+from pathlib import Path
+print(Path(os.path.dirname(pyspark.__file__)) / "pipelines" / "cli.py")
+EOF
+)
+
+exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkPipelines "$SDP_CLI_PY_FILE_PATH" "$@"

core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala

Lines changed: 6 additions & 5 deletions
@@ -34,15 +34,16 @@ import org.apache.spark.util.SparkExitCode
  */
 object SparkPipelines extends Logging {
   def main(args: Array[String]): Unit = {
-    val sparkHome = sys.env("SPARK_HOME")
-    SparkSubmit.main(constructSparkSubmitArgs(args, sparkHome).toArray)
+    val pipelinesCliFile = args(0)
+    val sparkSubmitAndPipelinesArgs = args.slice(1, args.length)
+    SparkSubmit.main(
+      constructSparkSubmitArgs(pipelinesCliFile, sparkSubmitAndPipelinesArgs).toArray)
   }

   protected[deploy] def constructSparkSubmitArgs(
-      args: Array[String],
-      sparkHome: String): Seq[String] = {
+      pipelinesCliFile: String,
+      args: Array[String]): Seq[String] = {
     val (sparkSubmitArgs, pipelinesArgs) = splitArgs(args)
-    val pipelinesCliFile = s"$sparkHome/python/pyspark/pipelines/cli.py"
     (sparkSubmitArgs ++ Seq(pipelinesCliFile) ++ pipelinesArgs)
   }

core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala

Lines changed: 10 additions & 5 deletions
@@ -35,7 +35,8 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
       "spark.conf2=3"
     )
     assert(
-      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+      SparkPipelines.constructSparkSubmitArgs(
+        pipelinesCliFile = "abc/python/pyspark/pipelines/cli.py", args = args) ==
       Seq(
         "--deploy-mode",
         "client",
@@ -60,7 +61,8 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
       "pipeline.yml"
     )
     assert(
-      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+      SparkPipelines.constructSparkSubmitArgs(
+        pipelinesCliFile = "abc/python/pyspark/pipelines/cli.py", args = args) ==
       Seq(
         "--conf",
         "spark.api.mode=connect",
@@ -86,7 +88,8 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
       "spark.conf2=3"
     )
     assert(
-      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+      SparkPipelines.constructSparkSubmitArgs(
+        pipelinesCliFile = "abc/python/pyspark/pipelines/cli.py", args = args) ==
       Seq(
         "--supervise",
         "--conf",
@@ -109,7 +112,8 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
       "org.apache.spark.deploy.SparkPipelines"
     )
     intercept[SparkUserAppException] {
-      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
+      SparkPipelines.constructSparkSubmitArgs(
+        pipelinesCliFile = "abc/python/pyspark/pipelines/cli.py", args = args)
     }
   }
@@ -120,7 +124,8 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
       "myproject"
     )
     assert(
-      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+      SparkPipelines.constructSparkSubmitArgs(
+        pipelinesCliFile = "abc/python/pyspark/pipelines/cli.py", args = args) ==
      Seq(
        "--conf",
        "spark.api.mode=connect",

python/packaging/classic/setup.py

Lines changed: 1 addition & 0 deletions
@@ -306,6 +306,7 @@ def run(self):
         "pyspark.pandas.spark",
         "pyspark.pandas.typedef",
         "pyspark.pandas.usage_logging",
+        "pyspark.pipelines",
         "pyspark.python.pyspark",
         "pyspark.python.lib",
         "pyspark.testing",
