Skip to content

Commit 5e4ae6f

Browse files
committed
[SPARK-53869] Support multiple files in pyFiles
1 parent 75515c7 commit 5e4ae6f

File tree

2 files changed

+34
-0
lines changed

2 files changed

+34
-0
lines changed

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,12 @@ protected SparkAppDriverConf buildDriverConf(
139139
if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
140140
primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars()));
141141
effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
142+
} else if ("org.apache.spark.deploy.PythonRunner".equals(applicationSpec.getMainClass())) {
143+
String[] files = applicationSpec.getPyFiles().split(",", 2);
144+
primaryResource = new PythonMainAppResource(files[0]);
145+
if (files.length > 1 && !files[1].isBlank()) {
146+
effectiveSparkConf.setIfMissing("spark.submit.pyFiles", files[1]);
147+
}
142148
} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
143149
primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles());
144150
effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles());

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,34 @@ void buildDriverConfForPythonApp() {
129129
}
130130
}
131131

132+
@Test
133+
void handlePyFiles() {
134+
Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();
135+
try (MockedConstruction<SparkAppDriverConf> mocked =
136+
mockConstruction(
137+
SparkAppDriverConf.class,
138+
(mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) {
139+
SparkApplication mockApp = mock(SparkApplication.class);
140+
ApplicationSpec mockSpec = mock(ApplicationSpec.class);
141+
ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
142+
when(mockApp.getSpec()).thenReturn(mockSpec);
143+
when(mockApp.getMetadata()).thenReturn(appMeta);
144+
when(mockSpec.getMainClass()).thenReturn("org.apache.spark.deploy.PythonRunner");
145+
when(mockSpec.getPyFiles()).thenReturn("main.py,lib.py");
146+
147+
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
148+
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
149+
assertEquals(6, constructorArgs.get(conf).size());
150+
assertEquals(
151+
"lib.py", ((SparkConf) constructorArgs.get(conf).get(0)).get("spark.submit.pyFiles"));
152+
153+
// validate main resources
154+
assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2));
155+
PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2);
156+
assertEquals("main.py", mainResource.primaryResource());
157+
}
158+
}
159+
132160
@Test
133161
void buildDriverConfForRApp() {
134162
Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();

0 commit comments

Comments
 (0)