Commit 0b9d17f

[SPARK-49838] Add spark-version label to Spark Cluster resources
### What changes were proposed in this pull request?

This PR aims to add the `spark-version` label to `Spark Cluster` resources, like `Spark Application`.

### Why are the changes needed?

`spark-version` is an important label for distinguishing and searching resources in the production environment. This PR adds `spark-version` to the following resources:

- `Master Service`
- `Master Statefulset`
- `Master Pod`
- `Worker Service`
- `Worker Statefulset`
- `Worker HorizontalPodAutoscaler`
- `Worker Pod`

We can use `spark-version` as a selector like the following:

```
$ kubectl get svc
NAME                          TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
cluster-with-hpa-master-svc   ClusterIP   None         <none>        8080/TCP,7077/TCP,6066/TCP   39s
cluster-with-hpa-worker-svc   ClusterIP   None         <none>        8081/TCP                     39s
kubernetes                    ClusterIP   10.96.0.1    <none>        443/TCP                      6d12h

$ kubectl get svc -l spark-version=4.0.0-preview2
NAME                          TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
cluster-with-hpa-master-svc   ClusterIP   None         <none>        8080/TCP,7077/TCP,6066/TCP   89s
cluster-with-hpa-worker-svc   ClusterIP   None         <none>        8081/TCP                     89s

$ kubectl get pod -l spark-version=4.0.0-preview2
NAME                        READY   STATUS    RESTARTS   AGE
cluster-with-hpa-master-0   1/1     Running   0          17m
cluster-with-hpa-worker-0   1/1     Running   0          17m
```

### Does this PR introduce _any_ user-facing change?

No, this is not released yet.

### How was this patch tested?

Pass the CIs with the revised test cases.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#142 from dongjoon-hyun/SPARK-49838.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
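The same label-selector lookup shown in the commit message can also be done programmatically. Below is a minimal sketch, not part of this commit, using the fabric8 `KubernetesClient` (the same Kubernetes model/client family the builders in this commit use); the label key and value are simply the ones from the example above, and the class name is only illustrative.

```java
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

// Minimal sketch: list Spark cluster Services carrying the new spark-version label.
// Assumes a reachable cluster and default client configuration; nothing here is operator API.
public class ListServicesBySparkVersion {
  public static void main(String[] args) {
    try (KubernetesClient client = new KubernetesClientBuilder().build()) {
      client
          .services()
          .withLabel("spark-version", "4.0.0-preview2") // key matches LABEL_SPARK_VERSION_NAME
          .list()
          .getItems()
          .forEach(svc -> System.out.println(svc.getMetadata().getName()));
    }
  }
}
```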
1 parent 6c6a260 commit 0b9d17f

5 files changed (+77 −4 lines)

spark-operator-api/src/main/java/org/apache/spark/k8s/operator/Constants.java

Lines changed: 1 addition & 0 deletions
```diff
@@ -34,6 +34,7 @@ public class Constants {
   public static final String LABEL_SPARK_ROLE_CLUSTER_VALUE = "cluster";
   public static final String LABEL_SPARK_ROLE_MASTER_VALUE = "master";
   public static final String LABEL_SPARK_ROLE_WORKER_VALUE = "worker";
+  public static final String LABEL_SPARK_VERSION_NAME = "spark-version";
   public static final String SENTINEL_RESOURCE_DUMMY_FIELD = "sentinel.dummy.number";
 
   public static final String DRIVER_SPARK_CONTAINER_PROP_KEY =
```

spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java

Lines changed: 2 additions & 0 deletions
```diff
@@ -24,6 +24,7 @@
 import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_CLUSTER_VALUE;
 import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_DRIVER_VALUE;
 import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_EXECUTOR_VALUE;
+import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
 import static org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_APP_NAME;
 import static org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_WATCHED_NAMESPACES;
 import static org.apache.spark.k8s.operator.config.SparkOperatorConf.SPARK_APP_STATUS_LISTENER_CLASS_NAMES;
@@ -111,6 +112,7 @@ public static Map<String, String> executorLabels(final SparkApplication app) {
   public static Map<String, String> sparkClusterResourceLabels(final SparkCluster cluster) {
     Map<String, String> labels = commonManagedResourceLabels();
     labels.put(Constants.LABEL_SPARK_CLUSTER_NAME, cluster.getMetadata().getName());
+    labels.put(LABEL_SPARK_VERSION_NAME, cluster.getSpec().getRuntimeVersions().getSparkVersion());
     return labels;
   }
 
```

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java

Lines changed: 24 additions & 4 deletions
```diff
@@ -61,6 +61,7 @@ public SparkClusterResourceSpec(SparkCluster cluster, SparkConf conf) {
     String namespace = conf.get(Config.KUBERNETES_NAMESPACE().key(), clusterNamespace);
     String image = conf.get(Config.CONTAINER_IMAGE().key(), "apache/spark:4.0.0-preview2");
     ClusterSpec spec = cluster.getSpec();
+    String version = spec.getRuntimeVersions().getSparkVersion();
     StringBuilder options = new StringBuilder();
     for (Tuple2<String, String> t : conf.getAll()) {
       options.append(String.format("-D%s=\"%s\" ", t._1, t._2));
@@ -69,15 +70,24 @@ public SparkClusterResourceSpec(SparkCluster cluster, SparkConf conf) {
     WorkerSpec workerSpec = spec.getWorkerSpec();
     masterService =
         buildMasterService(
-            clusterName, namespace, masterSpec.getServiceMetadata(), masterSpec.getServiceSpec());
+            clusterName,
+            namespace,
+            version,
+            masterSpec.getServiceMetadata(),
+            masterSpec.getServiceSpec());
     workerService =
         buildWorkerService(
-            clusterName, namespace, workerSpec.getServiceMetadata(), workerSpec.getServiceSpec());
+            clusterName,
+            namespace,
+            version,
+            workerSpec.getServiceMetadata(),
+            workerSpec.getServiceSpec());
     masterStatefulSet =
         buildMasterStatefulSet(
             scheduler,
             clusterName,
             namespace,
+            version,
             image,
             options.toString(),
             masterSpec.getStatefulSetMetadata(),
@@ -87,6 +97,7 @@ public SparkClusterResourceSpec(SparkCluster cluster, SparkConf conf) {
             scheduler,
             clusterName,
             namespace,
+            version,
             image,
             spec.getClusterTolerations().getInstanceConfig().getInitWorkers(),
             options.toString(),
@@ -96,11 +107,12 @@ public SparkClusterResourceSpec(SparkCluster cluster, SparkConf conf) {
   }
 
   private static Service buildMasterService(
-      String name, String namespace, ObjectMeta metadata, ServiceSpec serviceSpec) {
+      String name, String namespace, String version, ObjectMeta metadata, ServiceSpec serviceSpec) {
     return new ServiceBuilder()
         .withNewMetadataLike(metadata)
         .withName(name + "-master-svc")
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
+        .addToLabels(LABEL_SPARK_VERSION_NAME, version)
         .withNamespace(namespace)
         .endMetadata()
         .withNewSpecLike(serviceSpec)
@@ -127,11 +139,12 @@ private static Service buildMasterService(
   }
 
   private static Service buildWorkerService(
-      String name, String namespace, ObjectMeta metadata, ServiceSpec serviceSpec) {
+      String name, String namespace, String version, ObjectMeta metadata, ServiceSpec serviceSpec) {
     return new ServiceBuilder()
         .withNewMetadataLike(metadata)
         .withName(name + "-worker-svc")
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
+        .addToLabels(LABEL_SPARK_VERSION_NAME, version)
         .withNamespace(namespace)
         .endMetadata()
         .withNewSpecLike(serviceSpec)
@@ -151,6 +164,7 @@ private static StatefulSet buildMasterStatefulSet(
       String scheduler,
       String name,
       String namespace,
+      String version,
       String image,
       String options,
       ObjectMeta objectMeta,
@@ -160,6 +174,7 @@ private static StatefulSet buildMasterStatefulSet(
         .withNewMetadataLike(objectMeta)
         .withName(name + "-master")
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
+        .addToLabels(LABEL_SPARK_VERSION_NAME, version)
         .withNamespace(namespace)
         .endMetadata()
         .withNewSpecLike(statefulSetSpec)
@@ -171,6 +186,7 @@ private static StatefulSet buildMasterStatefulSet(
         .editOrNewTemplate()
         .editOrNewMetadata()
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
+        .addToLabels(LABEL_SPARK_VERSION_NAME, version)
         .endMetadata()
         .editOrNewSpec()
         .withSchedulerName(scheduler)
@@ -213,6 +229,7 @@ private static StatefulSet buildWorkerStatefulSet(
       String scheduler,
       String name,
      String namespace,
+      String version,
       String image,
       int initWorkers,
       String options,
@@ -223,6 +240,7 @@ private static StatefulSet buildWorkerStatefulSet(
         .withNewMetadataLike(metadata)
         .withName(name + "-worker")
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
+        .addToLabels(LABEL_SPARK_VERSION_NAME, version)
         .withNamespace(namespace)
         .endMetadata()
         .withNewSpecLike(statefulSetSpec)
@@ -235,6 +253,7 @@ private static StatefulSet buildWorkerStatefulSet(
         .editOrNewTemplate()
         .editOrNewMetadata()
         .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
+        .addToLabels(LABEL_SPARK_VERSION_NAME, version)
         .endMetadata()
         .editOrNewSpec()
         .withSchedulerName(scheduler)
@@ -320,6 +339,7 @@ private static Optional<HorizontalPodAutoscaler> buildHorizontalPodAutoscaler(
         .withNewMetadata()
         .withNamespace(namespace)
         .withName(clusterName + "-worker-hpa")
+        .addToLabels(LABEL_SPARK_VERSION_NAME, spec.getRuntimeVersions().getSparkVersion())
         .endMetadata()
         .withNewSpecLike(horizontalPodAutoscalerSpec)
         .withNewScaleTargetRef()
```
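The builders above rely on fabric8's `addToLabels` merging the new `spark-version` entry into whatever labels the user-supplied metadata template already carries (started via `withNewMetadataLike`), rather than replacing them. A minimal sketch of that behavior follows; it is not operator code, and the names and values are only illustrative.

```java
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;

// Minimal sketch: addToLabels merges into labels copied from an existing ObjectMeta.
public class AddVersionLabelSketch {
  public static void main(String[] args) {
    // Stand-in for masterSpec.getServiceMetadata(), already carrying a user label.
    ObjectMeta userMetadata = new ObjectMetaBuilder().addToLabels("foo", "bar").build();
    String version = "4.0.0-preview2"; // would come from spec.getRuntimeVersions().getSparkVersion()

    Service svc =
        new ServiceBuilder()
            .withNewMetadataLike(userMetadata)
            .withName("cluster-with-hpa-master-svc")
            .addToLabels("spark-version", version) // key is Constants.LABEL_SPARK_VERSION_NAME
            .endMetadata()
            .build();

    // Prints both the user-provided label and the injected spark-version label.
    System.out.println(svc.getMetadata().getLabels());
  }
}
```

This merge behavior is also what the template tests below exercise: they assert that both the user label (`foo=bar`) and the new `spark-version` label are present on the built resources.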

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java

Lines changed: 47 additions & 0 deletions
```diff
@@ -19,6 +19,7 @@
 
 package org.apache.spark.k8s.operator;
 
+import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -41,6 +42,7 @@
 import org.apache.spark.k8s.operator.spec.ClusterSpec;
 import org.apache.spark.k8s.operator.spec.ClusterTolerations;
 import org.apache.spark.k8s.operator.spec.MasterSpec;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
 import org.apache.spark.k8s.operator.spec.WorkerInstanceConfig;
 import org.apache.spark.k8s.operator.spec.WorkerSpec;
 
@@ -52,6 +54,7 @@ class SparkClusterResourceSpecTest {
   ServiceSpec serviceSpec;
   MasterSpec masterSpec;
   WorkerSpec workerSpec;
+  RuntimeVersions runtimeVersions = new RuntimeVersions();
   SparkConf sparkConf = new SparkConf().set("spark.kubernetes.namespace", "other-namespace");
   ClusterTolerations clusterTolerations = new ClusterTolerations();
 
@@ -71,6 +74,8 @@ void setUp() {
     when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
     when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
     when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
+    when(clusterSpec.getRuntimeVersions()).thenReturn(runtimeVersions);
+    runtimeVersions.setSparkVersion("4.0.0");
     when(masterSpec.getStatefulSetSpec()).thenReturn(statefulSetSpec);
     when(masterSpec.getStatefulSetMetadata()).thenReturn(objectMeta);
     when(masterSpec.getServiceSpec()).thenReturn(serviceSpec);
@@ -86,16 +91,19 @@ void testMasterService() {
     Service service1 = new SparkClusterResourceSpec(cluster, new SparkConf()).getMasterService();
     assertEquals("my-namespace", service1.getMetadata().getNamespace());
     assertEquals("cluster-name-master-svc", service1.getMetadata().getName());
+    assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
 
     Service service2 = new SparkClusterResourceSpec(cluster, sparkConf).getMasterService();
     assertEquals("other-namespace", service2.getMetadata().getNamespace());
+    assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
   }
 
   @Test
   void testWorkerService() {
     Service service1 = new SparkClusterResourceSpec(cluster, new SparkConf()).getWorkerService();
     assertEquals("my-namespace", service1.getMetadata().getNamespace());
     assertEquals("cluster-name-worker-svc", service1.getMetadata().getName());
+    assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
 
     Service service2 = new SparkClusterResourceSpec(cluster, sparkConf).getMasterService();
     assertEquals("other-namespace", service2.getMetadata().getNamespace());
@@ -119,6 +127,7 @@ void testWorkerServiceWithTemplate() {
     assertEquals("my-namespace", service1.getMetadata().getNamespace());
     assertEquals("cluster-name-worker-svc", service1.getMetadata().getName());
     assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
+    assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
     assertEquals("foo", service1.getSpec().getExternalName());
   }
 
@@ -140,6 +149,7 @@ void testMasterServiceWithTemplate() {
     assertEquals("my-namespace", service1.getMetadata().getNamespace());
     assertEquals("cluster-name-master-svc", service1.getMetadata().getName());
     assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
+    assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
     assertEquals("foo", service1.getSpec().getExternalName());
   }
 
@@ -149,6 +159,15 @@ void testMasterStatefulSet() {
     StatefulSet statefulSet1 = spec1.getMasterStatefulSet();
     assertEquals("my-namespace", statefulSet1.getMetadata().getNamespace());
     assertEquals("cluster-name-master", statefulSet1.getMetadata().getName());
+    assertEquals("4.0.0", statefulSet1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
+    assertEquals(
+        "4.0.0",
+        statefulSet1
+            .getSpec()
+            .getTemplate()
+            .getMetadata()
+            .getLabels()
+            .get(LABEL_SPARK_VERSION_NAME));
 
     SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf);
     StatefulSet statefulSet2 = spec2.getMasterStatefulSet();
@@ -185,8 +204,17 @@ void testMasterStatefulSetWithTemplate() {
     assertEquals("my-namespace", statefulSet1.getMetadata().getNamespace());
     assertEquals("cluster-name-master", statefulSet1.getMetadata().getName());
     assertEquals("bar", statefulSet1.getMetadata().getLabels().get("foo"));
+    assertEquals("4.0.0", statefulSet1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
     assertEquals(1, statefulSet1.getSpec().getTemplate().getSpec().getInitContainers().size());
     assertEquals(2, statefulSet1.getSpec().getTemplate().getSpec().getContainers().size());
+    assertEquals(
+        "4.0.0",
+        statefulSet1
+            .getSpec()
+            .getTemplate()
+            .getMetadata()
+            .getLabels()
+            .get(LABEL_SPARK_VERSION_NAME));
   }
 
   @Test
@@ -195,6 +223,15 @@ void testWorkerStatefulSet() {
     StatefulSet statefulSet = spec.getWorkerStatefulSet();
     assertEquals("my-namespace", statefulSet.getMetadata().getNamespace());
     assertEquals("cluster-name-worker", statefulSet.getMetadata().getName());
+    assertEquals("4.0.0", statefulSet.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
+    assertEquals(
+        "4.0.0",
+        statefulSet
+            .getSpec()
+            .getTemplate()
+            .getMetadata()
+            .getLabels()
+            .get(LABEL_SPARK_VERSION_NAME));
 
     SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf);
     StatefulSet statefulSet2 = spec2.getWorkerStatefulSet();
@@ -230,6 +267,15 @@ void testWorkerStatefulSetWithTemplate() {
     StatefulSet statefulSet = spec.getWorkerStatefulSet();
     assertEquals("my-namespace", statefulSet.getMetadata().getNamespace());
     assertEquals("cluster-name-worker", statefulSet.getMetadata().getName());
+    assertEquals("4.0.0", statefulSet.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
+    assertEquals(
+        "4.0.0",
+        statefulSet
+            .getSpec()
+            .getTemplate()
+            .getMetadata()
+            .getLabels()
+            .get(LABEL_SPARK_VERSION_NAME));
   }
 
   @Test
@@ -255,6 +301,7 @@ void testHorizontalPodAutoscaler() {
     assertEquals("HorizontalPodAutoscaler", hpa.getKind());
     assertEquals("my-namespace", hpa.getMetadata().getNamespace());
     assertEquals("cluster-name-worker-hpa", hpa.getMetadata().getName());
+    assertEquals("4.0.0", hpa.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
     assertEquals(1, hpa.getSpec().getMinReplicas());
     assertEquals(3, hpa.getSpec().getMaxReplicas());
   }
```

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterSubmissionWorkerTest.java

Lines changed: 3 additions & 0 deletions
```diff
@@ -31,6 +31,7 @@
 import org.apache.spark.k8s.operator.spec.ClusterSpec;
 import org.apache.spark.k8s.operator.spec.ClusterTolerations;
 import org.apache.spark.k8s.operator.spec.MasterSpec;
+import org.apache.spark.k8s.operator.spec.RuntimeVersions;
 import org.apache.spark.k8s.operator.spec.WorkerSpec;
 
 class SparkClusterSubmissionWorkerTest {
@@ -40,6 +41,7 @@ class SparkClusterSubmissionWorkerTest {
   ClusterTolerations clusterTolerations = new ClusterTolerations();
   MasterSpec masterSpec;
   WorkerSpec workerSpec;
+  RuntimeVersions runtimeVersions = new RuntimeVersions();
 
   @BeforeEach
   void setUp() {
@@ -55,6 +57,7 @@ void setUp() {
     when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
     when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
     when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
+    when(clusterSpec.getRuntimeVersions()).thenReturn(runtimeVersions);
   }
 
   @Test
```
