diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 507399db..4be9c35b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -17,7 +17,7 @@ [versions] fabric8 = "7.3.1" lombok = "1.18.38" -operator-sdk = "4.9.0" +operator-sdk = "5.1.1" okhttp = "4.12.0" dropwizard-metrics = "4.2.30" spark = "4.0.0" diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java index 00619d7a..d0f750a0 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java @@ -23,6 +23,7 @@ import static org.apache.spark.k8s.operator.utils.Utils.getClusterStatusListener; import static org.apache.spark.k8s.operator.utils.Utils.getWatchedNamespaces; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -168,8 +169,8 @@ protected void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) overrider.withKubernetesClient(client); overrider.withStopOnInformerErrorDuringStartup( SparkOperatorConf.TERMINATE_ON_INFORMER_FAILURE_ENABLED.getValue()); - overrider.withTerminationTimeoutSeconds( - SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue()); + overrider.withReconciliationTerminationTimeout( + Duration.ofSeconds(SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue())); int parallelism = SparkOperatorConf.RECONCILER_PARALLELISM.getValue(); if (parallelism > 0) { log.info("Configuring operator with {} reconciliation threads.", parallelism); @@ -187,6 +188,7 @@ protected void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) overrider.withMetrics(operatorJosdkMetrics); metricsSystem.registerSource(operatorJosdkMetrics); } + overrider.withUseSSAToPatchPrimaryResource(false); } protected void overrideConfigMonitorConfigs(ConfigurationServiceOverrider overrider) { @@ -198,6 +200,7 @@ protected void overrideConfigMonitorConfigs(ConfigurationServiceOverrider overri overrider.withInformerStoppedHandler( (informer, ex) -> log.error("Dynamic config informer stopped: operator will not accept config updates.")); + overrider.withUseSSAToPatchPrimaryResource(false); } protected void overrideControllerConfigs(ControllerConfigurationOverrider overrider) { diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java index 182f4238..491057d4 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java @@ -19,18 +19,16 @@ package org.apache.spark.k8s.operator.config; -import java.util.Map; +import java.util.List; import java.util.Set; import java.util.function.Function; import io.fabric8.kubernetes.api.model.ConfigMap; -import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; @@ -46,10 +44,7 @@ @ControllerConfiguration @RequiredArgsConstructor @Slf4j -public class SparkOperatorConfigMapReconciler - implements Reconciler, - ErrorStatusHandler, - EventSourceInitializer { +public class SparkOperatorConfigMapReconciler implements Reconciler { private final Function, Boolean> namespaceUpdater; private final String operatorNamespace; private final Function> watchedNamespacesGetter; @@ -62,14 +57,15 @@ public ErrorStatusUpdateControl updateErrorStatus( } @Override - public Map prepareEventSources(EventSourceContext context) { - EventSource configMapEventSource = + public List> prepareEventSources( + EventSourceContext context) { + var configMapEventSource = new InformerEventSource<>( - InformerConfiguration.from(ConfigMap.class, context) - .withNamespaces(operatorNamespace) + InformerEventSourceConfiguration.from(ConfigMap.class, ConfigMap.class) + .withNamespaces(Set.of(operatorNamespace)) .build(), context); - return EventSourceInitializer.nameEventSources(configMapEventSource); + return List.of(configMapEventSource); } @Override diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java index 89c52134..9b1323b7 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java @@ -21,28 +21,25 @@ import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME; import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue; +import static org.apache.spark.k8s.operator.utils.Utils.basicLabelSecondaryToPrimaryMapper; import static org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import io.fabric8.kubernetes.api.model.Pod; -import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Cleaner; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -74,11 +71,7 @@ @ControllerConfiguration @Slf4j @RequiredArgsConstructor -public class SparkAppReconciler - implements Reconciler, - ErrorStatusHandler, - EventSourceInitializer, - Cleaner { +public class SparkAppReconciler implements Reconciler, Cleaner { private final SparkAppSubmissionWorker submissionWorker; private final SparkAppStatusRecorder sparkAppStatusRecorder; private final SentinelManager sentinelManager; @@ -135,16 +128,17 @@ public ErrorStatusUpdateControl updateErrorStatus( } @Override - public Map prepareEventSources( + public List> prepareEventSources( EventSourceContext context) { EventSource podEventSource = new InformerEventSource<>( - InformerConfiguration.from(Pod.class, context) - .withSecondaryToPrimaryMapper(Mappers.fromLabel(LABEL_SPARK_APPLICATION_NAME)) + InformerEventSourceConfiguration.from(Pod.class, SparkApplication.class) + .withSecondaryToPrimaryMapper( + basicLabelSecondaryToPrimaryMapper(LABEL_SPARK_APPLICATION_NAME)) .withLabelSelector(commonResourceLabelsStr()) .build(), context); - return EventSourceInitializer.nameEventSources(podEventSource); + return List.of(podEventSource); } protected List getReconcileSteps(final SparkApplication app) { diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java index fde1a982..f99d5580 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java @@ -19,32 +19,29 @@ package org.apache.spark.k8s.operator.reconciler; +import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME; import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue; +import static org.apache.spark.k8s.operator.utils.Utils.basicLabelSecondaryToPrimaryMapper; import static org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr; import java.util.ArrayList; import java.util.List; -import java.util.Map; import io.fabric8.kubernetes.api.model.Pod; -import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Cleaner; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.spark.k8s.operator.Constants; import org.apache.spark.k8s.operator.SparkCluster; import org.apache.spark.k8s.operator.SparkClusterSubmissionWorker; import org.apache.spark.k8s.operator.context.SparkClusterContext; @@ -61,11 +58,7 @@ @ControllerConfiguration @Slf4j @RequiredArgsConstructor -public class SparkClusterReconciler - implements Reconciler, - ErrorStatusHandler, - EventSourceInitializer, - Cleaner { +public class SparkClusterReconciler implements Reconciler, Cleaner { private final SparkClusterSubmissionWorker submissionWorker; private final SparkClusterStatusRecorder sparkClusterStatusRecorder; private final SentinelManager sentinelManager; @@ -118,16 +111,17 @@ public ErrorStatusUpdateControl updateErrorStatus( } @Override - public Map prepareEventSources(EventSourceContext context) { + public List> prepareEventSources( + EventSourceContext context) { EventSource podEventSource = new InformerEventSource<>( - InformerConfiguration.from(Pod.class, context) + InformerEventSourceConfiguration.from(Pod.class, SparkCluster.class) .withSecondaryToPrimaryMapper( - Mappers.fromLabel(Constants.LABEL_SPARK_APPLICATION_NAME)) + basicLabelSecondaryToPrimaryMapper(LABEL_SPARK_APPLICATION_NAME)) .withLabelSelector(commonResourceLabelsStr()) .build(), context); - return EventSourceInitializer.nameEventSources(podEventSource); + return List.of(podEventSource); } protected List getReconcileSteps(final SparkCluster cluster) { diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java index 28c1cf6b..7ab3574f 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java @@ -38,6 +38,9 @@ import java.util.Set; import java.util.stream.Collectors; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; import org.apache.commons.lang3.StringUtils; import org.apache.spark.k8s.operator.Constants; @@ -144,4 +147,25 @@ public static List getClusterStatusListener() { public static String commonResourceLabelsStr() { return labelsAsStr(commonManagedResourceLabels()); } + + public static + SecondaryToPrimaryMapper basicLabelSecondaryToPrimaryMapper(String nameKey) { + return resource -> { + final var metadata = resource.getMetadata(); + if (metadata == null) { + return Collections.emptySet(); + } else { + final var map = metadata.getLabels(); + if (map == null) { + return Collections.emptySet(); + } + var name = map.get(nameKey); + if (name == null) { + return Collections.emptySet(); + } + var namespace = resource.getMetadata().getNamespace(); + return Set.of(new ResourceID(name, namespace)); + } + }; + } } diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java index a8b45874..958a8d20 100644 --- a/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java +++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java @@ -36,7 +36,6 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.javaoperatorsdk.operator.Operator; import io.javaoperatorsdk.operator.RuntimeInfo; -import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator; import io.javaoperatorsdk.operator.health.Status; @@ -190,16 +189,6 @@ public String getTargetNamespace() { } })); - return new InformerWrappingEventSourceHealthIndicator() { - @Override - public Map informerHealthIndicators() { - return informers; - } - - @Override - public ResourceConfiguration getInformerConfiguration() { - return null; - } - }; + return () -> informers; } }