
Commit b6993cb

pan3793 authored and sryza committed
[SPARK-53516][SDP] Fix spark.api.mode arg process in SparkPipelines
### What changes were proposed in this pull request?

This PR fixes the following issues in how `SparkPipelines` processes the `spark.api.mode` argument:
- Trim the value of `spark.api.mode` before evaluating it
- Treat the value of `spark.api.mode` as case-insensitive
- Support both `-c spark.api.mode=xxx` and `--conf spark.api.mode=xxx`
- Avoid duplicated `--conf spark.api.mode=connect` arguments in the final generated commands

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No, SDP is an unreleased feature.

### How was this patch tested?

A unit test is added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52261 from pan3793/SPARK-53516.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
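For illustration, a sketch of invocations the patched argument handling accepts and rejects. The `bin/spark-pipelines` entry point and the `run` subcommand are assumptions about the SDP CLI surface, not taken from this diff:

# Accepted after this fix: value is trimmed, matched case-insensitively, via --conf or -c
bin/spark-pipelines --conf spark.api.mode=CONNECT run
bin/spark-pipelines -c "spark.api.mode= connect" run

# Rejected with an error: Declarative Pipelines only supports Spark Connect
bin/spark-pipelines --conf spark.api.mode=classic run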
1 parent: 0e42b95

2 files changed: +79 −10 lines

core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala

Lines changed: 13 additions & 10 deletions
@@ -18,12 +18,14 @@
 package org.apache.spark.deploy

 import java.util
+import java.util.Locale

 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._

 import org.apache.spark.SparkUserAppException
 import org.apache.spark.internal.Logging
+import org.apache.spark.launcher.SparkLauncher.SPARK_API_MODE
 import org.apache.spark.launcher.SparkSubmitArgumentsParser
 import org.apache.spark.util.SparkExitCode

@@ -63,16 +65,17 @@ object SparkPipelines extends Logging {
         if (opt == "--remote") {
           remote = value
         } else if (opt == "--class") {
-          logInfo("--class argument not supported.")
-          throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
-        } else if (opt == "--conf" &&
-          value.startsWith("spark.api.mode=") &&
-          value != "spark.api.mode=connect") {
-          logInfo(
-            "--spark.api.mode must be 'connect'. " +
-            "Declarative Pipelines currently only supports Spark Connect."
-          )
+          logError("--class argument not supported.")
           throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
+        } else if ((opt == "--conf" || opt == "-c") && value.startsWith(s"$SPARK_API_MODE=")) {
+          val apiMode = value.stripPrefix(s"$SPARK_API_MODE=").trim
+          if (apiMode.toLowerCase(Locale.ROOT) != "connect") {
+            logError(
+              s"$SPARK_API_MODE must be 'connect' (was '$apiMode'). " +
+              "Declarative Pipelines currently only supports Spark Connect."
+            )
+            throw SparkUserAppException(SparkExitCode.EXIT_FAILURE)
+          }
         } else if (Seq("--name", "-h", "--help").contains(opt)) {
           pipelinesArgs += opt
           if (value != null && value.nonEmpty) {

@@ -99,7 +102,7 @@
     }

     sparkSubmitArgs += "--conf"
-    sparkSubmitArgs += "spark.api.mode=connect"
+    sparkSubmitArgs += s"$SPARK_API_MODE=connect"
     sparkSubmitArgs += "--remote"
     sparkSubmitArgs += remote
     (sparkSubmitArgs.toSeq, pipelinesArgs.toSeq)
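To see the new check in isolation, here is a minimal, self-contained Scala sketch of the normalization the diff introduces. The `SPARK_API_MODE` constant is inlined as a plain string here; the real code imports it from `SparkLauncher`:

import java.util.Locale

object ApiModeCheckSketch {
  // Stand-in for SparkLauncher.SPARK_API_MODE.
  private val SparkApiMode = "spark.api.mode"

  // True when a --conf/-c value selects the Connect API mode,
  // tolerating surrounding whitespace and arbitrary casing.
  def isConnectMode(confValue: String): Boolean = {
    val apiMode = confValue.stripPrefix(s"$SparkApiMode=").trim
    apiMode.toLowerCase(Locale.ROOT) == "connect"
  }

  def main(args: Array[String]): Unit = {
    assert(isConnectMode("spark.api.mode=connect"))
    assert(isConnectMode("spark.api.mode= CoNNect")) // trimmed and case-insensitive
    assert(!isConnectMode("spark.api.mode=classic"))
  }
}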

core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala

Lines changed: 66 additions & 0 deletions
@@ -117,6 +117,72 @@ class SparkPipelinesSuite extends SparkSubmitTestUtils with BeforeAndAfterEach {
    }
  }

+  test("spark.api.mode arg") {
+    var args = Array("--conf", "spark.api.mode=classic")
+    intercept[SparkUserAppException] {
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
+    }
+    args = Array("-c", "spark.api.mode=classic")
+    intercept[SparkUserAppException] {
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
+    }
+    args = Array("--conf", "spark.api.mode=CONNECT")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+    args = Array("--conf", "spark.api.mode=CoNNect")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+    args = Array("--conf", "spark.api.mode=connect")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+    args = Array("--conf", "spark.api.mode= connect")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+    args = Array("-c", "spark.api.mode=connect")
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py"
+        )
+    )
+  }
+
  test("name arg") {
    val args = Array(
      "init",
