Skip to content

Commit d5f4753

Browse files
authored
[FLINK-32033] Fix Lifecycle Status of FlinkDeployment Resource in case of MISSING/ERROR JM status
1 parent 45192f5 commit d5f4753

File tree

8 files changed

+108
-14
lines changed

8 files changed

+108
-14
lines changed

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@
3737
@SuperBuilder
3838
public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
3939

40+
// Shared error message fragments; getLifecycleState() matches on them to report FAILED
41+
public static final String MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED =
42+
"It is possible that the job has finished or terminally failed, or the configmaps have been deleted.";
43+
public static final String MSG_HA_METADATA_NOT_AVAILABLE = "HA metadata is not available";
44+
public static final String MSG_MANUAL_RESTORE_REQUIRED = "Manual restore required.";
45+
4046
/** Last observed status of the Flink job on Application/Session cluster. */
4147
private JobStatus jobStatus = new JobStatus();
4248

@@ -90,6 +96,23 @@ public ResourceLifecycleState getLifecycleState() {
9096
return ResourceLifecycleState.FAILED;
9197
}
9298

99+
// Check for unrecoverable deployments that should be marked as FAILED if the error contains
100+
// the following substrings
101+
if (this instanceof FlinkDeploymentStatus) {
102+
FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) this;
103+
var jmDeployStatus = deploymentStatus.getJobManagerDeploymentStatus();
104+
105+
// ERROR/MISSING deployments whose error contains a known unrecoverable message are FAILED
106+
if ((jmDeployStatus == JobManagerDeploymentStatus.MISSING
107+
|| jmDeployStatus == JobManagerDeploymentStatus.ERROR)
108+
&& error != null
109+
&& (error.contains(MSG_MANUAL_RESTORE_REQUIRED)
110+
|| error.contains(MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED)
111+
|| error.contains(MSG_HA_METADATA_NOT_AVAILABLE))) {
112+
return ResourceLifecycleState.FAILED;
113+
}
114+
}
115+
93116
if (reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK) {
94117
return ResourceLifecycleState.ROLLED_BACK;
95118
} else if (reconciliationStatus.isLastReconciledSpecStable()) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
import java.util.Optional;
6060
import java.util.function.Predicate;
6161

62+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
63+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
6264
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED;
6365
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE;
6466
import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
@@ -227,7 +229,8 @@ protected JobUpgrade getJobUpgrade(FlinkResourceContext<CR> ctx, Configuration d
227229

228230
if (!SnapshotUtils.lastSavepointKnown(status)) {
229231
throw new UpgradeFailureException(
230-
"Job is in terminal state but last checkpoint is unknown, possibly due to an unrecoverable restore error. Manual restore required.",
232+
"Job is in terminal state but last checkpoint is unknown, possibly due to an unrecoverable restore error. "
233+
+ MSG_MANUAL_RESTORE_REQUIRED,
231234
"UpgradeFailed");
232235
}
233236
LOG.info("Job is in terminal state, ready for upgrade from observed latest state");
@@ -360,7 +363,8 @@ private boolean allowLastStateCancel(FlinkResourceContext<CR> ctx) {
360363

361364
var conf = ctx.getObserveConfig();
362365
if (!ctx.getFlinkService().isHaMetadataAvailable(conf)) {
363-
LOG.info("HA metadata not available, cancel will be used instead of last-state");
366+
LOG.info(
367+
"{}, cancel will be used instead of last-state", MSG_HA_METADATA_NOT_AVAILABLE);
364368
return true;
365369
}
366370
return conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@
5959
import java.util.Optional;
6060
import java.util.UUID;
6161

62+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
63+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED;
64+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
6265
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
6366

6467
/** Reconciler Flink Application deployments. */
@@ -114,9 +117,11 @@ protected JobUpgrade getJobUpgrade(
114117
|| jmDeployStatus == JobManagerDeploymentStatus.ERROR)
115118
&& !flinkService.isHaMetadataAvailable(deployConfig)) {
116119
throw new UpgradeFailureException(
117-
"JobManager deployment is missing and HA data is not available to make stateful upgrades. "
118-
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
119-
+ "Manual restore required.",
120+
"JobManager deployment is missing and "
121+
+ MSG_HA_METADATA_NOT_AVAILABLE
122+
+ " to make stateful upgrades. "
123+
+ MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED
124+
+ MSG_MANUAL_RESTORE_REQUIRED,
120125
"UpgradeFailed");
121126
}
122127

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@
155155
import java.util.jar.Manifest;
156156
import java.util.stream.Collectors;
157157

158+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
159+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED;
160+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
158161
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
159162
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX;
160163
import static org.apache.flink.util.ExceptionUtils.findThrowable;
@@ -567,7 +570,7 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
567570
.getExternalPointer()
568571
.equals(NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER)) {
569572
throw new UpgradeFailureException(
570-
"Latest checkpoint not externally addressable, manual recovery required.",
573+
"Latest checkpoint not externally addressable, " + MSG_MANUAL_RESTORE_REQUIRED,
571574
"CheckpointNotFound");
572575
}
573576
return latestCheckpointOpt.map(
@@ -1022,8 +1025,9 @@ protected static Configuration removeOperatorConfigs(Configuration config) {
10221025
private void validateHaMetadataExists(Configuration conf) {
10231026
if (!isHaMetadataAvailable(conf)) {
10241027
throw new UpgradeFailureException(
1025-
"HA metadata not available to restore from last state. "
1026-
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. ",
1028+
MSG_HA_METADATA_NOT_AVAILABLE
1029+
+ " to restore from last state."
1030+
+ MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED,
10271031
"RestoreFailed");
10281032
}
10291033
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@
105105
import java.util.function.Consumer;
106106
import java.util.stream.Collectors;
107107

108+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
109+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED;
110+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
111+
108112
/** Flink service mock for tests. */
109113
public class TestingFlinkService extends AbstractFlinkService {
110114

@@ -244,9 +248,10 @@ protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) thr
244248
protected void validateHaMetadataExists(Configuration conf) {
245249
if (!isHaMetadataAvailable(conf)) {
246250
throw new UpgradeFailureException(
247-
"HA metadata not available to restore from last state. "
248-
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
249-
+ "Manual restore required.",
251+
MSG_HA_METADATA_NOT_AVAILABLE
252+
+ " to restore from last state. "
253+
+ MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED
254+
+ MSG_MANUAL_RESTORE_REQUIRED,
250255
"RestoreFailed");
251256
}
252257
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void verifyApplicationJmRecovery(FlinkVersion flinkVersion, UpgradeMode u
121121
.getStatus()
122122
.getError()
123123
.contains(
124-
"JobManager deployment is missing and HA data is not available to make stateful upgrades."));
124+
"JobManager deployment is missing and HA metadata is not available"));
125125
} else {
126126
flinkService.setPortReady(true);
127127
testController.reconcile(appCluster, context);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,7 +1054,7 @@ public void testInitialHaError() throws Exception {
10541054
.flinkResourceEvents()
10551055
.poll()
10561056
.getMessage()
1057-
.contains("HA metadata not available to restore from last state."));
1057+
.contains("HA metadata is not available to restore from last state."));
10581058

10591059
testController.flinkResourceEvents().clear();
10601060
testController.reconcile(appCluster, context);
@@ -1068,7 +1068,7 @@ public void testInitialHaError() throws Exception {
10681068
.flinkResourceEvents()
10691069
.poll()
10701070
.getMessage()
1071-
.contains("HA metadata not available to restore from last state."));
1071+
.contains("HA metadata is not available to restore from last state."));
10721072

10731073
flinkService.setHaDataAvailable(true);
10741074
testController.flinkResourceEvents().clear();

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
2727
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
2828
import org.apache.flink.kubernetes.operator.api.spec.JobState;
29+
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
2930
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
3031
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
3132
import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
@@ -52,6 +53,9 @@
5253
import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.STABLE;
5354
import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.SUSPENDED;
5455
import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.UPGRADING;
56+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
57+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED;
58+
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
5559
import static org.junit.jupiter.api.Assertions.assertEquals;
5660
import static org.junit.jupiter.api.Assertions.assertNotEquals;
5761
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -337,4 +341,53 @@ private Map<ResourceLifecycleState, List<Histogram>> initTimeHistos() {
337341
}
338342
return histos;
339343
}
344+
345+
@Test
346+
public void testUnrecoverableDeploymentLifecycleState() {
347+
var application = TestUtils.buildApplicationCluster();
348+
349+
// Setup the deployment to simulate it has been deployed (so isBeforeFirstDeployment =
350+
// false)
351+
ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration());
352+
application.getStatus().getReconciliationStatus().markReconciledSpecAsStable();
353+
354+
application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
355+
application
356+
.getStatus()
357+
.setError(
358+
"\"JobManager deployment is missing and "
359+
+ MSG_HA_METADATA_NOT_AVAILABLE
360+
+ " to make stateful upgrades. "
361+
+ MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED
362+
+ MSG_MANUAL_RESTORE_REQUIRED);
363+
assertEquals(
364+
FAILED,
365+
application.getStatus().getLifecycleState(),
366+
"ERROR deployment with `configmaps have been deleted` error should always be FAILED (terminal error state)");
367+
368+
application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
369+
application
370+
.getStatus()
371+
.setError(
372+
MSG_HA_METADATA_NOT_AVAILABLE
373+
+ " to restore from last state. "
374+
+ MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED);
375+
assertEquals(
376+
FAILED,
377+
application.getStatus().getLifecycleState(),
378+
"MISSING deployment with error should be FAILED");
379+
380+
application.getStatus().setError(null);
381+
application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
382+
// Reset to DEPLOYED state (not stable yet) to simulate ongoing deployment
383+
application.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
384+
application
385+
.getStatus()
386+
.getReconciliationStatus()
387+
.setLastStableSpec(null); // Mark as not stable
388+
assertEquals(
389+
DEPLOYED,
390+
application.getStatus().getLifecycleState(),
391+
"MISSING deployment before stability should not be FAILED yet (still deploying)");
392+
}
340393
}

0 commit comments

Comments
 (0)