Skip to content

Commit f874b03

Browse files
luca-p-castelligyfora
authored andcommitted
[FLINK-37320] [Observer] FINISHED finite streaming jobs incorrectly being set to RECONCILING
1 parent 9c93f04 commit f874b03

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,10 @@ protected void observeJmDeployment(FlinkResourceContext<FlinkDeployment> ctx) {
152152
}
153153

154154
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
155-
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
155+
156+
if (!ReconciliationUtils.isJobInTerminalState(deploymentStatus)) {
157+
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
158+
}
156159

157160
if (previousJmStatus != JobManagerDeploymentStatus.MISSING
158161
&& previousJmStatus != JobManagerDeploymentStatus.ERROR) {

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -876,4 +876,26 @@ public void validateLastReconciledClearedOnInitialFailure() {
876876
observer.observe(deployment, TestUtils.createEmptyContext());
877877
assertTrue(reconStatus.isBeforeFirstDeployment());
878878
}
879+
880+
@Test
881+
public void jobStatusNotOverwrittenWhenTerminal() throws Exception {
882+
Configuration conf =
883+
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
884+
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
885+
bringToReadyStatus(deployment);
886+
887+
deployment
888+
.getStatus()
889+
.getJobStatus()
890+
.setState(org.apache.flink.api.common.JobStatus.FINISHED);
891+
892+
// Simulate missing deployment
893+
var emptyContext = TestUtils.createEmptyContext();
894+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
895+
observer.observe(deployment, emptyContext);
896+
897+
assertEquals(
898+
org.apache.flink.api.common.JobStatus.FINISHED,
899+
deployment.getStatus().getJobStatus().getState());
900+
}
879901
}

0 commit comments

Comments
 (0)