Skip to content

Commit 02afd1d

Browse files
committed
[FLINK-37869][Observer] Fix finished bounded stream jobs not being cleaned up by the Flink Kubernetes Operator
1 parent d5f4753 commit 02afd1d

File tree

3 files changed

+45
-0
lines changed

3 files changed

+45
-0
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,12 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
560560
&& e.getMessage().contains("Checkpointing has not been enabled")) {
561561
LOG.warn("Checkpointing not enabled for job {}", jobId, e);
562562
return Optional.empty();
563+
} else if (e instanceof ExecutionException
564+
&& e.getMessage() != null
565+
&& e.getMessage()
566+
.contains(String.format("Job %s not found", jobId.toString()))) {
567+
LOG.warn("Job {} not found", jobId, e);
568+
return Optional.empty();
563569
}
564570
throw new ReconciliationException("Could not observe latest savepoint information", e);
565571
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ public class TestingFlinkService extends AbstractFlinkService {
148148
@Getter private final Map<String, Boolean> checkpointTriggers = new HashMap<>();
149149
private final Map<Long, String> checkpointStats = new HashMap<>();
150150
@Setter private boolean throwCheckpointingDisabledError = false;
151+
@Setter private boolean throwJobNotFoundError = false;
151152
@Setter private Throwable jobFailedErr;
152153

153154
@Getter private int desiredReplicas = 0;
@@ -646,6 +647,13 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
646647
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST));
647648
}
648649

650+
if (throwJobNotFoundError) {
651+
throw new ExecutionException(
652+
new RestClientException(
653+
String.format("Job %s not found", jobId.toString()),
654+
HttpResponseStatus.NOT_FOUND));
655+
}
656+
649657
if (checkpointInfo != null) {
650658
return checkpointInfo;
651659
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -944,4 +944,35 @@ public void getLastCheckpointShouldHandleCheckpointingNotEnabled() throws Except
944944
deployment.getMetadata().getNamespace(),
945945
"Checkpointing has not been enabled"));
946946
}
947+
948+
@Test
949+
public void getLastCheckpointShouldHandleJobNotFound() throws Exception {
950+
Configuration conf =
951+
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
952+
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
953+
bringToReadyStatus(deployment);
954+
955+
deployment
956+
.getStatus()
957+
.getJobStatus()
958+
.setState(org.apache.flink.api.common.JobStatus.FINISHED);
959+
var jobs = flinkService.listJobs();
960+
var oldStatus = jobs.get(0).f1;
961+
jobs.get(0).f1 =
962+
new JobStatusMessage(
963+
oldStatus.getJobId(),
964+
oldStatus.getJobName(),
965+
org.apache.flink.api.common.JobStatus.FINISHED,
966+
oldStatus.getStartTime());
967+
968+
flinkService.setThrowJobNotFoundError(true);
969+
observer.observe(deployment, readyContext);
970+
971+
assertEquals(
972+
0,
973+
countErrorEvents(
974+
EventRecorder.Reason.CheckpointError.name(),
975+
deployment.getMetadata().getNamespace(),
976+
String.format("Job {} not found", jobs.get(0).f1.getJobId())));
977+
}
947978
}

0 commit comments

Comments
 (0)