diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index c4c7f55f..6e69d737 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -42,6 +42,7 @@ shadow-jar-plugin = "8.3.6" kubernetes-client = { group = "io.fabric8", name = "kubernetes-client", version.ref = "fabric8" } kubernetes-httpclient-okhttp = { group = "io.fabric8", name = "kubernetes-httpclient-okhttp", version.ref = "fabric8" } kubernetes-server-mock = { group = "io.fabric8", name = "kubernetes-server-mock", version.ref = "fabric8" } +kube-api-test-client-inject = {group = "io.fabric8", name = "kube-api-test-client-inject", version.ref = "fabric8"} crd-generator-apt = { group = "io.fabric8", name = "crd-generator-apt", version.ref = "fabric8" } okhttp = { group = "com.squareup.okhttp3", name = "okhttp", version.ref = "okhttp" } mockwebserver = { group = "com.squareup.okhttp3", name = "mockwebserver", version.ref = "okhttp" } diff --git a/spark-operator/build.gradle b/spark-operator/build.gradle index 5d3a6907..b2106e1d 100644 --- a/spark-operator/build.gradle +++ b/spark-operator/build.gradle @@ -80,6 +80,7 @@ dependencies { testRuntimeOnly(libs.junit.platform.launcher) testImplementation(libs.mockito.core) + testImplementation(libs.kube.api.test.client.inject) } test { diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java index 9edeb1d2..3d492bd6 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java @@ -128,9 +128,7 @@ protected Operator registerSparkOperatorConfMonitor() { confSelector); op.register( new SparkOperatorConfigMapReconciler( - this::updateWatchingNamespaces, - SparkOperatorConf.OPERATOR_NAMESPACE.getValue(), - unused -> getWatchedNamespaces()), + this::updateWatchingNamespaces, unused -> getWatchedNamespaces()), c -> { c.withRateLimiter(SparkOperatorConf.getOperatorRateLimiter()); c.settingNamespaces(operatorNamespace); diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java index 491057d4..1a282b3f 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java @@ -19,20 +19,15 @@ package org.apache.spark.k8s.operator.config; -import java.util.List; import java.util.Set; import java.util.function.Function; import io.fabric8.kubernetes.api.model.ConfigMap; -import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; -import io.javaoperatorsdk.operator.processing.event.source.EventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -46,7 +41,6 @@ @Slf4j public class SparkOperatorConfigMapReconciler implements Reconciler { private final Function, Boolean> namespaceUpdater; - private final String operatorNamespace; private final Function> watchedNamespacesGetter; @Override @@ -56,18 +50,6 @@ public ErrorStatusUpdateControl updateErrorStatus( return ErrorStatusUpdateControl.noStatusUpdate(); } - @Override - public List> prepareEventSources( - EventSourceContext context) { - var configMapEventSource = - new InformerEventSource<>( - InformerEventSourceConfiguration.from(ConfigMap.class, ConfigMap.class) - .withNamespaces(Set.of(operatorNamespace)) - .build(), - context); - return List.of(configMapEventSource); - } - @Override public UpdateControl reconcile(ConfigMap resource, Context context) throws Exception { diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconcilerTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconcilerTest.java new file mode 100644 index 00000000..603d74bb --- /dev/null +++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconcilerTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.k8s.operator.config; + +import static org.apache.spark.k8s.operator.config.SparkOperatorConf.RECONCILER_INTERVAL_SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.mock; + +import java.util.Map; +import java.util.function.Function; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.fabric8.kubeapitest.junit.EnableKubeAPIServer; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.Operator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +@SuppressFBWarnings( + value = {"UWF_UNWRITTEN_FIELD"}, + justification = "Unwritten fields are covered by Kubernetes mock client") +@EnableKubeAPIServer +class SparkOperatorConfigMapReconcilerTest { + + public static final Long TARGET_RECONCILER_INTERVAL = 60L; + + private static KubernetesClient client; + + Operator operator; + + @BeforeEach + @SuppressWarnings("unchecked") + void startController() { + var reconciler = + new SparkOperatorConfigMapReconciler(mock(Function.class), mock(Function.class)); + operator = new Operator(o -> o.withKubernetesClient(client)); + operator.register(reconciler); + operator.start(); + } + + @AfterEach + void stopController() { + operator.stop(); + } + + @Test + @SuppressWarnings("PMD.UnitTestShouldIncludeAssert") + void sanityTest() { + client.resource(testConfigMap()).create(); + + await() + .untilAsserted( + () -> { + assertThat(RECONCILER_INTERVAL_SECONDS.getValue()).isEqualTo(60L); + }); + } + + ConfigMap testConfigMap() { + ConfigMap configMap = new ConfigMap(); + configMap.setMetadata( + new ObjectMetaBuilder().withName("spark-conf").withNamespace("default").build()); + configMap.setData( + Map.of(RECONCILER_INTERVAL_SECONDS.getKey(), TARGET_RECONCILER_INTERVAL.toString())); + return configMap; + } +}