
Commit ccb6beb

[SPARK-53869] Support multiple files in pyFiles field
### What changes were proposed in this pull request?

This PR aims to support multiple files in `SparkApplication`'s `pyFiles` field.

### Why are the changes needed?

Currently, `pyFiles` is mapped to the main resource directly because it assumes a single Python file.

https://github.com/apache/spark-kubernetes-operator/blob/75515c752086853b1676cff39c7fc84b99163dc0/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java#L143

However, it is supposed to be a comma-separated string. If users provide multiple files, it causes a failure like the following.

```
python3: can't open file '/opt/spark/examples/src/main/python/pi.py,local:///opt/spark/examples/src/main/python/sort.py': [Errno 2]
```

This PR proposes a mitigation that handles the first file of `pyFiles` as the primary resource and the rest of the files as the real `pyFiles`. Note that the previous logic works without any change; the new logic applies only when `mainClass` is additionally specified as `org.apache.spark.deploy.PythonRunner`.

**BEFORE**
```yaml
spec:
  pyFiles: "local:///opt/spark/examples/src/main/python/pi.py"
```

**AFTER**
```yaml
spec:
  mainClass: "org.apache.spark.deploy.PythonRunner"
  pyFiles: "local:///opt/spark/examples/src/main/python/pi.py,local:///opt/spark/examples/src/main/python/lib.py"
```

### Does this PR introduce _any_ user-facing change?

No behavior change, because the new logic works only when `mainClass` is `org.apache.spark.deploy.PythonRunner`.

### How was this patch tested?

Pass the CIs with the newly added test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#382 from dongjoon-hyun/SPARK-53869.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 75515c7 commit ccb6beb

File tree: 2 files changed (+34 -0 lines)

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java

Lines changed: 6 additions & 0 deletions

```diff
@@ -139,6 +139,12 @@ protected SparkAppDriverConf buildDriverConf(
     if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
       primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars()));
       effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
+    } else if ("org.apache.spark.deploy.PythonRunner".equals(applicationSpec.getMainClass())) {
+      String[] files = applicationSpec.getPyFiles().split(",", 2);
+      primaryResource = new PythonMainAppResource(files[0]);
+      if (files.length > 1 && !files[1].isBlank()) {
+        effectiveSparkConf.setIfMissing("spark.submit.pyFiles", files[1]);
+      }
     } else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
       primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles());
       effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles());
```
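To make the new branch's behavior concrete, here is a minimal standalone sketch (the file paths are hypothetical, not operator code) of why `split(",", 2)` fits here: it yields at most two elements, so any remaining commas survive intact in the second element, which is therefore already a valid comma-separated value for `spark.submit.pyFiles`.

```java
// Standalone demo of String.split(",", 2), the mechanism the new branch relies on.
// The paths below are hypothetical; only the split semantics matter.
public class PyFilesSplitDemo {
  public static void main(String[] args) {
    String pyFiles = "local:///opt/app/main.py,local:///opt/app/lib.py,local:///opt/app/util.py";
    String[] files = pyFiles.split(",", 2);
    // files[0] becomes the primary resource.
    System.out.println(files[0]); // local:///opt/app/main.py
    // files[1] keeps its internal comma, so it can be passed straight
    // through as the comma-separated spark.submit.pyFiles value.
    System.out.println(files[1]); // local:///opt/app/lib.py,local:///opt/app/util.py

    // With a single file there is no second element, so the guard
    // `files.length > 1` skips setting spark.submit.pyFiles entirely.
    System.out.println("one.py".split(",", 2).length); // 1
  }
}
```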

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java

Lines changed: 28 additions & 0 deletions

```diff
@@ -129,6 +129,34 @@ void buildDriverConfForPythonApp() {
     }
   }
 
+  @Test
+  void handlePyFiles() {
+    Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();
+    try (MockedConstruction<SparkAppDriverConf> mocked =
+        mockConstruction(
+            SparkAppDriverConf.class,
+            (mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) {
+      SparkApplication mockApp = mock(SparkApplication.class);
+      ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+      ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+      when(mockApp.getSpec()).thenReturn(mockSpec);
+      when(mockApp.getMetadata()).thenReturn(appMeta);
+      when(mockSpec.getMainClass()).thenReturn("org.apache.spark.deploy.PythonRunner");
+      when(mockSpec.getPyFiles()).thenReturn("main.py,lib.py");
+
+      SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
+      SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
+      assertEquals(6, constructorArgs.get(conf).size());
+      assertEquals(
+          "lib.py", ((SparkConf) constructorArgs.get(conf).get(0)).get("spark.submit.pyFiles"));
+
+      // validate main resources
+      assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2));
+      PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2);
+      assertEquals("main.py", mainResource.primaryResource());
+    }
+  }
+
   @Test
   void buildDriverConfForRApp() {
     Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();
```
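A note on the test's design: it uses Mockito's `MockedConstruction` to intercept the `SparkAppDriverConf` constructor and capture its arguments, so the assertions can inspect what `buildDriverConf` produced without launching a real submission. Judging from the assertions, the first captured constructor argument is the effective `SparkConf` (which should now carry `spark.submit.pyFiles=lib.py`) and the third is the primary resource, expected to be a `PythonMainAppResource` wrapping `main.py`.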
