File: SparkAppSubmissionWorker.java

@@ -124,10 +124,16 @@ public SparkAppResourceSpec getResourceSpec(
   protected SparkAppDriverConf buildDriverConf(
       SparkApplication app, Map<String, String> confOverrides) {
     ApplicationSpec applicationSpec = app.getSpec();
+    RuntimeVersions versions = applicationSpec.getRuntimeVersions();
+    String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN";
     SparkConf effectiveSparkConf = new SparkConf();
     if (!applicationSpec.getSparkConf().isEmpty()) {
       for (String confKey : applicationSpec.getSparkConf().keySet()) {
-        effectiveSparkConf.set(confKey, applicationSpec.getSparkConf().get(confKey));
+        String value = applicationSpec.getSparkConf().get(confKey);
+        if (confKey.startsWith("spark.kubernetes.") && confKey.endsWith("container.image")) {
+          value = value.replace("{{SPARK_VERSION}}", sparkVersion);
+        }
+        effectiveSparkConf.set(confKey, value);
       }
     }
     if (!confOverrides.isEmpty()) {
@@ -159,8 +165,6 @@ protected SparkAppDriverConf buildDriverConf(
         sparkMasterUrlPrefix + "https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
     String appId = generateSparkAppId(app);
     effectiveSparkConf.setIfMissing("spark.app.id", appId);
-    RuntimeVersions versions = applicationSpec.getRuntimeVersions();
-    String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN";
     return SparkAppDriverConf.create(
         effectiveSparkConf,
         sparkVersion,
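
Taken together, the two hunks above move the version lookup to the top of buildDriverConf and rewrite any conf key that starts with spark.kubernetes. and ends with container.image, replacing the {{SPARK_VERSION}} token with RuntimeVersions.getSparkVersion() and falling back to "UNKNOWN" when no runtime versions are set. Below is a minimal standalone sketch of that substitution rule; the class and method names, the 4.0.0 version value, and the plain java.util.Map stand-in for SparkConf are illustrative assumptions, not part of the patch.

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mirrors the app-side substitution rule against a plain Map
// so it runs without the operator or Spark on the classpath.
public class AppImagePlaceholderSketch {
  static Map<String, String> resolve(Map<String, String> sparkConf, String sparkVersion) {
    Map<String, String> resolved = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
      String key = entry.getKey();
      String value = entry.getValue();
      // Same match as the patch: spark.kubernetes.container.image plus the
      // driver/executor container.image variants.
      if (key.startsWith("spark.kubernetes.") && key.endsWith("container.image")) {
        value = value.replace("{{SPARK_VERSION}}", sparkVersion);
      }
      resolved.put(key, value);
    }
    return resolved;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put("spark.kubernetes.driver.container.image", "apache/spark:{{SPARK_VERSION}}");
    conf.put("spark.executor.instances", "2");
    // Prints the driver image resolved to apache/spark:4.0.0 and the other key untouched.
    resolve(conf, "4.0.0").forEach((key, value) -> System.out.println(key + " = " + value));
  }
}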
File: SparkClusterSubmissionWorker.java

@@ -22,6 +22,7 @@
 import java.util.Map;

 import org.apache.spark.SparkConf;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;

 /** Worker for submitting Spark clusters. */
 public class SparkClusterSubmissionWorker {
@@ -34,12 +35,19 @@ public class SparkClusterSubmissionWorker {
    */
   public SparkClusterResourceSpec getResourceSpec(
       SparkCluster cluster, Map<String, String> confOverrides) {
+    RuntimeVersions versions = cluster.getSpec().getRuntimeVersions();
+    String sparkVersion = (versions != null) ? versions.getSparkVersion() : "UNKNOWN";
     SparkConf effectiveSparkConf = new SparkConf();

     Map<String, String> confFromSpec = cluster.getSpec().getSparkConf();
     if (!confFromSpec.isEmpty()) {
       for (Map.Entry<String, String> entry : confFromSpec.entrySet()) {
-        effectiveSparkConf.set(entry.getKey(), entry.getValue());
+        String value = entry.getValue();
+        if ("spark.kubernetes.container.image".equals(entry.getKey())) {
+          value = value.replace("{{SPARK_VERSION}}", sparkVersion);
+        }
+        effectiveSparkConf.set(entry.getKey(), value);
       }
     }

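One scope note while reading this hunk: the cluster worker rewrites only the exact spark.kubernetes.container.image key, whereas the application worker above also covers the driver and executor container.image variants. Here is a minimal standalone sketch of that exact-key rule; the class name, the 4.0.0 version value, and the plain java.util.Map are illustrative assumptions rather than operator types.

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: the cluster-side rule touches just this one exact key.
public class ClusterImagePlaceholderSketch {
  public static void main(String[] args) {
    String sparkVersion = "4.0.0"; // stand-in for RuntimeVersions.getSparkVersion()
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put("spark.kubernetes.container.image", "apache/spark:{{SPARK_VERSION}}");
    conf.put("spark.kubernetes.executor.container.image", "apache/spark:{{SPARK_VERSION}}");
    conf.replaceAll(
        (key, value) ->
            "spark.kubernetes.container.image".equals(key)
                ? value.replace("{{SPARK_VERSION}}", sparkVersion)
                : value);
    // Prints the first key resolved to apache/spark:4.0.0; the executor key keeps its placeholder.
    conf.forEach((key, value) -> System.out.println(key + " = " + value));
  }
}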
File: SparkAppSubmissionWorkerTest.java

@@ -44,6 +44,7 @@
 import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
 import org.apache.spark.deploy.k8s.submit.RMainAppResource;
 import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
 import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
 import org.apache.spark.k8s.operator.status.ApplicationStatus;
 import org.apache.spark.k8s.operator.status.AttemptInfo;
@@ -260,6 +261,31 @@ void checkAppIdWhenUserSpecifiedInSparkConf() {

     SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
     SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
-    assertEquals(conf.appId(), "foo");
+    assertEquals("foo", conf.appId());
   }
+
+  @Test
+  void supportSparkVersionPlaceHolder() {
+    SparkApplication mockApp = mock(SparkApplication.class);
+    ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+    RuntimeVersions mockRuntimeVersions = mock(RuntimeVersions.class);
+    Map<String, String> appProps = new HashMap<>();
+    appProps.put("spark.kubernetes.container.image", "apache/spark:{{SPARK_VERSION}}");
+    appProps.put("spark.kubernetes.driver.container.image", "apache/spark:{{SPARK_VERSION}}");
+    appProps.put("spark.kubernetes.executor.container.image", "apache/spark:{{SPARK_VERSION}}");
+    appProps.put("spark.kubernetes.key", "apache/spark:{{SPARK_VERSION}}");
+    ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+    when(mockSpec.getSparkConf()).thenReturn(appProps);
+    when(mockApp.getSpec()).thenReturn(mockSpec);
+    when(mockApp.getMetadata()).thenReturn(appMeta);
+    when(mockSpec.getRuntimeVersions()).thenReturn(mockRuntimeVersions);
+    when(mockRuntimeVersions.getSparkVersion()).thenReturn("dev");
+
+    SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
+    SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
+    assertEquals("apache/spark:dev", conf.get("spark.kubernetes.container.image"));
+    assertEquals("apache/spark:dev", conf.get("spark.kubernetes.driver.container.image"));
+    assertEquals("apache/spark:dev", conf.get("spark.kubernetes.executor.container.image"));
+    assertEquals("apache/spark:{{SPARK_VERSION}}", conf.get("spark.kubernetes.key"));
+  }
 }
File: SparkClusterSubmissionWorkerTest.java

@@ -23,6 +23,8 @@
 import static org.mockito.Mockito.*;

 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;

 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import org.junit.jupiter.api.BeforeEach;
@@ -41,7 +43,7 @@ class SparkClusterSubmissionWorkerTest {
   ClusterTolerations clusterTolerations = new ClusterTolerations();
   MasterSpec masterSpec;
   WorkerSpec workerSpec;
-  RuntimeVersions runtimeVersions = new RuntimeVersions();
+  RuntimeVersions runtimeVersions;

   @BeforeEach
   void setUp() {
@@ -50,6 +52,7 @@ void setUp() {
     clusterSpec = mock(ClusterSpec.class);
     masterSpec = mock(MasterSpec.class);
     workerSpec = mock(WorkerSpec.class);
+    runtimeVersions = mock(RuntimeVersions.class);
     when(cluster.getMetadata()).thenReturn(objectMeta);
     when(cluster.getSpec()).thenReturn(clusterSpec);
     when(objectMeta.getNamespace()).thenReturn("my-namespace");
@@ -58,6 +61,10 @@
     when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
     when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
     when(clusterSpec.getRuntimeVersions()).thenReturn(runtimeVersions);
+    when(runtimeVersions.getSparkVersion()).thenReturn("dev");
+    Map<String, String> sparkConf = new HashMap<>();
+    sparkConf.put("spark.kubernetes.container.image", "apache/spark:{{SPARK_VERSION}}");
+    when(clusterSpec.getSparkConf()).thenReturn(sparkConf);
   }

   @Test
@@ -70,4 +77,19 @@ void testGetResourceSpec() {
     assertNotNull(spec.getWorkerStatefulSet());
     assertNotNull(spec.getHorizontalPodAutoscaler());
   }
+
+  @Test
+  void supportSparkVersionPlaceHolder() {
+    SparkClusterSubmissionWorker worker = new SparkClusterSubmissionWorker();
+    SparkClusterResourceSpec spec = worker.getResourceSpec(cluster, Collections.emptyMap());
+    assertEquals(
+        "apache/spark:dev",
+        spec.getMasterStatefulSet()
+            .getSpec()
+            .getTemplate()
+            .getSpec()
+            .getContainers()
+            .get(0)
+            .getImage());
+  }
 }