Skip to content

Commit 7915164

Browse files
schmaxXximiliandongjoon-hyun
authored andcommitted
[SPARK-52997] Fixes wrong worker assignment if multiple clusters are deployed to the same namespace
### What changes were proposed in this pull request? Updated the podSelector for master and worker services to include both clusterRole and name labels. ### Why are the changes needed? Using only clusterRole caused service misrouting when multiple Spark clusters were deployed in the same namespace. Adding name ensures correct pod targeting. ### Does this PR introduce any user-facing change? No, this is an internal fix to service selectors. ### How was this patch tested? Tested with multiple clusters in the same namespace. Verified each service only matched its own pods via kubectl describe service. Also adapted unit tests to reflect new behaviour ### Was this patch authored or co-authored using generative AI tooling? Yes, PR metadata was assisted by AI, but code changes were made manually. Closes apache#291 from schmaxXximilian/main. Authored-by: Schmöller Maximilian <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent b0d7d95 commit 7915164

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ private static Service buildMasterService(
133133
.endMetadata()
134134
.withNewSpecLike(serviceSpec)
135135
.withClusterIP("None")
136-
.withSelector(
136+
.addToSelector(Collections.singletonMap(LABEL_SPARK_CLUSTER_NAME, name))
137+
.addToSelector(
137138
Collections.singletonMap(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE))
138139
.addNewPort()
139140
.withName("web")
@@ -175,7 +176,8 @@ private static Service buildWorkerService(
175176
.endMetadata()
176177
.withNewSpecLike(serviceSpec)
177178
.withClusterIP("None")
178-
.withSelector(
179+
.addToSelector(Collections.singletonMap(LABEL_SPARK_CLUSTER_NAME, name))
180+
.addToSelector(
179181
Collections.singletonMap(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE))
180182
.addNewPort()
181183
.withName("web")
@@ -225,6 +227,7 @@ private static StatefulSet buildMasterStatefulSet(
225227
.editOrNewTemplate()
226228
.editOrNewMetadata()
227229
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
230+
.addToLabels(LABEL_SPARK_CLUSTER_NAME, name)
228231
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
229232
.endMetadata()
230233
.editOrNewSpec()
@@ -306,6 +309,7 @@ private static StatefulSet buildWorkerStatefulSet(
306309
.editOrNewTemplate()
307310
.editOrNewMetadata()
308311
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
312+
.addToLabels(LABEL_SPARK_CLUSTER_NAME, name)
309313
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
310314
.endMetadata()
311315
.editOrNewSpec()

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919

2020
package org.apache.spark.k8s.operator;
2121

22-
import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
22+
import static org.apache.spark.k8s.operator.Constants.*;
2323
import static org.junit.jupiter.api.Assertions.assertEquals;
2424
import static org.junit.jupiter.api.Assertions.assertTrue;
2525
import static org.mockito.Mockito.mock;
2626
import static org.mockito.Mockito.when;
2727

28+
import java.util.Map;
2829
import java.util.Optional;
2930

3031
import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -129,6 +130,13 @@ void testWorkerServiceWithTemplate() {
129130
assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
130131
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
131132
assertEquals("foo", service1.getSpec().getExternalName());
133+
assertEquals(
134+
Map.of(
135+
LABEL_SPARK_CLUSTER_NAME,
136+
"cluster-name",
137+
LABEL_SPARK_ROLE_NAME,
138+
LABEL_SPARK_ROLE_WORKER_VALUE),
139+
service1.getSpec().getSelector());
132140
}
133141

134142
@Test
@@ -151,6 +159,13 @@ void testMasterServiceWithTemplate() {
151159
assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
152160
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
153161
assertEquals("foo", service1.getSpec().getExternalName());
162+
assertEquals(
163+
Map.of(
164+
LABEL_SPARK_CLUSTER_NAME,
165+
"cluster-name",
166+
LABEL_SPARK_ROLE_NAME,
167+
LABEL_SPARK_ROLE_MASTER_VALUE),
168+
service1.getSpec().getSelector());
154169
}
155170

156171
@Test
@@ -172,6 +187,17 @@ void testMasterStatefulSet() {
172187
SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf);
173188
StatefulSet statefulSet2 = spec2.getMasterStatefulSet();
174189
assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace());
190+
assertEquals(
191+
"cluster-name",
192+
statefulSet2
193+
.getSpec()
194+
.getTemplate()
195+
.getMetadata()
196+
.getLabels()
197+
.get(LABEL_SPARK_CLUSTER_NAME));
198+
assertEquals(
199+
LABEL_SPARK_ROLE_MASTER_VALUE,
200+
statefulSet2.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_ROLE_NAME));
175201
}
176202

177203
@Test
@@ -236,6 +262,17 @@ void testWorkerStatefulSet() {
236262
SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf);
237263
StatefulSet statefulSet2 = spec2.getWorkerStatefulSet();
238264
assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace());
265+
assertEquals(
266+
"cluster-name",
267+
statefulSet2
268+
.getSpec()
269+
.getTemplate()
270+
.getMetadata()
271+
.getLabels()
272+
.get(LABEL_SPARK_CLUSTER_NAME));
273+
assertEquals(
274+
LABEL_SPARK_ROLE_WORKER_VALUE,
275+
statefulSet2.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_ROLE_NAME));
239276
}
240277

241278
@Test

0 commit comments

Comments
 (0)