Skip to content

Commit 188a27d

Browse files
author
nishita-pattanayak
committed
[FLINK-32033][Kubernetes-Operator] Fix Lifecycle status in case of MISSING/ERROR JM status
1 parent 5b3856b commit 188a27d

File tree

2 files changed

+99
-1
lines changed

2 files changed

+99
-1
lines changed

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,30 @@ public ResourceLifecycleState getLifecycleState() {
9090
return ResourceLifecycleState.FAILED;
9191
}
9292

93+
// Check for unrecoverable deployments that should be marked as FAILED
94+
if (this instanceof FlinkDeploymentStatus) {
95+
FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) this;
96+
var jmStatus = deploymentStatus.getJobManagerDeploymentStatus();
97+
98+
// ERROR deployments are in terminal error state and should always be FAILED
99+
if (jmStatus == JobManagerDeploymentStatus.ERROR) {
100+
return ResourceLifecycleState.FAILED;
101+
}
102+
103+
// MISSING deployments should be FAILED if they're clearly unrecoverable
104+
if (jmStatus == JobManagerDeploymentStatus.MISSING) {
105+
// Mark as FAILED whenever any error message is recorded, whether or not the spec is stable
106+
if (StringUtils.isNotEmpty(error)) {
107+
return ResourceLifecycleState.FAILED;
108+
}
109+
// Also mark as FAILED if stable deployment is missing without any error
110+
// (indicating it was deleted externally)
111+
else if (reconciliationStatus.isLastReconciledSpecStable()) {
112+
return ResourceLifecycleState.FAILED;
113+
}
114+
}
115+
}
116+
93117
if (reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK) {
94118
return ResourceLifecycleState.ROLLED_BACK;
95119
} else if (reconciliationStatus.isLastReconciledSpecStable()) {

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
2727
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
2828
import org.apache.flink.kubernetes.operator.api.spec.JobState;
29+
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
2930
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
3031
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
3132
import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
@@ -71,8 +72,28 @@ public void lifecycleStateTest() {
7172
ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration());
7273
assertEquals(DEPLOYED, application.getStatus().getLifecycleState());
7374

75+
application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
7476
application.getStatus().getReconciliationStatus().markReconciledSpecAsStable();
75-
assertEquals(STABLE, application.getStatus().getLifecycleState());
77+
assertEquals(
78+
STABLE,
79+
application.getStatus().getLifecycleState(),
80+
"JobManager Deployment is in DEPLOYING state, hence application is STABLE");
81+
82+
application
83+
.getStatus()
84+
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
85+
application.getStatus().getReconciliationStatus().markReconciledSpecAsStable();
86+
assertEquals(
87+
STABLE,
88+
application.getStatus().getLifecycleState(),
89+
"JobManager Deployment is in DEPLOYED_NOT_READY state, hence application is STABLE");
90+
91+
application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
92+
application.getStatus().getReconciliationStatus().markReconciledSpecAsStable();
93+
assertEquals(
94+
STABLE,
95+
application.getStatus().getLifecycleState(),
96+
"JobManager Deployment is in READY state, hence application is STABLE");
7697

7798
application.getStatus().setError("errr");
7899
assertEquals(STABLE, application.getStatus().getLifecycleState());
@@ -337,4 +358,57 @@ private Map<ResourceLifecycleState, List<Histogram>> initTimeHistos() {
337358
}
338359
return histos;
339360
}
361+
362+
/**
 * Checks the lifecycle state reported by the application status when the JobManager
 * deployment status is ERROR or MISSING.
 *
 * <p>Scenarios asserted below: ERROR (with and without an error message) yields FAILED;
 * MISSING with an error message yields FAILED; MISSING without an error but with a stable
 * reconciled spec yields FAILED; MISSING without an error after the reconciliation state is
 * set to DEPLOYED and the last stable spec is cleared yields DEPLOYED.
 */
@Test
363+
public void testUnrecoverableDeploymentLifecycleState() {
364+
var application = TestUtils.buildApplicationCluster();
365+
366+
// Set up the deployment to simulate it has been deployed (so isBeforeFirstDeployment =
367+
// false)
368+
ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration());
369+
application.getStatus().getReconciliationStatus().markReconciledSpecAsStable();
370+
371+
// Scenario 1: ERROR status with the error field cleared.
application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
372+
application.getStatus().setError(null);
373+
assertEquals(
374+
FAILED,
375+
application.getStatus().getLifecycleState(),
376+
"ERROR deployment should always be FAILED (terminal error state)");
377+
378+
// Scenario 2: ERROR status with an error message set.
application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
379+
application.getStatus().setError("JobManager deployment failed to start");
380+
assertEquals(
381+
FAILED,
382+
application.getStatus().getLifecycleState(),
383+
"ERROR deployment with error message should also be FAILED");
384+
385+
// Scenario 3: MISSING status with an error message set.
application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
386+
application
387+
.getStatus()
388+
.setError("JobManager deployment was deleted and cannot be recovered");
389+
assertEquals(
390+
FAILED,
391+
application.getStatus().getLifecycleState(),
392+
"MISSING deployment with error should be FAILED");
393+
394+
// Scenario 4: MISSING status, no error; the spec was marked stable during setup above.
application.getStatus().setError(null);
395+
application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
396+
assertEquals(
397+
FAILED,
398+
application.getStatus().getLifecycleState(),
399+
"MISSING deployment with stable reconciliation should be FAILED");
400+
401+
// Scenario 5: MISSING status, no error, and the stable-spec marker removed.
// The next two setters repeat values already set in scenario 4.
application.getStatus().setError(null);
402+
application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
403+
// Reset to DEPLOYED state (not stable yet) to simulate ongoing deployment
404+
application.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
405+
application
406+
.getStatus()
407+
.getReconciliationStatus()
408+
.setLastStableSpec(null); // Mark as not stable
409+
assertEquals(
410+
DEPLOYED,
411+
application.getStatus().getLifecycleState(),
412+
"MISSING deployment before stability should not be FAILED yet (still deploying)");
413+
}
340414
}

0 commit comments

Comments
 (0)