From 0617f76e867054ccb4f91aba8d41cef2dbbea7e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 19 May 2025 17:36:14 +0200 Subject: [PATCH 01/12] migrate to JOSDK v5.1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../kubernetes/operator/FlinkOperator.java | 7 +- .../KubernetesOperatorConfigOptions.java | 6 +- .../controller/FlinkDeploymentController.java | 10 +-- .../controller/FlinkSessionJobController.java | 7 +- .../FlinkStateSnapshotController.java | 13 ++-- .../metrics/OperatorJosdkMetrics.java | 20 ++++-- .../operator/utils/EventSourceUtils.java | 66 ++++++++++++------- .../TestingFlinkDeploymentController.java | 10 +-- .../TestingFlinkSessionJobController.java | 12 ---- pom.xml | 11 +++- 10 files changed, 88 insertions(+), 74 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 4bd2836f7a..a8524fa5db 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -66,6 +66,7 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -150,10 +151,10 @@ private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) { overrider.withMetrics(new OperatorJosdkMetrics(metricGroup, configManager)); } - overrider.withTerminationTimeoutSeconds( - (int) + overrider.withReconciliationTerminationTimeout( + Duration.ofSeconds( conf.get(KubernetesOperatorConfigOptions.OPERATOR_TERMINATION_TIMEOUT) - .toSeconds()); + .toSeconds())); overrider.withStopOnInformerErrorDuringStartup( conf.get(KubernetesOperatorConfigOptions.OPERATOR_STOP_ON_INFORMER_ERROR)); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index f4203fb7be..393957b0ac 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -496,13 +496,13 @@ public static String operatorConfigKey(String key) { .withDescription( "Whether informer errors should stop operator startup. If false, the startup will ignore recoverable errors, caused for example by RBAC issues and will retry periodically."); + public static final int DEFAULT_TERMINATION_TIMEOUT_SECONDS = 10; + @Documentation.Section(SECTION_ADVANCED) public static final ConfigOption OPERATOR_TERMINATION_TIMEOUT = operatorConfig("termination.timeout") .durationType() - .defaultValue( - Duration.ofSeconds( - ConfigurationService.DEFAULT_TERMINATION_TIMEOUT_SECONDS)) + .defaultValue(Duration.ofSeconds(DEFAULT_TERMINATION_TIMEOUT_SECONDS)) .withDescription( "Operator shutdown timeout before reconciliation threads are killed."); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index 51235b8ebe..6eddc1a76c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -43,16 +43,15 @@ import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -62,10 +61,7 @@ /** Controller that runs the main reconcile loop for Flink deployments. */ @ControllerConfiguration public class FlinkDeploymentController - implements Reconciler, - ErrorStatusHandler, - EventSourceInitializer, - Cleaner { + implements Reconciler, Cleaner { private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class); private final Set validators; @@ -185,7 +181,7 @@ private void triggerErrorEvent( } @Override - public Map prepareEventSources( + public List> prepareEventSources( EventSourceContext context) { List eventSources = new ArrayList<>(); eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context)); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java index 7454864fe7..24e2752f91 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java @@ -40,10 +40,8 @@ import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import org.slf4j.Logger; @@ -52,6 +50,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.List; import java.util.Optional; import java.util.Set; @@ -59,8 +58,6 @@ @ControllerConfiguration() public class FlinkSessionJobController implements io.javaoperatorsdk.operator.api.reconciler.Reconciler, - ErrorStatusHandler, - EventSourceInitializer, Cleaner { private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class); @@ -179,7 +176,7 @@ public ErrorStatusUpdateControl updateErrorStatus( } @Override - public Map prepareEventSources( + public List> prepareEventSources( EventSourceContext context) { List eventSources = new ArrayList<>(); eventSources.add(EventSourceUtils.getFlinkDeploymentInformerEventSource(context)); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java index 1a516c3d9f..dd7c644b9e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotController.java @@ -34,10 +34,8 @@ import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; @@ -48,6 +46,7 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.List; import java.util.Objects; import java.util.Set; @@ -55,10 +54,7 @@ @RequiredArgsConstructor @ControllerConfiguration public class FlinkStateSnapshotController - implements Reconciler, - ErrorStatusHandler, - EventSourceInitializer, - Cleaner { + implements Reconciler, Cleaner { private static final Logger LOG = LoggerFactory.getLogger(FlinkStateSnapshotController.class); @@ -154,10 +150,9 @@ public ErrorStatusUpdateControl updateErrorStatus( } @Override - public Map prepareEventSources( + public List> prepareEventSources( EventSourceContext context) { - return EventSourceInitializer.nameEventSources( - EventSourceUtils.getFlinkStateSnapshotInformerEventSources(context)); + return List.of(EventSourceUtils.getFlinkStateSnapshotInformerEventSources(context)); } /** diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java index 5350e3cd76..66c191d6ee 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java @@ -28,6 +28,7 @@ import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.api.reconciler.Constants; @@ -108,7 +109,8 @@ public void cleanupDoneFor(ResourceID resourceID, Map metadata) @Override public void reconcileCustomResource( - ResourceID resourceID, RetryInfo retryInfoNullable, Map metadata) { + HasMetadata resource, RetryInfo retryInfoNullable, Map metadata) { + var resourceID = ResourceID.fromResource(resource); counter(getResourceMg(resourceID, metadata), RECONCILIATION).inc(); if (retryInfoNullable != null) { @@ -117,14 +119,22 @@ public void reconcileCustomResource( } @Override - public void finishedReconciliation(ResourceID resourceID, Map metadata) { - counter(getResourceMg(resourceID, metadata), RECONCILIATION, "finished").inc(); + public void finishedReconciliation(HasMetadata resource, Map metadata) { + counter( + getResourceMg(ResourceID.fromResource(resource), metadata), + RECONCILIATION, + "finished") + .inc(); } @Override public void failedReconciliation( - ResourceID resourceID, Exception exception, Map metadata) { - counter(getResourceMg(resourceID, metadata), RECONCILIATION, "failed").inc(); + HasMetadata resource, Exception exception, Map metadata) { + counter( + getResourceMg(ResourceID.fromResource(resource), metadata), + RECONCILIATION, + "failed") + .inc(); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java index 24ecb462d0..1c153ed759 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java @@ -28,15 +28,15 @@ import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController; import org.apache.flink.kubernetes.utils.Constants; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import java.util.Collections; import java.util.List; @@ -92,11 +92,11 @@ public static InformerEventSource getDeploymentInfo .collect(Collectors.joining(",")); var configuration = - InformerConfiguration.from(Deployment.class, context) + InformerEventSourceConfiguration.from(Deployment.class, FlinkDeployment.class) .withLabelSelector(labelSelector) - .withSecondaryToPrimaryMapper(Mappers.fromLabel(Constants.LABEL_APP_KEY)) - .withNamespacesInheritedFromController(context) - .followNamespaceChanges(true) + .withSecondaryToPrimaryMapper(fromLabel(Constants.LABEL_APP_KEY)) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) .build(); return new InformerEventSource<>(configuration, context); @@ -114,8 +114,8 @@ public static InformerEventSource getDeploymentInfo flinkDeployment.getMetadata().getName(), flinkDeployment.getMetadata().getNamespace()))); - InformerConfiguration configuration = - InformerConfiguration.from(FlinkSessionJob.class, context) + var configuration = + InformerEventSourceConfiguration.from(FlinkSessionJob.class, FlinkSessionJob.class) .withSecondaryToPrimaryMapper( sessionJob -> context @@ -132,8 +132,8 @@ public static InformerEventSource getDeploymentInfo .stream() .map(ResourceID::fromResource) .collect(Collectors.toSet())) - .withNamespacesInheritedFromController(context) - .followNamespaceChanges(true) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) .build(); return new InformerEventSource<>(configuration, context); @@ -150,8 +150,8 @@ public static InformerEventSource getDeploymentInfo sessionJob.getSpec().getDeploymentName(), sessionJob.getMetadata().getNamespace()))); - InformerConfiguration configuration = - InformerConfiguration.from(FlinkDeployment.class, context) + var configuration = + InformerEventSourceConfiguration.from(FlinkDeployment.class, FlinkSessionJob.class) .withSecondaryToPrimaryMapper( flinkDeployment -> context @@ -179,8 +179,8 @@ public static InformerEventSource getDeploymentInfo sessionJob .getMetadata() .getNamespace()))) - .withNamespacesInheritedFromController(context) - .followNamespaceChanges(true) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) .build(); return new InformerEventSource<>(configuration, context); } @@ -201,8 +201,9 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources( savepoint.getMetadata().getNamespace())); }); - InformerConfiguration configurationFlinkSessionJob = - InformerConfiguration.from(FlinkSessionJob.class, context) + var configurationFlinkSessionJob = + InformerEventSourceConfiguration.from( + FlinkSessionJob.class, FlinkStateSnapshot.class) .withSecondaryToPrimaryMapper(getSnapshotPrimaryMapper(context)) .withPrimaryToSecondaryMapper( (PrimaryToSecondaryMapper) @@ -218,14 +219,15 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources( .getSnapshotJobReferenceResourceId( snapshot)); }) - .withNamespacesInheritedFromController(context) - .followNamespaceChanges(true) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) .build(); var flinkSessionJobEventSource = new InformerEventSource<>(configurationFlinkSessionJob, context); - InformerConfiguration configurationFlinkDeployment = - InformerConfiguration.from(FlinkDeployment.class, context) + var configurationFlinkDeployment = + InformerEventSourceConfiguration.from( + FlinkDeployment.class, FlinkStateSnapshot.class) .withSecondaryToPrimaryMapper(getSnapshotPrimaryMapper(context)) .withPrimaryToSecondaryMapper( (PrimaryToSecondaryMapper) @@ -258,8 +260,8 @@ public static EventSource[] getFlinkStateSnapshotInformerEventSources( .getSnapshotJobReferenceResourceId( snapshot)); }) - .withNamespacesInheritedFromController(context) - .followNamespaceChanges(true) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) .build(); var flinkDeploymentEventSource = new InformerEventSource<>(configurationFlinkDeployment, context); @@ -283,6 +285,26 @@ SecondaryToPrimaryMapper getSnapshotPrimaryMapper( .collect(Collectors.toSet()); } + public static SecondaryToPrimaryMapper fromLabel(String nameKey) { + return resource -> { + final var metadata = resource.getMetadata(); + if (metadata == null) { + return Collections.emptySet(); + } else { + final var map = metadata.getLabels(); + if (map == null) { + return Collections.emptySet(); + } + var name = map.get(nameKey); + if (name == null) { + return Collections.emptySet(); + } + var namespace = resource.getMetadata().getNamespace(); + return Set.of(new ResourceID(name, namespace)); + } + }; + } + private static String indexKey(String name, String namespace) { return name + "#" + namespace; } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java index 09885b965a..5eca939457 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java @@ -43,10 +43,8 @@ import io.javaoperatorsdk.operator.api.reconciler.Cleaner; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -56,16 +54,14 @@ import java.time.Duration; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.function.BiConsumer; /** A wrapper around {@link FlinkDeploymentController} used by unit tests. */ public class TestingFlinkDeploymentController - implements Reconciler, - ErrorStatusHandler, - EventSourceInitializer, - Cleaner { + implements Reconciler, Cleaner { @Getter private ReconcilerFactory reconcilerFactory; private FlinkDeploymentController flinkDeploymentController; @@ -165,7 +161,7 @@ public ErrorStatusUpdateControl updateErrorStatus( } @Override - public Map prepareEventSources( + public List> prepareEventSources( EventSourceContext eventSourceContext) { throw new UnsupportedOperationException(); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java index 2b200de1de..7ba5cd48dc 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java @@ -42,13 +42,9 @@ import io.javaoperatorsdk.operator.api.reconciler.Cleaner; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.EventSource; import lombok.Getter; import java.util.HashMap; @@ -59,8 +55,6 @@ /** A wrapper around {@link FlinkSessionJobController} used by unit tests. */ public class TestingFlinkSessionJobController implements io.javaoperatorsdk.operator.api.reconciler.Reconciler, - ErrorStatusHandler, - EventSourceInitializer, Cleaner { @Getter private CanaryResourceManager canaryResourceManager; @@ -154,12 +148,6 @@ public DeleteControl cleanup( return flinkSessionJobController.cleanup(cloned, context); } - @Override - public Map prepareEventSources( - EventSourceContext eventSourceContext) { - return null; - } - public Queue events() { return flinkResourceEventCollector.events; } diff --git a/pom.xml b/pom.xml index 463c0b457d..ebc648c619 100644 --- a/pom.xml +++ b/pom.xml @@ -75,7 +75,7 @@ under the License. 3.3.2 5.0.0 - 4.9.4 + 5.1.0-SNAPSHOT 1.1.1 6.13.2 @@ -163,6 +163,15 @@ under the License. + + + snapshots-repo + https://oss.sonatype.org/content/repositories/snapshots + false + true + + +