diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java index 38d4b831c6..713e132ad2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java @@ -152,7 +152,10 @@ protected void observeJmDeployment(FlinkResourceContext ctx) { } deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); - deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING); + + if (!ReconciliationUtils.isJobInTerminalState(deploymentStatus)) { + deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING); + } if (previousJmStatus != JobManagerDeploymentStatus.MISSING && previousJmStatus != JobManagerDeploymentStatus.ERROR) { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java index b9adee6242..83e7fad5fe 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java @@ -876,4 +876,26 @@ public void validateLastReconciledClearedOnInitialFailure() { observer.observe(deployment, TestUtils.createEmptyContext()); assertTrue(reconStatus.isBeforeFirstDeployment()); } + + @Test + public void jobStatusNotOverwrittenWhenTerminal() throws Exception { + Configuration conf = + configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec()); + flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false); + bringToReadyStatus(deployment); + + deployment + .getStatus() + .getJobStatus() + .setState(org.apache.flink.api.common.JobStatus.FINISHED); + + // Simulate missing deployment + var emptyContext = TestUtils.createEmptyContext(); + deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); + observer.observe(deployment, emptyContext); + + assertEquals( + org.apache.flink.api.common.JobStatus.FINISHED, + deployment.getStatus().getJobStatus().getState()); + } }