Skip to content

Commit 2a77f6c

Browse files
fixes issue where multiple sparkclusters in one namespace will lead to wrong worker assignment
1 parent 34ae166 commit 2a77f6c

File tree

2 files changed

+52
-4
lines changed

2 files changed

+52
-4
lines changed

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import static org.apache.spark.k8s.operator.Constants.*;
2323

24-
import java.util.Collections;
24+
import java.util.Map;
2525
import java.util.Optional;
2626

2727
import scala.Tuple2;
@@ -117,8 +117,13 @@ private static Service buildMasterService(
117117
.endMetadata()
118118
.withNewSpecLike(serviceSpec)
119119
.withClusterIP("None")
120+
.withClusterIP("None")
120121
.withSelector(
121-
Collections.singletonMap(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE))
122+
Map.of(
123+
LABEL_SPARK_CLUSTER_NAME,
124+
name,
125+
LABEL_SPARK_ROLE_NAME,
126+
LABEL_SPARK_ROLE_MASTER_VALUE))
122127
.addNewPort()
123128
.withName("web")
124129
.withPort(8080)
@@ -150,7 +155,11 @@ private static Service buildWorkerService(
150155
.withNewSpecLike(serviceSpec)
151156
.withClusterIP("None")
152157
.withSelector(
153-
Collections.singletonMap(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE))
158+
Map.of(
159+
LABEL_SPARK_CLUSTER_NAME,
160+
name,
161+
LABEL_SPARK_ROLE_NAME,
162+
LABEL_SPARK_ROLE_WORKER_VALUE))
154163
.addNewPort()
155164
.withName("web")
156165
.withPort(8081)
@@ -186,6 +195,7 @@ private static StatefulSet buildMasterStatefulSet(
186195
.editOrNewTemplate()
187196
.editOrNewMetadata()
188197
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
198+
.addToLabels(LABEL_SPARK_CLUSTER_NAME, name)
189199
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
190200
.endMetadata()
191201
.editOrNewSpec()
@@ -253,6 +263,7 @@ private static StatefulSet buildWorkerStatefulSet(
253263
.editOrNewTemplate()
254264
.editOrNewMetadata()
255265
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
266+
.addToLabels(LABEL_SPARK_CLUSTER_NAME, name)
256267
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
257268
.endMetadata()
258269
.editOrNewSpec()

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919

2020
package org.apache.spark.k8s.operator;
2121

22-
import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
22+
import static org.apache.spark.k8s.operator.Constants.*;
2323
import static org.junit.jupiter.api.Assertions.assertEquals;
2424
import static org.junit.jupiter.api.Assertions.assertTrue;
2525
import static org.mockito.Mockito.mock;
2626
import static org.mockito.Mockito.when;
2727

28+
import java.util.Map;
2829
import java.util.Optional;
2930

3031
import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -129,6 +130,13 @@ void testWorkerServiceWithTemplate() {
129130
assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
130131
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
131132
assertEquals("foo", service1.getSpec().getExternalName());
133+
assertEquals(
134+
Map.of(
135+
LABEL_SPARK_CLUSTER_NAME,
136+
"cluster-name",
137+
LABEL_SPARK_ROLE_NAME,
138+
LABEL_SPARK_ROLE_WORKER_VALUE),
139+
service1.getSpec().getSelector());
132140
}
133141

134142
@Test
@@ -151,6 +159,13 @@ void testMasterServiceWithTemplate() {
151159
assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
152160
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
153161
assertEquals("foo", service1.getSpec().getExternalName());
162+
assertEquals(
163+
Map.of(
164+
LABEL_SPARK_CLUSTER_NAME,
165+
"cluster-name",
166+
LABEL_SPARK_ROLE_NAME,
167+
LABEL_SPARK_ROLE_MASTER_VALUE),
168+
service1.getSpec().getSelector());
154169
}
155170

156171
@Test
@@ -172,6 +187,17 @@ void testMasterStatefulSet() {
172187
SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf);
173188
StatefulSet statefulSet2 = spec2.getMasterStatefulSet();
174189
assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace());
190+
assertEquals(
191+
"cluster-name",
192+
statefulSet2
193+
.getSpec()
194+
.getTemplate()
195+
.getMetadata()
196+
.getLabels()
197+
.get(LABEL_SPARK_CLUSTER_NAME));
198+
assertEquals(
199+
LABEL_SPARK_ROLE_MASTER_VALUE,
200+
statefulSet2.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_ROLE_NAME));
175201
}
176202

177203
@Test
@@ -236,6 +262,17 @@ void testWorkerStatefulSet() {
236262
SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf);
237263
StatefulSet statefulSet2 = spec2.getWorkerStatefulSet();
238264
assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace());
265+
assertEquals(
266+
"cluster-name",
267+
statefulSet2
268+
.getSpec()
269+
.getTemplate()
270+
.getMetadata()
271+
.getLabels()
272+
.get(LABEL_SPARK_CLUSTER_NAME));
273+
assertEquals(
274+
LABEL_SPARK_ROLE_WORKER_VALUE,
275+
statefulSet2.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_ROLE_NAME));
239276
}
240277

241278
@Test

0 commit comments

Comments
 (0)