Skip to content

Commit 628dbdb

Browse files
schmaxXximilian송재창 AI Data개발팀
authored and committed
[SPARK-52997] Fixes wrong worker assignment if multiple clusters are deployed to the same namespace
### What changes were proposed in this pull request?

Updated the podSelector for master and worker services to include both clusterRole and name labels.

### Why are the changes needed?

Using only clusterRole caused service misrouting when multiple Spark clusters were deployed in the same namespace. Adding name ensures correct pod targeting.

### Does this PR introduce any user-facing change?

No, this is an internal fix to service selectors.

### How was this patch tested?

Tested with multiple clusters in the same namespace. Verified each service only matched its own pods via `kubectl describe service`. Also adapted unit tests to reflect new behaviour.

### Was this patch authored or co-authored using generative AI tooling?

Yes, PR metadata was assisted by AI, but code changes were made manually.

Closes apache#291 from schmaxXximilian/main.

Authored-by: Schmöller Maximilian <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 7915164)
1 parent 3879459 commit 628dbdb

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ private static Service buildMasterService(
117117
.endMetadata()
118118
.withNewSpecLike(serviceSpec)
119119
.withClusterIP("None")
120-
.withSelector(
120+
.addToSelector(Collections.singletonMap(LABEL_SPARK_CLUSTER_NAME, name))
121+
.addToSelector(
121122
Collections.singletonMap(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE))
122123
.addNewPort()
123124
.withName("web")
@@ -149,7 +150,8 @@ private static Service buildWorkerService(
149150
.endMetadata()
150151
.withNewSpecLike(serviceSpec)
151152
.withClusterIP("None")
152-
.withSelector(
153+
.addToSelector(Collections.singletonMap(LABEL_SPARK_CLUSTER_NAME, name))
154+
.addToSelector(
153155
Collections.singletonMap(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE))
154156
.addNewPort()
155157
.withName("web")
@@ -186,6 +188,7 @@ private static StatefulSet buildMasterStatefulSet(
186188
.editOrNewTemplate()
187189
.editOrNewMetadata()
188190
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
191+
.addToLabels(LABEL_SPARK_CLUSTER_NAME, name)
189192
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
190193
.endMetadata()
191194
.editOrNewSpec()
@@ -253,6 +256,7 @@ private static StatefulSet buildWorkerStatefulSet(
253256
.editOrNewTemplate()
254257
.editOrNewMetadata()
255258
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
259+
.addToLabels(LABEL_SPARK_CLUSTER_NAME, name)
256260
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
257261
.endMetadata()
258262
.editOrNewSpec()

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919

2020
package org.apache.spark.k8s.operator;
2121

22-
import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
22+
import static org.apache.spark.k8s.operator.Constants.*;
2323
import static org.junit.jupiter.api.Assertions.assertEquals;
2424
import static org.junit.jupiter.api.Assertions.assertTrue;
2525
import static org.mockito.Mockito.mock;
2626
import static org.mockito.Mockito.when;
2727

28+
import java.util.Map;
2829
import java.util.Optional;
2930

3031
import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -129,6 +130,13 @@ void testWorkerServiceWithTemplate() {
129130
assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
130131
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
131132
assertEquals("foo", service1.getSpec().getExternalName());
133+
assertEquals(
134+
Map.of(
135+
LABEL_SPARK_CLUSTER_NAME,
136+
"cluster-name",
137+
LABEL_SPARK_ROLE_NAME,
138+
LABEL_SPARK_ROLE_WORKER_VALUE),
139+
service1.getSpec().getSelector());
132140
}
133141

134142
@Test
@@ -151,6 +159,13 @@ void testMasterServiceWithTemplate() {
151159
assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
152160
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
153161
assertEquals("foo", service1.getSpec().getExternalName());
162+
assertEquals(
163+
Map.of(
164+
LABEL_SPARK_CLUSTER_NAME,
165+
"cluster-name",
166+
LABEL_SPARK_ROLE_NAME,
167+
LABEL_SPARK_ROLE_MASTER_VALUE),
168+
service1.getSpec().getSelector());
154169
}
155170

156171
@Test
@@ -172,6 +187,17 @@ void testMasterStatefulSet() {
172187
SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf);
173188
StatefulSet statefulSet2 = spec2.getMasterStatefulSet();
174189
assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace());
190+
assertEquals(
191+
"cluster-name",
192+
statefulSet2
193+
.getSpec()
194+
.getTemplate()
195+
.getMetadata()
196+
.getLabels()
197+
.get(LABEL_SPARK_CLUSTER_NAME));
198+
assertEquals(
199+
LABEL_SPARK_ROLE_MASTER_VALUE,
200+
statefulSet2.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_ROLE_NAME));
175201
}
176202

177203
@Test
@@ -236,6 +262,17 @@ void testWorkerStatefulSet() {
236262
SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf);
237263
StatefulSet statefulSet2 = spec2.getWorkerStatefulSet();
238264
assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace());
265+
assertEquals(
266+
"cluster-name",
267+
statefulSet2
268+
.getSpec()
269+
.getTemplate()
270+
.getMetadata()
271+
.getLabels()
272+
.get(LABEL_SPARK_CLUSTER_NAME));
273+
assertEquals(
274+
LABEL_SPARK_ROLE_WORKER_VALUE,
275+
statefulSet2.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_ROLE_NAME));
239276
}
240277

241278
@Test

0 commit comments

Comments
 (0)