Skip to content

Commit 93204b2

Browse files
csviridongjoon-hyun
authored andcommitted
[SPARK-52755] Remove ConfigMap informer from config reconciler
### What changes were proposed in this pull request? PR removes unnecessary informer for the configuration `SparkOperatorConfigMapReconciler`. It adds a sanity test using `KubeApiTest`. ### Why are the changes needed? Having additional informer might effect performance, memory consumption, also maintains an additiona websocket connection to Kubernetes API. ### Does this PR introduce _any_ user-facing change? No, but introduces new way of testing. ### How was this patch tested? With an integration test. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#281 from csviri/remove-informer-config-reconciler. Authored-by: Attila Mészáros <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 2895467 commit 93204b2

File tree

5 files changed

+90
-21
lines changed

5 files changed

+90
-21
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ shadow-jar-plugin = "8.3.6"
4242
kubernetes-client = { group = "io.fabric8", name = "kubernetes-client", version.ref = "fabric8" }
4343
kubernetes-httpclient-okhttp = { group = "io.fabric8", name = "kubernetes-httpclient-okhttp", version.ref = "fabric8" }
4444
kubernetes-server-mock = { group = "io.fabric8", name = "kubernetes-server-mock", version.ref = "fabric8" }
45+
kube-api-test-client-inject = {group = "io.fabric8", name = "kube-api-test-client-inject", version.ref = "fabric8"}
4546
crd-generator-apt = { group = "io.fabric8", name = "crd-generator-apt", version.ref = "fabric8" }
4647
okhttp = { group = "com.squareup.okhttp3", name = "okhttp", version.ref = "okhttp" }
4748
mockwebserver = { group = "com.squareup.okhttp3", name = "mockwebserver", version.ref = "okhttp" }

spark-operator/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ dependencies {
8080
testRuntimeOnly(libs.junit.platform.launcher)
8181

8282
testImplementation(libs.mockito.core)
83+
testImplementation(libs.kube.api.test.client.inject)
8384
}
8485

8586
test {

spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,7 @@ protected Operator registerSparkOperatorConfMonitor() {
128128
confSelector);
129129
op.register(
130130
new SparkOperatorConfigMapReconciler(
131-
this::updateWatchingNamespaces,
132-
SparkOperatorConf.OPERATOR_NAMESPACE.getValue(),
133-
unused -> getWatchedNamespaces()),
131+
this::updateWatchingNamespaces, unused -> getWatchedNamespaces()),
134132
c -> {
135133
c.withRateLimiter(SparkOperatorConf.getOperatorRateLimiter());
136134
c.settingNamespaces(operatorNamespace);

spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,15 @@
1919

2020
package org.apache.spark.k8s.operator.config;
2121

22-
import java.util.List;
2322
import java.util.Set;
2423
import java.util.function.Function;
2524

2625
import io.fabric8.kubernetes.api.model.ConfigMap;
27-
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
2826
import io.javaoperatorsdk.operator.api.reconciler.Context;
2927
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
3028
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
31-
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
3229
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
3330
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
34-
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
35-
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
3631
import lombok.RequiredArgsConstructor;
3732
import lombok.extern.slf4j.Slf4j;
3833

@@ -46,7 +41,6 @@
4641
@Slf4j
4742
public class SparkOperatorConfigMapReconciler implements Reconciler<ConfigMap> {
4843
private final Function<Set<String>, Boolean> namespaceUpdater;
49-
private final String operatorNamespace;
5044
private final Function<Void, Set<String>> watchedNamespacesGetter;
5145

5246
@Override
@@ -56,18 +50,6 @@ public ErrorStatusUpdateControl<ConfigMap> updateErrorStatus(
5650
return ErrorStatusUpdateControl.noStatusUpdate();
5751
}
5852

59-
@Override
60-
public List<EventSource<?, ConfigMap>> prepareEventSources(
61-
EventSourceContext<ConfigMap> context) {
62-
var configMapEventSource =
63-
new InformerEventSource<>(
64-
InformerEventSourceConfiguration.from(ConfigMap.class, ConfigMap.class)
65-
.withNamespaces(Set.of(operatorNamespace))
66-
.build(),
67-
context);
68-
return List.of(configMapEventSource);
69-
}
70-
7153
@Override
7254
public UpdateControl<ConfigMap> reconcile(ConfigMap resource, Context<ConfigMap> context)
7355
throws Exception {
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.k8s.operator.config;
21+
22+
import static org.apache.spark.k8s.operator.config.SparkOperatorConf.RECONCILER_INTERVAL_SECONDS;
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
import static org.awaitility.Awaitility.await;
25+
import static org.mockito.Mockito.mock;
26+
27+
import java.util.Map;
28+
import java.util.function.Function;
29+
30+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
31+
import io.fabric8.kubeapitest.junit.EnableKubeAPIServer;
32+
import io.fabric8.kubernetes.api.model.ConfigMap;
33+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
34+
import io.fabric8.kubernetes.client.KubernetesClient;
35+
import io.javaoperatorsdk.operator.Operator;
36+
import org.junit.jupiter.api.AfterEach;
37+
import org.junit.jupiter.api.BeforeEach;
38+
import org.junit.jupiter.api.Test;
39+
40+
@SuppressFBWarnings(
41+
value = {"UWF_UNWRITTEN_FIELD"},
42+
justification = "Unwritten fields are covered by Kubernetes mock client")
43+
@EnableKubeAPIServer
44+
class SparkOperatorConfigMapReconcilerTest {
45+
46+
public static final Long TARGET_RECONCILER_INTERVAL = 60L;
47+
48+
private static KubernetesClient client;
49+
50+
Operator operator;
51+
52+
@BeforeEach
53+
@SuppressWarnings("unchecked")
54+
void startController() {
55+
var reconciler =
56+
new SparkOperatorConfigMapReconciler(mock(Function.class), mock(Function.class));
57+
operator = new Operator(o -> o.withKubernetesClient(client));
58+
operator.register(reconciler);
59+
operator.start();
60+
}
61+
62+
@AfterEach
63+
void stopController() {
64+
operator.stop();
65+
}
66+
67+
@Test
68+
@SuppressWarnings("PMD.UnitTestShouldIncludeAssert")
69+
void sanityTest() {
70+
client.resource(testConfigMap()).create();
71+
72+
await()
73+
.untilAsserted(
74+
() -> {
75+
assertThat(RECONCILER_INTERVAL_SECONDS.getValue()).isEqualTo(60L);
76+
});
77+
}
78+
79+
ConfigMap testConfigMap() {
80+
ConfigMap configMap = new ConfigMap();
81+
configMap.setMetadata(
82+
new ObjectMetaBuilder().withName("spark-conf").withNamespace("default").build());
83+
configMap.setData(
84+
Map.of(RECONCILER_INTERVAL_SECONDS.getKey(), TARGET_RECONCILER_INTERVAL.toString()));
85+
return configMap;
86+
}
87+
}

0 commit comments

Comments
 (0)