Skip to content

Commit 48ad3c6

Browse files
committed
[SPARK-53874] SparkAppDriverConf should respect sparkVersion of SparkApplication CRD
### What changes were proposed in this pull request? This PR aims to fix `SparkAppDriverConf` to respect the `sparkVersion` of the `SparkApplication` CRD. ### Why are the changes needed? This is a long-standing bug from the initial implementation. - apache#10 Since the Apache Spark K8s Operator can launch various Spark versions, the `spark-version` label should come from the `SparkApplication` CRD's `sparkVersion` field. However, currently, the Spark version of the compile-time dependency is used for `Driver` resources (like the `Driver Pod` and `Driver Service`). We should override this. ### Does this PR introduce _any_ user-facing change? Yes, this is a bug fix to use the correct version information. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#385 from dongjoon-hyun/SPARK-53874. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 7989344 commit 48ad3c6

File tree

4 files changed

+72
-18
lines changed

4 files changed

+72
-18
lines changed

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.spark.k8s.operator;
2121

22+
import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
23+
2224
import scala.Option;
2325

2426
import org.apache.spark.SparkConf;
@@ -30,14 +32,18 @@
3032

3133
/** Spark application driver configuration. */
3234
public final class SparkAppDriverConf extends KubernetesDriverConf {
35+
private final String sparkVersion;
36+
3337
private SparkAppDriverConf(
3438
SparkConf sparkConf,
39+
String sparkVersion,
3540
String appId,
3641
MainAppResource mainAppResource,
3742
String mainClass,
3843
String[] appArgs,
3944
Option<String> proxyUser) {
4045
super(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser, null);
46+
this.sparkVersion = sparkVersion;
4147
}
4248

4349
/**
@@ -53,6 +59,7 @@ private SparkAppDriverConf(
5359
*/
5460
public static SparkAppDriverConf create(
5561
SparkConf sparkConf,
62+
String sparkVersion,
5663
String appId,
5764
MainAppResource mainAppResource,
5865
String mainClass,
@@ -61,7 +68,8 @@ public static SparkAppDriverConf create(
6168
// pre-create check only
6269
KubernetesVolumeUtils.parseVolumesWithPrefix(
6370
sparkConf, Config.KUBERNETES_EXECUTOR_VOLUMES_PREFIX());
64-
return new SparkAppDriverConf(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser);
71+
return new SparkAppDriverConf(
72+
sparkConf, sparkVersion, appId, mainAppResource, mainClass, appArgs, proxyUser);
6573
}
6674

6775
/**
@@ -74,6 +82,16 @@ public String resourceNamePrefix() {
7482
return appId();
7583
}
7684

85+
/**
86+
* Returns the driver label key and value map.
87+
*
88+
* @return The label key-value pair map.
89+
*/
90+
@Override
91+
public scala.collection.immutable.Map<String, String> labels() {
92+
return super.labels().updated(LABEL_SPARK_VERSION_NAME, sparkVersion);
93+
}
94+
7795
/**
7896
* Creates the name to be used by the driver config map. The name consists of `resourceNamePrefix`
7997
* and Spark instance type (driver). Operator proposes `resourceNamePrefix` with leaves naming

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.spark.k8s.operator.spec.ApplicationSpec;
4242
import org.apache.spark.k8s.operator.spec.ConfigMapSpec;
4343
import org.apache.spark.k8s.operator.spec.DriverServiceIngressSpec;
44+
import org.apache.spark.k8s.operator.spec.RuntimeVersions;
4445
import org.apache.spark.k8s.operator.utils.ModelUtils;
4546
import org.apache.spark.k8s.operator.utils.StringUtils;
4647

@@ -158,8 +159,11 @@ protected SparkAppDriverConf buildDriverConf(
158159
sparkMasterUrlPrefix + "https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
159160
String appId = generateSparkAppId(app);
160161
effectiveSparkConf.setIfMissing("spark.app.id", appId);
162+
RuntimeVersions versions = applicationSpec.getRuntimeVersions();
163+
String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN";
161164
return SparkAppDriverConf.create(
162165
effectiveSparkConf,
166+
sparkVersion,
163167
effectiveSparkConf.getAppId(),
164168
primaryResource,
165169
applicationSpec.getMainClass(),

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
3434

3535
class SparkAppDriverConfTest {
36+
static final String VERSION = "dev";
37+
3638
@Test
3739
void testResourceNamePrefix() {
3840
// Resource prefix shall be deterministic per SparkApp per attempt
@@ -42,7 +44,13 @@ void testResourceNamePrefix() {
4244
String appId = UUID.randomUUID().toString();
4345
SparkAppDriverConf sparkAppDriverConf =
4446
SparkAppDriverConf.create(
45-
sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty());
47+
sparkConf,
48+
VERSION,
49+
appId,
50+
mock(JavaMainAppResource.class),
51+
"foo",
52+
null,
53+
Option.empty());
4654
String resourcePrefix = sparkAppDriverConf.resourceNamePrefix();
4755
assertEquals(
4856
resourcePrefix,
@@ -65,10 +73,34 @@ void testConfigMapNameDriver() {
6573
String appId = "a".repeat(1000);
6674
SparkAppDriverConf sparkAppDriverConf =
6775
SparkAppDriverConf.create(
68-
sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty());
76+
sparkConf,
77+
VERSION,
78+
appId,
79+
mock(JavaMainAppResource.class),
80+
"foo",
81+
null,
82+
Option.empty());
6983
String configMapNameDriver = sparkAppDriverConf.configMapNameDriver();
7084
assertTrue(
7185
configMapNameDriver.length() <= 253,
7286
"config map name length should always comply k8s DNS subdomain length");
7387
}
88+
89+
@Test
90+
void testLabels() {
91+
SparkConf sparkConf = new SparkConf();
92+
sparkConf.set("foo", "bar");
93+
sparkConf.set("spark.executor.instances", "1");
94+
String appId = "a".repeat(1000);
95+
SparkAppDriverConf sparkAppDriverConf =
96+
SparkAppDriverConf.create(
97+
sparkConf,
98+
VERSION,
99+
appId,
100+
mock(JavaMainAppResource.class),
101+
"foo",
102+
null,
103+
Option.empty());
104+
assertEquals(VERSION, sparkAppDriverConf.labels().get("spark-version").get());
105+
}
74106
}

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void buildDriverConfShouldApplySpecAndPropertiesOverride() {
7676

7777
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
7878
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, overrides);
79-
assertEquals(6, constructorArgs.get(conf).size());
79+
assertEquals(7, constructorArgs.get(conf).size());
8080

8181
// validate SparkConf with override
8282
assertInstanceOf(SparkConf.class, constructorArgs.get(conf).get(0));
@@ -90,14 +90,14 @@ void buildDriverConfShouldApplySpecAndPropertiesOverride() {
9090
"namespace from CR takes highest precedence");
9191

9292
// validate main resources
93-
assertInstanceOf(JavaMainAppResource.class, constructorArgs.get(conf).get(2));
94-
JavaMainAppResource mainResource = (JavaMainAppResource) constructorArgs.get(conf).get(2);
93+
assertInstanceOf(JavaMainAppResource.class, constructorArgs.get(conf).get(3));
94+
JavaMainAppResource mainResource = (JavaMainAppResource) constructorArgs.get(conf).get(3);
9595
assertTrue(mainResource.primaryResource().isEmpty());
9696

97-
assertEquals("foo-class", constructorArgs.get(conf).get(3));
97+
assertEquals("foo-class", constructorArgs.get(conf).get(4));
9898

99-
assertInstanceOf(String[].class, constructorArgs.get(conf).get(4));
100-
String[] capturedArgs = (String[]) constructorArgs.get(conf).get(4);
99+
assertInstanceOf(String[].class, constructorArgs.get(conf).get(5));
100+
String[] capturedArgs = (String[]) constructorArgs.get(conf).get(5);
101101
assertEquals(2, capturedArgs.length);
102102
assertEquals("a", capturedArgs[0]);
103103
assertEquals("b", capturedArgs[1]);
@@ -120,11 +120,11 @@ void buildDriverConfForPythonApp() {
120120

121121
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
122122
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
123-
assertEquals(6, constructorArgs.get(conf).size());
123+
assertEquals(7, constructorArgs.get(conf).size());
124124

125125
// validate main resources
126-
assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2));
127-
PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2);
126+
assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(3));
127+
PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(3);
128128
assertEquals("foo", mainResource.primaryResource());
129129
}
130130
}
@@ -146,13 +146,13 @@ void handlePyFiles() {
146146

147147
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
148148
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
149-
assertEquals(6, constructorArgs.get(conf).size());
149+
assertEquals(7, constructorArgs.get(conf).size());
150150
assertEquals(
151151
"lib.py", ((SparkConf) constructorArgs.get(conf).get(0)).get("spark.submit.pyFiles"));
152152

153153
// validate main resources
154-
assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2));
155-
PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2);
154+
assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(3));
155+
PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(3);
156156
assertEquals("main.py", mainResource.primaryResource());
157157
}
158158
}
@@ -173,11 +173,11 @@ void buildDriverConfForRApp() {
173173

174174
SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
175175
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
176-
assertEquals(6, constructorArgs.get(conf).size());
176+
assertEquals(7, constructorArgs.get(conf).size());
177177

178178
// validate main resources
179-
assertInstanceOf(RMainAppResource.class, constructorArgs.get(conf).get(2));
180-
RMainAppResource mainResource = (RMainAppResource) constructorArgs.get(conf).get(2);
179+
assertInstanceOf(RMainAppResource.class, constructorArgs.get(conf).get(3));
180+
RMainAppResource mainResource = (RMainAppResource) constructorArgs.get(conf).get(3);
181181
assertEquals("foo", mainResource.primaryResource());
182182
}
183183
}

0 commit comments

Comments
 (0)