Skip to content

Commit e3cc007

Browse files
fixes issue where multiple sparkclusters in one namespace will lead to wrong worker assignment
1 parent 34ae166 commit e3cc007

File tree

2 files changed

+20
-5
lines changed

2 files changed

+20
-5
lines changed

spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import static org.apache.spark.k8s.operator.Constants.*;
2323

24-
import java.util.Collections;
24+
import java.util.Map;
2525
import java.util.Optional;
2626

2727
import scala.Tuple2;
@@ -117,8 +117,9 @@ private static Service buildMasterService(
117117
.endMetadata()
118118
.withNewSpecLike(serviceSpec)
119119
.withClusterIP("None")
120+
.withClusterIP("None")
120121
.withSelector(
121-
Collections.singletonMap(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE))
122+
Map.of(LABEL_SPARK_CLUSTER_NAME, name, LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE))
122123
.addNewPort()
123124
.withName("web")
124125
.withPort(8080)
@@ -150,7 +151,7 @@ private static Service buildWorkerService(
150151
.withNewSpecLike(serviceSpec)
151152
.withClusterIP("None")
152153
.withSelector(
153-
Collections.singletonMap(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE))
154+
Map.of(LABEL_SPARK_CLUSTER_NAME, name, LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE))
154155
.addNewPort()
155156
.withName("web")
156157
.withPort(8081)
@@ -174,7 +175,7 @@ private static StatefulSet buildMasterStatefulSet(
174175
.withNewMetadataLike(objectMeta)
175176
.withName(name + "-master")
176177
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
177-
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
178+
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
178179
.withNamespace(namespace)
179180
.endMetadata()
180181
.withNewSpecLike(statefulSetSpec)
@@ -186,6 +187,7 @@ private static StatefulSet buildMasterStatefulSet(
186187
.editOrNewTemplate()
187188
.editOrNewMetadata()
188189
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
190+
.addToLabels(LABEL_SPARK_CLUSTER_NAME, name)
189191
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
190192
.endMetadata()
191193
.editOrNewSpec()
@@ -253,6 +255,7 @@ private static StatefulSet buildWorkerStatefulSet(
253255
.editOrNewTemplate()
254256
.editOrNewMetadata()
255257
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
258+
.addToLabels(LABEL_SPARK_CLUSTER_NAME, name)
256259
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
257260
.endMetadata()
258261
.editOrNewSpec()

spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919

2020
package org.apache.spark.k8s.operator;
2121

22-
import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
22+
import static org.apache.spark.k8s.operator.Constants.*;
2323
import static org.junit.jupiter.api.Assertions.assertEquals;
2424
import static org.junit.jupiter.api.Assertions.assertTrue;
2525
import static org.mockito.Mockito.mock;
2626
import static org.mockito.Mockito.when;
2727

28+
import java.util.Map;
2829
import java.util.Optional;
2930

3031
import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -129,6 +130,9 @@ void testWorkerServiceWithTemplate() {
129130
assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
130131
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
131132
assertEquals("foo", service1.getSpec().getExternalName());
133+
assertEquals(
134+
Map.of(LABEL_SPARK_CLUSTER_NAME, "cluster-name", LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE),
135+
service1.getSpec().getSelector());
132136
}
133137

134138
@Test
@@ -151,6 +155,9 @@ void testMasterServiceWithTemplate() {
151155
assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
152156
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
153157
assertEquals("foo", service1.getSpec().getExternalName());
158+
assertEquals(
159+
Map.of(LABEL_SPARK_CLUSTER_NAME, "cluster-name", LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE),
160+
service1.getSpec().getSelector());
154161
}
155162

156163
@Test
@@ -172,6 +179,9 @@ void testMasterStatefulSet() {
172179
SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf);
173180
StatefulSet statefulSet2 = spec2.getMasterStatefulSet();
174181
assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace());
182+
assertEquals("cluster-name", statefulSet2.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_CLUSTER_NAME));
183+
assertEquals(LABEL_SPARK_ROLE_MASTER_VALUE, statefulSet2.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_ROLE_NAME));
184+
175185
}
176186

177187
@Test
@@ -236,6 +246,8 @@ void testWorkerStatefulSet() {
236246
SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf);
237247
StatefulSet statefulSet2 = spec2.getWorkerStatefulSet();
238248
assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace());
249+
assertEquals("cluster-name", statefulSet2.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_CLUSTER_NAME));
250+
assertEquals(LABEL_SPARK_ROLE_WORKER_VALUE, statefulSet2.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_ROLE_NAME));
239251
}
240252

241253
@Test

0 commit comments

Comments
 (0)