Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,28 @@ public ResourceLifecycleState getLifecycleState() {
return ResourceLifecycleState.FAILED;
}

// Check for unrecoverable deployments that should be marked as FAILED
if (this instanceof FlinkDeploymentStatus) {
FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) this;
var jmDeployStatus = deploymentStatus.getJobManagerDeploymentStatus();

// ERROR/MISSING deployments are in terminal error state
// [Configmaps deleted -> require manual restore] and should always be FAILED
if ((jmDeployStatus == JobManagerDeploymentStatus.MISSING
|| jmDeployStatus == JobManagerDeploymentStatus.ERROR)
&& StringUtils.isNotEmpty(error)
&& (error.toLowerCase()
.contains(
"it is possible that the job has finished or terminally failed, or the configmaps have been deleted")
|| error.toLowerCase().contains("manual restore required")
|| error.toLowerCase().contains("ha metadata not available")
|| error.toLowerCase()
.contains(
"ha data is not available to make stateful upgrades"))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we checking this specific error?
In any case we are the ones triggering this error so please create a constant in the AbstractJobReconciler and use that here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gyfora this seems to be the only case where we know that the cluster cannot recover on its own and needs a manual restore, hence I used this. I will set this as a constant instead for cleaner code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gyfora

  1. There are multiple instances where the "HA metadata not available" condition is written in different forms, such as "HA metadata not available" and "HA data is not available". Should we maintain uniformity by changing these exception messages to use a constant (now that it is available)?

  2. Also currently flink-operator-api does not have flink-operator as a dependency -> to use the constants in AbstractJobReconciler we would have to import it as a dependency as the status change logic resides in flink-operator-api.
    Should I still go ahead with this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible let's use a single constant, and we can keep that constant in the operator api module so the reconciler can use it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gyfora
I have added 3 constants for error messages which are frequently used and indicate a terminal state, and referenced those in the reconcilers to maintain uniformity. I have also tried to keep the net changes to a minimum (although a few error messages will differ slightly). Do let me know if this looks good.

return ResourceLifecycleState.FAILED;
}
}

if (reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK) {
return ResourceLifecycleState.ROLLED_BACK;
} else if (reconciliationStatus.isLastReconciledSpecStable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
Expand Down Expand Up @@ -337,4 +338,50 @@ private Map<ResourceLifecycleState, List<Histogram>> initTimeHistos() {
}
return histos;
}

@Test
public void testUnrecoverableDeploymentLifecycleState() {
    var resource = TestUtils.buildApplicationCluster();
    var status = resource.getStatus();

    // Simulate a resource that has already completed a successful deployment so the
    // lifecycle computation is past the first-deployment phase.
    ReconciliationUtils.updateStatusForDeployedSpec(resource, new Configuration());
    status.getReconciliationStatus().markReconciledSpecAsStable();

    // Scenario 1: an ERROR deployment carrying a terminal error message
    // (deleted configmaps / manual restore) must be reported as FAILED.
    status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
    status.setError(
            "JobManager deployment is missing and HA data is not available to make stateful upgrades. "
                    + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
                    + "Manual restore required.");
    assertEquals(
            FAILED,
            status.getLifecycleState(),
            "ERROR deployment with `configmaps have been deleted` error should always be FAILED (terminal error state)");

    // Scenario 2: the same terminal classification applies when the JobManager
    // deployment is MISSING and HA metadata cannot be restored.
    status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
    status.setError(
            "HA metadata not available to restore from last state. "
                    + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. ");
    assertEquals(
            FAILED,
            status.getLifecycleState(),
            "MISSING deployment with error should be FAILED");

    // Scenario 3: with no error message and an in-progress (not yet stable)
    // reconciliation, a MISSING deployment is still considered to be deploying.
    status.setError(null);
    status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
    status.getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
    status.getReconciliationStatus().setLastStableSpec(null);
    assertEquals(
            DEPLOYED,
            status.getLifecycleState(),
            "MISSING deployment before stability should not be FAILED yet (still deploying)");
}
}