Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
@SuperBuilder
public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {

// Frequent error message constants for resource failure reporting
public static final String MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED =
"It is possible that the job has finished or terminally failed, or the configmaps have been deleted.";
public static final String MSG_HA_METADATA_NOT_AVAILABLE = "HA metadata is not available";
public static final String MSG_MANUAL_RESTORE_REQUIRED = "Manual restore required.";

/** Last observed status of the Flink job on Application/Session cluster. */
private JobStatus jobStatus = new JobStatus();

Expand Down Expand Up @@ -90,6 +96,23 @@ public ResourceLifecycleState getLifecycleState() {
return ResourceLifecycleState.FAILED;
}

// Check for unrecoverable deployments that should be marked as FAILED if the error contains
// the following substrings
if (this instanceof FlinkDeploymentStatus) {
FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) this;
var jmDeployStatus = deploymentStatus.getJobManagerDeploymentStatus();

// ERROR/MISSING deployments are in terminal error state and should always be FAILED
if ((jmDeployStatus == JobManagerDeploymentStatus.MISSING
|| jmDeployStatus == JobManagerDeploymentStatus.ERROR)
&& error != null
&& (error.contains(MSG_MANUAL_RESTORE_REQUIRED)
|| error.contains(MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED)
|| error.contains(MSG_HA_METADATA_NOT_AVAILABLE))) {
return ResourceLifecycleState.FAILED;
}
}

if (reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK) {
return ResourceLifecycleState.ROLLED_BACK;
} else if (reconciliationStatus.isLastReconciledSpecStable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import java.util.Optional;
import java.util.function.Predicate;

import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE;
import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
Expand Down Expand Up @@ -227,7 +229,8 @@ protected JobUpgrade getJobUpgrade(FlinkResourceContext<CR> ctx, Configuration d

if (!SnapshotUtils.lastSavepointKnown(status)) {
throw new UpgradeFailureException(
"Job is in terminal state but last checkpoint is unknown, possibly due to an unrecoverable restore error. Manual restore required.",
"Job is in terminal state but last checkpoint is unknown, possibly due to an unrecoverable restore error. "
+ MSG_MANUAL_RESTORE_REQUIRED,
"UpgradeFailed");
}
LOG.info("Job is in terminal state, ready for upgrade from observed latest state");
Expand Down Expand Up @@ -360,7 +363,8 @@ private boolean allowLastStateCancel(FlinkResourceContext<CR> ctx) {

var conf = ctx.getObserveConfig();
if (!ctx.getFlinkService().isHaMetadataAvailable(conf)) {
LOG.info("HA metadata not available, cancel will be used instead of last-state");
LOG.info(
"{}, cancel will be used instead of last-state", MSG_HA_METADATA_NOT_AVAILABLE);
return true;
}
return conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
import java.util.Optional;
import java.util.UUID;

import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED;
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;

/** Reconciler Flink Application deployments. */
Expand Down Expand Up @@ -114,9 +117,11 @@ protected JobUpgrade getJobUpgrade(
|| jmDeployStatus == JobManagerDeploymentStatus.ERROR)
&& !flinkService.isHaMetadataAvailable(deployConfig)) {
throw new UpgradeFailureException(
"JobManager deployment is missing and HA data is not available to make stateful upgrades. "
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
+ "Manual restore required.",
"JobManager deployment is missing and "
+ MSG_HA_METADATA_NOT_AVAILABLE
+ " to make stateful upgrades. "
+ MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED
+ MSG_MANUAL_RESTORE_REQUIRED,
"UpgradeFailed");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@
import java.util.jar.Manifest;
import java.util.stream.Collectors;

import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED;
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
Expand Down Expand Up @@ -567,7 +570,7 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
.getExternalPointer()
.equals(NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER)) {
throw new UpgradeFailureException(
"Latest checkpoint not externally addressable, manual recovery required.",
"Latest checkpoint not externally addressable, " + MSG_MANUAL_RESTORE_REQUIRED,
"CheckpointNotFound");
}
return latestCheckpointOpt.map(
Expand Down Expand Up @@ -1022,8 +1025,9 @@ protected static Configuration removeOperatorConfigs(Configuration config) {
private void validateHaMetadataExists(Configuration conf) {
if (!isHaMetadataAvailable(conf)) {
throw new UpgradeFailureException(
"HA metadata not available to restore from last state. "
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. ",
MSG_HA_METADATA_NOT_AVAILABLE
+ " to restore from last state."
+ MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED,
"RestoreFailed");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED;
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;

/** Flink service mock for tests. */
public class TestingFlinkService extends AbstractFlinkService {

Expand Down Expand Up @@ -244,9 +248,10 @@ protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) thr
protected void validateHaMetadataExists(Configuration conf) {
if (!isHaMetadataAvailable(conf)) {
throw new UpgradeFailureException(
"HA metadata not available to restore from last state. "
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
+ "Manual restore required.",
MSG_HA_METADATA_NOT_AVAILABLE
+ " to restore from last state. "
+ MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED
+ MSG_MANUAL_RESTORE_REQUIRED,
"RestoreFailed");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void verifyApplicationJmRecovery(FlinkVersion flinkVersion, UpgradeMode u
.getStatus()
.getError()
.contains(
"JobManager deployment is missing and HA data is not available to make stateful upgrades."));
"JobManager deployment is missing and HA metadata is not available"));
} else {
flinkService.setPortReady(true);
testController.reconcile(appCluster, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ public void testInitialHaError() throws Exception {
.flinkResourceEvents()
.poll()
.getMessage()
.contains("HA metadata not available to restore from last state."));
.contains("HA metadata is not available to restore from last state."));

testController.flinkResourceEvents().clear();
testController.reconcile(appCluster, context);
Expand All @@ -1068,7 +1068,7 @@ public void testInitialHaError() throws Exception {
.flinkResourceEvents()
.poll()
.getMessage()
.contains("HA metadata not available to restore from last state."));
.contains("HA metadata is not available to restore from last state."));

flinkService.setHaDataAvailable(true);
testController.flinkResourceEvents().clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
Expand All @@ -52,6 +53,9 @@
import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.STABLE;
import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.SUSPENDED;
import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.UPGRADING;
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED;
import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -337,4 +341,53 @@ private Map<ResourceLifecycleState, List<Histogram>> initTimeHistos() {
}
return histos;
}

@Test
public void testUnrecoverableDeploymentLifecycleState() {
var application = TestUtils.buildApplicationCluster();

// Setup the deployment to simulate it has been deployed (so isBeforeFirstDeployment =
// false)
ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration());
application.getStatus().getReconciliationStatus().markReconciledSpecAsStable();

application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
application
.getStatus()
.setError(
"\"JobManager deployment is missing and "
+ MSG_HA_METADATA_NOT_AVAILABLE
+ " to make stateful upgrades. "
+ MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED
+ MSG_MANUAL_RESTORE_REQUIRED);
assertEquals(
FAILED,
application.getStatus().getLifecycleState(),
"ERROR deployment with `configmaps have been deleted` error should always be FAILED (terminal error state)");

application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
application
.getStatus()
.setError(
MSG_HA_METADATA_NOT_AVAILABLE
+ " to restore from last state. "
+ MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED);
assertEquals(
FAILED,
application.getStatus().getLifecycleState(),
"MISSING deployment with error should be FAILED");

application.getStatus().setError(null);
application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
// Reset to DEPLOYED state (not stable yet) to simulate ongoing deployment
application.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
application
.getStatus()
.getReconciliationStatus()
.setLastStableSpec(null); // Mark as not stable
assertEquals(
DEPLOYED,
application.getStatus().getLifecycleState(),
"MISSING deployment before stability should not be FAILED yet (still deploying)");
}
}