Skip to content

Commit c07aad8

Browse files
committed
Bump Java Operator SDK to v5.1
Signed-off-by: Attila Mészáros <[email protected]>
1 parent c95ed16 commit c07aad8

File tree

7 files changed

+55
-57
lines changed

7 files changed

+55
-57
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
[versions]
1818
fabric8 = "7.3.1"
1919
lombok = "1.18.38"
20-
operator-sdk = "4.9.0"
20+
operator-sdk = "5.1.0"
2121
okhttp = "4.12.0"
2222
dropwizard-metrics = "4.2.30"
2323
spark = "4.0.0"

spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.apache.spark.k8s.operator.utils.Utils.getClusterStatusListener;
2424
import static org.apache.spark.k8s.operator.utils.Utils.getWatchedNamespaces;
2525

26+
import java.time.Duration;
2627
import java.util.ArrayList;
2728
import java.util.Arrays;
2829
import java.util.HashSet;
@@ -168,8 +169,8 @@ protected void overrideOperatorConfigs(ConfigurationServiceOverrider overrider)
168169
overrider.withKubernetesClient(client);
169170
overrider.withStopOnInformerErrorDuringStartup(
170171
SparkOperatorConf.TERMINATE_ON_INFORMER_FAILURE_ENABLED.getValue());
171-
overrider.withTerminationTimeoutSeconds(
172-
SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue());
172+
overrider.withReconciliationTerminationTimeout(
173+
Duration.ofSeconds(SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue()));
173174
int parallelism = SparkOperatorConf.RECONCILER_PARALLELISM.getValue();
174175
if (parallelism > 0) {
175176
log.info("Configuring operator with {} reconciliation threads.", parallelism);

spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,16 @@
1919

2020
package org.apache.spark.k8s.operator.config;
2121

22-
import java.util.Map;
22+
import java.util.List;
2323
import java.util.Set;
2424
import java.util.function.Function;
2525

2626
import io.fabric8.kubernetes.api.model.ConfigMap;
27-
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
27+
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
2828
import io.javaoperatorsdk.operator.api.reconciler.Context;
2929
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
30-
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
3130
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
3231
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
33-
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
3432
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
3533
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
3634
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
@@ -46,10 +44,7 @@
4644
@ControllerConfiguration
4745
@RequiredArgsConstructor
4846
@Slf4j
49-
public class SparkOperatorConfigMapReconciler
50-
implements Reconciler<ConfigMap>,
51-
ErrorStatusHandler<ConfigMap>,
52-
EventSourceInitializer<ConfigMap> {
47+
public class SparkOperatorConfigMapReconciler implements Reconciler<ConfigMap> {
5348
private final Function<Set<String>, Boolean> namespaceUpdater;
5449
private final String operatorNamespace;
5550
private final Function<Void, Set<String>> watchedNamespacesGetter;
@@ -62,14 +57,15 @@ public ErrorStatusUpdateControl<ConfigMap> updateErrorStatus(
6257
}
6358

6459
@Override
65-
public Map<String, EventSource> prepareEventSources(EventSourceContext<ConfigMap> context) {
66-
EventSource configMapEventSource =
60+
public List<EventSource<?, ConfigMap>> prepareEventSources(
61+
EventSourceContext<ConfigMap> context) {
62+
var configMapEventSource =
6763
new InformerEventSource<>(
68-
InformerConfiguration.from(ConfigMap.class, context)
69-
.withNamespaces(operatorNamespace)
64+
InformerEventSourceConfiguration.from(ConfigMap.class, ConfigMap.class)
65+
.withNamespaces(Set.of(operatorNamespace))
7066
.build(),
7167
context);
72-
return EventSourceInitializer.nameEventSources(configMapEventSource);
68+
return List.of(configMapEventSource);
7369
}
7470

7571
@Override

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,28 +21,25 @@
2121

2222
import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME;
2323
import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue;
24+
import static org.apache.spark.k8s.operator.utils.Utils.basicLabelSecondaryToPrimaryMapper;
2425
import static org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr;
2526

2627
import java.util.ArrayList;
2728
import java.util.Collections;
2829
import java.util.List;
29-
import java.util.Map;
3030

3131
import io.fabric8.kubernetes.api.model.Pod;
32-
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
32+
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
3333
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
3434
import io.javaoperatorsdk.operator.api.reconciler.Context;
3535
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
3636
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
37-
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
3837
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
3938
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
40-
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
4139
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
4240
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
4341
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
4442
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
45-
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
4643
import lombok.RequiredArgsConstructor;
4744
import lombok.extern.slf4j.Slf4j;
4845

@@ -74,11 +71,7 @@
7471
@ControllerConfiguration
7572
@Slf4j
7673
@RequiredArgsConstructor
77-
public class SparkAppReconciler
78-
implements Reconciler<SparkApplication>,
79-
ErrorStatusHandler<SparkApplication>,
80-
EventSourceInitializer<SparkApplication>,
81-
Cleaner<SparkApplication> {
74+
public class SparkAppReconciler implements Reconciler<SparkApplication>, Cleaner<SparkApplication> {
8275
private final SparkAppSubmissionWorker submissionWorker;
8376
private final SparkAppStatusRecorder sparkAppStatusRecorder;
8477
private final SentinelManager<SparkApplication> sentinelManager;
@@ -135,16 +128,17 @@ public ErrorStatusUpdateControl<SparkApplication> updateErrorStatus(
135128
}
136129

137130
@Override
138-
public Map<String, EventSource> prepareEventSources(
131+
public List<EventSource<?, SparkApplication>> prepareEventSources(
139132
EventSourceContext<SparkApplication> context) {
140133
EventSource podEventSource =
141134
new InformerEventSource<>(
142-
InformerConfiguration.from(Pod.class, context)
143-
.withSecondaryToPrimaryMapper(Mappers.fromLabel(LABEL_SPARK_APPLICATION_NAME))
135+
InformerEventSourceConfiguration.from(Pod.class, SparkApplication.class)
136+
.withSecondaryToPrimaryMapper(
137+
basicLabelSecondaryToPrimaryMapper(LABEL_SPARK_APPLICATION_NAME))
144138
.withLabelSelector(commonResourceLabelsStr())
145139
.build(),
146140
context);
147-
return EventSourceInitializer.nameEventSources(podEventSource);
141+
return List.of(podEventSource);
148142
}
149143

150144
protected List<AppReconcileStep> getReconcileSteps(final SparkApplication app) {

spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,29 @@
1919

2020
package org.apache.spark.k8s.operator.reconciler;
2121

22+
import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME;
2223
import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue;
24+
import static org.apache.spark.k8s.operator.utils.Utils.basicLabelSecondaryToPrimaryMapper;
2325
import static org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr;
2426

2527
import java.util.ArrayList;
2628
import java.util.List;
27-
import java.util.Map;
2829

2930
import io.fabric8.kubernetes.api.model.Pod;
30-
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
31+
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
3132
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
3233
import io.javaoperatorsdk.operator.api.reconciler.Context;
3334
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
3435
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
35-
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
3636
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
3737
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
38-
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
3938
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
4039
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
4140
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
4241
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
43-
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
4442
import lombok.RequiredArgsConstructor;
4543
import lombok.extern.slf4j.Slf4j;
4644

47-
import org.apache.spark.k8s.operator.Constants;
4845
import org.apache.spark.k8s.operator.SparkCluster;
4946
import org.apache.spark.k8s.operator.SparkClusterSubmissionWorker;
5047
import org.apache.spark.k8s.operator.context.SparkClusterContext;
@@ -61,11 +58,7 @@
6158
@ControllerConfiguration
6259
@Slf4j
6360
@RequiredArgsConstructor
64-
public class SparkClusterReconciler
65-
implements Reconciler<SparkCluster>,
66-
ErrorStatusHandler<SparkCluster>,
67-
EventSourceInitializer<SparkCluster>,
68-
Cleaner<SparkCluster> {
61+
public class SparkClusterReconciler implements Reconciler<SparkCluster>, Cleaner<SparkCluster> {
6962
private final SparkClusterSubmissionWorker submissionWorker;
7063
private final SparkClusterStatusRecorder sparkClusterStatusRecorder;
7164
private final SentinelManager<SparkCluster> sentinelManager;
@@ -118,16 +111,17 @@ public ErrorStatusUpdateControl<SparkCluster> updateErrorStatus(
118111
}
119112

120113
@Override
121-
public Map<String, EventSource> prepareEventSources(EventSourceContext<SparkCluster> context) {
114+
public List<EventSource<?, SparkCluster>> prepareEventSources(
115+
EventSourceContext<SparkCluster> context) {
122116
EventSource podEventSource =
123117
new InformerEventSource<>(
124-
InformerConfiguration.from(Pod.class, context)
118+
InformerEventSourceConfiguration.from(Pod.class, SparkCluster.class)
125119
.withSecondaryToPrimaryMapper(
126-
Mappers.fromLabel(Constants.LABEL_SPARK_APPLICATION_NAME))
120+
basicLabelSecondaryToPrimaryMapper(LABEL_SPARK_APPLICATION_NAME))
127121
.withLabelSelector(commonResourceLabelsStr())
128122
.build(),
129123
context);
130-
return EventSourceInitializer.nameEventSources(podEventSource);
124+
return List.of(podEventSource);
131125
}
132126

133127
protected List<ClusterReconcileStep> getReconcileSteps(final SparkCluster cluster) {

spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
import java.util.Set;
3939
import java.util.stream.Collectors;
4040

41+
import io.fabric8.kubernetes.api.model.HasMetadata;
42+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
43+
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
4144
import org.apache.commons.lang3.StringUtils;
4245

4346
import org.apache.spark.k8s.operator.Constants;
@@ -144,4 +147,25 @@ public static List<SparkClusterStatusListener> getClusterStatusListener() {
144147
public static String commonResourceLabelsStr() {
145148
return labelsAsStr(commonManagedResourceLabels());
146149
}
150+
151+
public static <T extends HasMetadata>
152+
SecondaryToPrimaryMapper<T> basicLabelSecondaryToPrimaryMapper(String nameKey) {
153+
return resource -> {
154+
final var metadata = resource.getMetadata();
155+
if (metadata == null) {
156+
return Collections.emptySet();
157+
} else {
158+
final var map = metadata.getLabels();
159+
if (map == null) {
160+
return Collections.emptySet();
161+
}
162+
var name = map.get(nameKey);
163+
if (name == null) {
164+
return Collections.emptySet();
165+
}
166+
var namespace = resource.getMetadata().getNamespace();
167+
return Set.of(new ResourceID(name, namespace));
168+
}
169+
};
170+
}
147171
}

spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
3737
import io.javaoperatorsdk.operator.Operator;
3838
import io.javaoperatorsdk.operator.RuntimeInfo;
39-
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
4039
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
4140
import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
4241
import io.javaoperatorsdk.operator.health.Status;
@@ -190,16 +189,6 @@ public String getTargetNamespace() {
190189
}
191190
}));
192191

193-
return new InformerWrappingEventSourceHealthIndicator() {
194-
@Override
195-
public Map<String, InformerHealthIndicator> informerHealthIndicators() {
196-
return informers;
197-
}
198-
199-
@Override
200-
public ResourceConfiguration getInformerConfiguration() {
201-
return null;
202-
}
203-
};
192+
return () -> informers;
204193
}
205194
}

0 commit comments

Comments
 (0)