Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
[versions]
fabric8 = "7.3.1"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for making a PR, but please revert this, @csviri . You can do this upgrade independently.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ur, wait, your PR looks strange. We already have this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, yes, sorry, did this rather hastily. Rebased now.

lombok = "1.18.38"
operator-sdk = "4.9.0"
operator-sdk = "5.1.1"
okhttp = "4.12.0"
dropwizard-metrics = "4.2.30"
spark = "4.0.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.spark.k8s.operator.utils.Utils.getClusterStatusListener;
import static org.apache.spark.k8s.operator.utils.Utils.getWatchedNamespaces;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -168,8 +169,8 @@ protected void overrideOperatorConfigs(ConfigurationServiceOverrider overrider)
overrider.withKubernetesClient(client);
overrider.withStopOnInformerErrorDuringStartup(
SparkOperatorConf.TERMINATE_ON_INFORMER_FAILURE_ENABLED.getValue());
overrider.withTerminationTimeoutSeconds(
SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue());
overrider.withReconciliationTerminationTimeout(
Duration.ofSeconds(SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue()));
int parallelism = SparkOperatorConf.RECONCILER_PARALLELISM.getValue();
if (parallelism > 0) {
log.info("Configuring operator with {} reconciliation threads.", parallelism);
Expand All @@ -187,6 +188,7 @@ protected void overrideOperatorConfigs(ConfigurationServiceOverrider overrider)
overrider.withMetrics(operatorJosdkMetrics);
metricsSystem.registerSource(operatorJosdkMetrics);
}
overrider.withUseSSAToPatchPrimaryResource(false);
}

protected void overrideConfigMonitorConfigs(ConfigurationServiceOverrider overrider) {
Expand All @@ -198,6 +200,7 @@ protected void overrideConfigMonitorConfigs(ConfigurationServiceOverrider overri
overrider.withInformerStoppedHandler(
(informer, ex) ->
log.error("Dynamic config informer stopped: operator will not accept config updates."));
overrider.withUseSSAToPatchPrimaryResource(false);
}

protected void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@

package org.apache.spark.k8s.operator.config;

import java.util.Map;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
Expand All @@ -46,10 +44,7 @@
@ControllerConfiguration
@RequiredArgsConstructor
@Slf4j
public class SparkOperatorConfigMapReconciler
implements Reconciler<ConfigMap>,
ErrorStatusHandler<ConfigMap>,
EventSourceInitializer<ConfigMap> {
public class SparkOperatorConfigMapReconciler implements Reconciler<ConfigMap> {
private final Function<Set<String>, Boolean> namespaceUpdater;
private final String operatorNamespace;
private final Function<Void, Set<String>> watchedNamespacesGetter;
Expand All @@ -62,14 +57,15 @@ public ErrorStatusUpdateControl<ConfigMap> updateErrorStatus(
}

@Override
public Map<String, EventSource> prepareEventSources(EventSourceContext<ConfigMap> context) {
EventSource configMapEventSource =
public List<EventSource<?, ConfigMap>> prepareEventSources(
EventSourceContext<ConfigMap> context) {
var configMapEventSource =
new InformerEventSource<>(
InformerConfiguration.from(ConfigMap.class, context)
.withNamespaces(operatorNamespace)
InformerEventSourceConfiguration.from(ConfigMap.class, ConfigMap.class)
.withNamespaces(Set.of(operatorNamespace))
.build(),
context);
return EventSourceInitializer.nameEventSources(configMapEventSource);
return List.of(configMapEventSource);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,25 @@

import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME;
import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue;
import static org.apache.spark.k8s.operator.utils.Utils.basicLabelSecondaryToPrimaryMapper;
import static org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import io.fabric8.kubernetes.api.model.Pod;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -74,11 +71,7 @@
@ControllerConfiguration
@Slf4j
@RequiredArgsConstructor
public class SparkAppReconciler
implements Reconciler<SparkApplication>,
ErrorStatusHandler<SparkApplication>,
EventSourceInitializer<SparkApplication>,
Cleaner<SparkApplication> {
public class SparkAppReconciler implements Reconciler<SparkApplication>, Cleaner<SparkApplication> {
private final SparkAppSubmissionWorker submissionWorker;
private final SparkAppStatusRecorder sparkAppStatusRecorder;
private final SentinelManager<SparkApplication> sentinelManager;
Expand Down Expand Up @@ -135,16 +128,17 @@ public ErrorStatusUpdateControl<SparkApplication> updateErrorStatus(
}

@Override
public Map<String, EventSource> prepareEventSources(
public List<EventSource<?, SparkApplication>> prepareEventSources(
EventSourceContext<SparkApplication> context) {
EventSource podEventSource =
new InformerEventSource<>(
InformerConfiguration.from(Pod.class, context)
.withSecondaryToPrimaryMapper(Mappers.fromLabel(LABEL_SPARK_APPLICATION_NAME))
InformerEventSourceConfiguration.from(Pod.class, SparkApplication.class)
.withSecondaryToPrimaryMapper(
basicLabelSecondaryToPrimaryMapper(LABEL_SPARK_APPLICATION_NAME))
.withLabelSelector(commonResourceLabelsStr())
.build(),
context);
return EventSourceInitializer.nameEventSources(podEventSource);
return List.of(podEventSource);
}

protected List<AppReconcileStep> getReconcileSteps(final SparkApplication app) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,29 @@

package org.apache.spark.k8s.operator.reconciler;

import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME;
import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue;
import static org.apache.spark.k8s.operator.utils.Utils.basicLabelSecondaryToPrimaryMapper;
import static org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import io.fabric8.kubernetes.api.model.Pod;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.SparkCluster;
import org.apache.spark.k8s.operator.SparkClusterSubmissionWorker;
import org.apache.spark.k8s.operator.context.SparkClusterContext;
Expand All @@ -61,11 +58,7 @@
@ControllerConfiguration
@Slf4j
@RequiredArgsConstructor
public class SparkClusterReconciler
implements Reconciler<SparkCluster>,
ErrorStatusHandler<SparkCluster>,
EventSourceInitializer<SparkCluster>,
Cleaner<SparkCluster> {
public class SparkClusterReconciler implements Reconciler<SparkCluster>, Cleaner<SparkCluster> {
private final SparkClusterSubmissionWorker submissionWorker;
private final SparkClusterStatusRecorder sparkClusterStatusRecorder;
private final SentinelManager<SparkCluster> sentinelManager;
Expand Down Expand Up @@ -118,16 +111,17 @@ public ErrorStatusUpdateControl<SparkCluster> updateErrorStatus(
}

@Override
public Map<String, EventSource> prepareEventSources(EventSourceContext<SparkCluster> context) {
public List<EventSource<?, SparkCluster>> prepareEventSources(
EventSourceContext<SparkCluster> context) {
EventSource podEventSource =
new InformerEventSource<>(
InformerConfiguration.from(Pod.class, context)
InformerEventSourceConfiguration.from(Pod.class, SparkCluster.class)
.withSecondaryToPrimaryMapper(
Mappers.fromLabel(Constants.LABEL_SPARK_APPLICATION_NAME))
basicLabelSecondaryToPrimaryMapper(LABEL_SPARK_APPLICATION_NAME))
.withLabelSelector(commonResourceLabelsStr())
.build(),
context);
return EventSourceInitializer.nameEventSources(podEventSource);
return List.of(podEventSource);
}

protected List<ClusterReconcileStep> getReconcileSteps(final SparkCluster cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import java.util.Set;
import java.util.stream.Collectors;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
import org.apache.commons.lang3.StringUtils;

import org.apache.spark.k8s.operator.Constants;
Expand Down Expand Up @@ -144,4 +147,25 @@ public static List<SparkClusterStatusListener> getClusterStatusListener() {
public static String commonResourceLabelsStr() {
return labelsAsStr(commonManagedResourceLabels());
}

public static <T extends HasMetadata>
SecondaryToPrimaryMapper<T> basicLabelSecondaryToPrimaryMapper(String nameKey) {
return resource -> {
final var metadata = resource.getMetadata();
if (metadata == null) {
return Collections.emptySet();
} else {
final var map = metadata.getLabels();
if (map == null) {
return Collections.emptySet();
}
var name = map.get(nameKey);
if (name == null) {
return Collections.emptySet();
}
var namespace = resource.getMetadata().getNamespace();
return Set.of(new ResourceID(name, namespace));
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.RuntimeInfo;
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
import io.javaoperatorsdk.operator.health.Status;
Expand Down Expand Up @@ -190,16 +189,6 @@ public String getTargetNamespace() {
}
}));

return new InformerWrappingEventSourceHealthIndicator() {
@Override
public Map<String, InformerHealthIndicator> informerHealthIndicators() {
return informers;
}

@Override
public ResourceConfiguration getInformerConfiguration() {
return null;
}
};
return () -> informers;
}
}
Loading