From ec78f1ae9be574e9094cdf84c439ff35891c1cdb Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Thu, 20 Feb 2025 09:12:19 -0500 Subject: [PATCH] [FLINK-37320] [Observer] FINISHED finite streaming jobs incorrectly being set to RECONCILING --- .../AbstractFlinkDeploymentObserver.java | 5 ++++- .../deployment/ApplicationObserverTest.java | 22 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java index 38d4b831c6..713e132ad2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java @@ -152,7 +152,10 @@ protected void observeJmDeployment(FlinkResourceContext ctx) { } deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); - deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING); + + if (!ReconciliationUtils.isJobInTerminalState(deploymentStatus)) { + deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING); + } if (previousJmStatus != JobManagerDeploymentStatus.MISSING && previousJmStatus != JobManagerDeploymentStatus.ERROR) { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java index b9adee6242..83e7fad5fe 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java @@ -876,4 +876,26 @@ public void validateLastReconciledClearedOnInitialFailure() { observer.observe(deployment, TestUtils.createEmptyContext()); assertTrue(reconStatus.isBeforeFirstDeployment()); } + + @Test + public void jobStatusNotOverwrittenWhenTerminal() throws Exception { + Configuration conf = + configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec()); + flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false); + bringToReadyStatus(deployment); + + deployment + .getStatus() + .getJobStatus() + .setState(org.apache.flink.api.common.JobStatus.FINISHED); + + // Simulate missing deployment + var emptyContext = TestUtils.createEmptyContext(); + deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); + observer.observe(deployment, emptyContext); + + assertEquals( + org.apache.flink.api.common.JobStatus.FINISHED, + deployment.getStatus().getJobStatus().getState()); + } }