Skip to content

Commit be3b79b

Browse files
committed
[FLINK-35108] Do not trigger deployment recovery for finished/failed jobs
1 parent a3ef048 commit be3b79b

File tree

2 files changed

+37
-11
lines changed

2 files changed

+37
-11
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.kubernetes.operator.reconciler.deployment;
1919

2020
import org.apache.flink.annotation.VisibleForTesting;
21+
import org.apache.flink.api.common.JobStatus;
2122
import org.apache.flink.autoscaler.JobAutoScaler;
2223
import org.apache.flink.configuration.Configuration;
2324
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -505,9 +506,16 @@ protected boolean shouldRecoverDeployment(Configuration conf, FlinkDeployment de
505506

506507
private boolean jmMissingForRunningDeployment(FlinkDeployment deployment) {
507508
var deployedJob = ReconciliationUtils.getDeployedSpec(deployment).getJob();
508-
return (deployedJob == null || deployedJob.getState() == JobState.RUNNING)
509-
&& (deployment.getStatus().getJobManagerDeploymentStatus()
510-
== JobManagerDeploymentStatus.MISSING);
509+
var status = deployment.getStatus();
510+
var jobStatus = status.getJobStatus();
511+
boolean sessionCluster = deployedJob == null;
512+
boolean nonTerminalApplication =
513+
!sessionCluster
514+
&& deployedJob.getState() == JobState.RUNNING
515+
&& !JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState();
516+
boolean jmShouldBeRunning = sessionCluster || nonTerminalApplication;
517+
return jmShouldBeRunning
518+
&& (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING);
511519
}
512520

513521
protected boolean flinkVersionChanged(SPEC oldSpec, SPEC newSpec) {

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java

Lines changed: 26 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -82,6 +82,7 @@
8282
import org.jetbrains.annotations.NotNull;
8383
import org.junit.jupiter.api.Assertions;
8484
import org.junit.jupiter.api.Test;
85+
import org.junit.jupiter.api.function.ThrowingConsumer;
8586
import org.junit.jupiter.params.ParameterizedTest;
8687
import org.junit.jupiter.params.provider.EnumSource;
8788
import org.junit.jupiter.params.provider.MethodSource;
@@ -941,18 +942,32 @@ public void testSetOwnerReference() throws Exception {
941942
}
942943

943944
@Test
944-
public void testTerminalJmTtl() throws Exception {
945-
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
945+
public void testTerminalJmTtlOnSuspend() throws Throwable {
946+
testTerminalJmTtl(
947+
dep -> {
948+
getJobSpec(dep).setState(JobState.SUSPENDED);
949+
reconciler.reconcile(dep, context);
950+
});
951+
}
952+
953+
@Test
954+
public void testTerminalJmTtlOnFinished() throws Throwable {
955+
testTerminalJmTtl(dep -> dep.getStatus().getJobStatus().setState("FINISHED"));
956+
}
957+
958+
@Test
959+
public void testTerminalJmTtlOnFailed() throws Throwable {
960+
testTerminalJmTtl(dep -> dep.getStatus().getJobStatus().setState("FAILED"));
961+
}
962+
963+
public void testTerminalJmTtl(ThrowingConsumer<FlinkDeployment> deploymentSetup)
964+
throws Throwable {
965+
var deployment = TestUtils.buildApplicationCluster();
946966
getJobSpec(deployment).setUpgradeMode(UpgradeMode.SAVEPOINT);
947967
reconciler.reconcile(deployment, context);
948968
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
949-
950-
getJobSpec(deployment).setState(JobState.SUSPENDED);
951-
reconciler.reconcile(deployment, context);
969+
deploymentSetup.accept(deployment);
952970
var status = deployment.getStatus();
953-
assertEquals(
954-
org.apache.flink.api.common.JobStatus.FINISHED.toString(),
955-
status.getJobStatus().getState());
956971
assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());
957972

958973
deployment
@@ -976,6 +991,9 @@ public void testTerminalJmTtl() throws Exception {
976991
.setClock(Clock.fixed(now.plus(Duration.ofMinutes(6)), ZoneId.systemDefault()));
977992
reconciler.reconcile(deployment, context);
978993
assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
994+
// Make sure we don't resubmit
995+
reconciler.reconcile(deployment, context);
996+
assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
979997
}
980998

981999
@ParameterizedTest

0 commit comments

Comments
 (0)