Commit 85705f5

[SPARK-53911] Support SPARK_VERSION placeholder in container image names
1 parent 563f4a7 commit 85705f5

5 files changed (+66, -6 lines)

5 files changed

+66
-6
lines changed

examples/pi.yaml

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ spec:
     spark.dynamicAllocation.shuffleTracking.enabled: "true"
     spark.dynamicAllocation.maxExecutors: "3"
     spark.kubernetes.authenticate.driver.serviceAccountName: "spark"
-    spark.kubernetes.container.image: "apache/spark:4.0.1"
+    spark.kubernetes.container.image: "apache/spark:{{SPARK_VERSION}}"
     spark.kubernetes.driver.pod.excludedFeatureSteps: "org.apache.spark.deploy.k8s.features.KerberosConfDriverFeatureStep"
   applicationTolerations:
     resourceRetainPolicy: OnFailure
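
The example no longer pins a concrete release: the operator resolves {{SPARK_VERSION}} from the application's runtimeVersions.sparkVersion while building the submission conf. A minimal sketch of what the substitution amounts to, with illustrative values only (the previously pinned tag 4.0.1 is reused here purely as an example):

// Illustrative sketch, not part of the commit: the effect of the placeholder
// resolution applied to the image value shown above.
String image = "apache/spark:{{SPARK_VERSION}}";                    // from examples/pi.yaml
String sparkVersion = "4.0.1";                                      // would come from spec.runtimeVersions.sparkVersion
String resolved = image.replace("{{SPARK_VERSION}}", sparkVersion);
// resolved -> "apache/spark:4.0.1"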

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java

Lines changed: 7 additions & 3 deletions
@@ -124,10 +124,16 @@ public SparkAppResourceSpec getResourceSpec(
   protected SparkAppDriverConf buildDriverConf(
       SparkApplication app, Map<String, String> confOverrides) {
     ApplicationSpec applicationSpec = app.getSpec();
+    RuntimeVersions versions = applicationSpec.getRuntimeVersions();
+    String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN";
     SparkConf effectiveSparkConf = new SparkConf();
     if (!applicationSpec.getSparkConf().isEmpty()) {
       for (String confKey : applicationSpec.getSparkConf().keySet()) {
-        effectiveSparkConf.set(confKey, applicationSpec.getSparkConf().get(confKey));
+        String value = applicationSpec.getSparkConf().get(confKey);
+        if (confKey.startsWith("spark.kubernetes.") && confKey.endsWith("container.image")) {
+          value = value.replace("{{SPARK_VERSION}}", sparkVersion);
+        }
+        effectiveSparkConf.set(confKey, value);
       }
     }
     if (!confOverrides.isEmpty()) {
@@ -159,8 +165,6 @@ protected SparkAppDriverConf buildDriverConf(
         sparkMasterUrlPrefix + "https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
     String appId = generateSparkAppId(app);
     effectiveSparkConf.setIfMissing("spark.app.id", appId);
-    RuntimeVersions versions = applicationSpec.getRuntimeVersions();
-    String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN";
     return SparkAppDriverConf.create(
         effectiveSparkConf,
         sparkVersion,
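
The rewrite only applies to conf keys that start with spark.kubernetes. and end with container.image, so the base, driver, and executor image settings are all covered while unrelated keys keep their literal value. A standalone sketch of that predicate (an illustrative helper, not part of the commit):

// Mirrors the condition used in buildDriverConf above.
static boolean isContainerImageKey(String confKey) {
  return confKey.startsWith("spark.kubernetes.") && confKey.endsWith("container.image");
}
// isContainerImageKey("spark.kubernetes.container.image")          -> true
// isContainerImageKey("spark.kubernetes.driver.container.image")   -> true
// isContainerImageKey("spark.kubernetes.executor.container.image") -> true
// isContainerImageKey("spark.kubernetes.key")                      -> false (left as-is, see the test below)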

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorker.java

Lines changed: 8 additions & 0 deletions
@@ -22,6 +22,7 @@
 import java.util.Map;
 
 import org.apache.spark.SparkConf;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
 
 /** Worker for submitting Spark clusters. */
 public class SparkClusterSubmissionWorker {
@@ -34,12 +35,19 @@ public class SparkClusterSubmissionWorker {
    */
   public SparkClusterResourceSpec getResourceSpec(
       SparkCluster cluster, Map<String, String> confOverrides) {
+    RuntimeVersions versions = cluster.getSpec().getRuntimeVersions();
+    String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN";
     SparkConf effectiveSparkConf = new SparkConf();
 
     Map<String, String> confFromSpec = cluster.getSpec().getSparkConf();
     if (!confFromSpec.isEmpty()) {
       for (Map.Entry<String, String> entry : confFromSpec.entrySet()) {
         effectiveSparkConf.set(entry.getKey(), entry.getValue());
+        String value = entry.getValue();
+        if ("spark.kubernetes.container.image".equals(entry.getKey())) {
+          value = value.replace("{{SPARK_VERSION}}", sparkVersion);
+        }
+        effectiveSparkConf.set(entry.getKey(), value);
       }
     }
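
Unlike the application path, the cluster path substitutes the placeholder only for the exact spark.kubernetes.container.image key; other conf entries pass through unchanged. In both paths, a missing runtimeVersions falls back to the string "UNKNOWN". A short sketch of the cluster-side behavior, with illustrative values:

// Illustrative sketch of the substitution in getResourceSpec, assuming the
// cluster's runtimeVersions.sparkVersion is "dev" (as in the test below).
String key = "spark.kubernetes.container.image";
String value = "apache/spark:{{SPARK_VERSION}}";
String sparkVersion = "dev";  // a null runtimeVersions would yield "UNKNOWN"
if ("spark.kubernetes.container.image".equals(key)) {
  value = value.replace("{{SPARK_VERSION}}", sparkVersion);
}
// value -> "apache/spark:dev"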

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java

Lines changed: 27 additions & 1 deletion
@@ -44,6 +44,7 @@
 import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
 import org.apache.spark.deploy.k8s.submit.RMainAppResource;
 import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
 import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
 import org.apache.spark.k8s.operator.status.ApplicationStatus;
 import org.apache.spark.k8s.operator.status.AttemptInfo;
@@ -260,6 +261,31 @@ void checkAppIdWhenUserSpecifiedInSparkConf() {
 
     SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
     SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
-    assertEquals(conf.appId(), "foo");
+    assertEquals("foo", conf.appId());
+  }
+
+  @Test
+  void supportSparkVersionPlaceHolder() {
+    SparkApplication mockApp = mock(SparkApplication.class);
+    ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+    RuntimeVersions mockRuntimeVersions = mock(RuntimeVersions.class);
+    Map<String, String> appProps = new HashMap<>();
+    appProps.put("spark.kubernetes.container.image", "apache/spark:{{SPARK_VERSION}}");
+    appProps.put("spark.kubernetes.driver.container.image", "apache/spark:{{SPARK_VERSION}}");
+    appProps.put("spark.kubernetes.executor.container.image", "apache/spark:{{SPARK_VERSION}}");
+    appProps.put("spark.kubernetes.key", "apache/spark:{{SPARK_VERSION}}");
+    ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+    when(mockSpec.getSparkConf()).thenReturn(appProps);
+    when(mockApp.getSpec()).thenReturn(mockSpec);
+    when(mockApp.getMetadata()).thenReturn(appMeta);
+    when(mockSpec.getRuntimeVersions()).thenReturn(mockRuntimeVersions);
+    when(mockRuntimeVersions.getSparkVersion()).thenReturn("dev");
+
+    SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
+    SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
+    assertEquals("apache/spark:dev", conf.get("spark.kubernetes.container.image"));
+    assertEquals("apache/spark:dev", conf.get("spark.kubernetes.driver.container.image"));
+    assertEquals("apache/spark:dev", conf.get("spark.kubernetes.executor.container.image"));
+    assertEquals("apache/spark:{{SPARK_VERSION}}", conf.get("spark.kubernetes.key"));
   }
 }

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java

Lines changed: 23 additions & 1 deletion
@@ -23,6 +23,8 @@
 import static org.mockito.Mockito.*;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import org.junit.jupiter.api.BeforeEach;
@@ -41,7 +43,7 @@ class SparkClusterSubmissionWorkerTest {
   ClusterTolerations clusterTolerations = new ClusterTolerations();
   MasterSpec masterSpec;
   WorkerSpec workerSpec;
-  RuntimeVersions runtimeVersions = new RuntimeVersions();
+  RuntimeVersions runtimeVersions;
 
   @BeforeEach
   void setUp() {
@@ -50,6 +52,7 @@ void setUp() {
     clusterSpec = mock(ClusterSpec.class);
     masterSpec = mock(MasterSpec.class);
     workerSpec = mock(WorkerSpec.class);
+    runtimeVersions = mock(RuntimeVersions.class);
     when(cluster.getMetadata()).thenReturn(objectMeta);
     when(cluster.getSpec()).thenReturn(clusterSpec);
     when(objectMeta.getNamespace()).thenReturn("my-namespace");
@@ -58,6 +61,10 @@ void setUp() {
     when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
     when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
     when(clusterSpec.getRuntimeVersions()).thenReturn(runtimeVersions);
+    when(runtimeVersions.getSparkVersion()).thenReturn("dev");
+    Map<String, String> sparkConf = new HashMap<>();
+    sparkConf.put("spark.kubernetes.container.image", "apache/spark:{{SPARK_VERSION}}");
+    when(clusterSpec.getSparkConf()).thenReturn(sparkConf);
   }
 
   @Test
@@ -70,4 +77,19 @@ void testGetResourceSpec() {
     assertNotNull(spec.getWorkerStatefulSet());
     assertNotNull(spec.getHorizontalPodAutoscaler());
   }
+
+  @Test
+  void supportSparkVersionPlaceHolder() {
+    SparkClusterSubmissionWorker worker = new SparkClusterSubmissionWorker();
+    SparkClusterResourceSpec spec = worker.getResourceSpec(cluster, Collections.emptyMap());
+    assertEquals(
+        "apache/spark:dev",
+        spec.getMasterStatefulSet()
+            .getSpec()
+            .getTemplate()
+            .getSpec()
+            .getContainers()
+            .get(0)
+            .getImage());
+  }
 }
