diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java index 834739c3..3a323f23 100644 --- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java +++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java @@ -117,7 +117,8 @@ private static Service buildMasterService( .endMetadata() .withNewSpecLike(serviceSpec) .withClusterIP("None") - .withSelector( + .addToSelector(Collections.singletonMap(LABEL_SPARK_CLUSTER_NAME, name)) + .addToSelector( Collections.singletonMap(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)) .addNewPort() .withName("web") @@ -149,7 +150,8 @@ private static Service buildWorkerService( .endMetadata() .withNewSpecLike(serviceSpec) .withClusterIP("None") - .withSelector( + .addToSelector(Collections.singletonMap(LABEL_SPARK_CLUSTER_NAME, name)) + .addToSelector( Collections.singletonMap(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)) .addNewPort() .withName("web") @@ -186,6 +188,7 @@ private static StatefulSet buildMasterStatefulSet( .editOrNewTemplate() .editOrNewMetadata() .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE) + .addToLabels(LABEL_SPARK_CLUSTER_NAME, name) .addToLabels(LABEL_SPARK_VERSION_NAME, version) .endMetadata() .editOrNewSpec() @@ -253,6 +256,7 @@ private static StatefulSet buildWorkerStatefulSet( .editOrNewTemplate() .editOrNewMetadata() .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE) + .addToLabels(LABEL_SPARK_CLUSTER_NAME, name) .addToLabels(LABEL_SPARK_VERSION_NAME, version) .endMetadata() .editOrNewSpec() diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java index ace31384..7335ff15 100644 --- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java +++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java @@ -19,12 +19,13 @@ package org.apache.spark.k8s.operator; -import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME; +import static org.apache.spark.k8s.operator.Constants.*; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Map; import java.util.Optional; import io.fabric8.kubernetes.api.model.ObjectMeta; @@ -129,6 +130,13 @@ void testWorkerServiceWithTemplate() { assertEquals("bar", service1.getMetadata().getLabels().get("foo")); assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME)); assertEquals("foo", service1.getSpec().getExternalName()); + assertEquals( + Map.of( + LABEL_SPARK_CLUSTER_NAME, + "cluster-name", + LABEL_SPARK_ROLE_NAME, + LABEL_SPARK_ROLE_WORKER_VALUE), + service1.getSpec().getSelector()); } @Test @@ -151,6 +159,13 @@ void testMasterServiceWithTemplate() { assertEquals("bar", service1.getMetadata().getLabels().get("foo")); assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME)); assertEquals("foo", service1.getSpec().getExternalName()); + assertEquals( + Map.of( + LABEL_SPARK_CLUSTER_NAME, + "cluster-name", + LABEL_SPARK_ROLE_NAME, + LABEL_SPARK_ROLE_MASTER_VALUE), + service1.getSpec().getSelector()); } @Test @@ -172,6 +187,17 @@ void testMasterStatefulSet() { SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf); StatefulSet statefulSet2 = spec2.getMasterStatefulSet(); assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace()); + assertEquals( + "cluster-name", + statefulSet2 + .getSpec() + .getTemplate() + .getMetadata() + .getLabels() + .get(LABEL_SPARK_CLUSTER_NAME)); + assertEquals( + LABEL_SPARK_ROLE_MASTER_VALUE, + statefulSet2.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_ROLE_NAME)); } @Test @@ -236,6 +262,17 @@ void testWorkerStatefulSet() { SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf); StatefulSet statefulSet2 = spec2.getWorkerStatefulSet(); assertEquals("other-namespace", statefulSet2.getMetadata().getNamespace()); + assertEquals( + "cluster-name", + statefulSet2 + .getSpec() + .getTemplate() + .getMetadata() + .getLabels() + .get(LABEL_SPARK_CLUSTER_NAME)); + assertEquals( + LABEL_SPARK_ROLE_WORKER_VALUE, + statefulSet2.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_ROLE_NAME)); } @Test