From 02afd1d5dfff58a6e3e898e59594706be2f332b8 Mon Sep 17 00:00:00 2001 From: "eason.qin" Date: Thu, 31 Jul 2025 15:24:55 +0800 Subject: [PATCH] [FLINK-37869][Observer] Fix finished bounded stream jobs can't be cleanup by the Flink Kubernetes Operator --- .../service/AbstractFlinkService.java | 6 ++++ .../operator/TestingFlinkService.java | 8 +++++ .../deployment/ApplicationObserverTest.java | 31 +++++++++++++++++++ 3 files changed, 45 insertions(+) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 820fbced76..b4c25b4469 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -560,6 +560,12 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { && e.getMessage().contains("Checkpointing has not been enabled")) { LOG.warn("Checkpointing not enabled for job {}", jobId, e); return Optional.empty(); + } else if (e instanceof ExecutionException + && e.getMessage() != null + && e.getMessage() + .contains(String.format("Job %s not found", jobId.toString()))) { + LOG.warn("Job {} not found", jobId, e); + return Optional.empty(); } throw new ReconciliationException("Could not observe latest savepoint information", e); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index c020c53d98..b3b901820f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ 
-148,6 +148,7 @@ public class TestingFlinkService extends AbstractFlinkService { @Getter private final Map checkpointTriggers = new HashMap<>(); private final Map checkpointStats = new HashMap<>(); @Setter private boolean throwCheckpointingDisabledError = false; + @Setter private boolean throwJobNotFoundError = false; @Setter private Throwable jobFailedErr; @Getter private int desiredReplicas = 0; @@ -646,6 +647,13 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { "Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST)); } + if (throwJobNotFoundError) { + throw new ExecutionException( + new RestClientException( + String.format("Job %s not found", jobId.toString()), + HttpResponseStatus.NOT_FOUND)); + } + if (checkpointInfo != null) { return checkpointInfo; } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java index 68233d397e..4fc9e6ed01 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java @@ -944,4 +944,35 @@ public void getLastCheckpointShouldHandleCheckpointingNotEnabled() throws Except deployment.getMetadata().getNamespace(), "Checkpointing has not been enabled")); } + + @Test + public void getLastCheckpointShouldHandleJobNotFound() throws Exception { + Configuration conf = + configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec()); + flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false); + bringToReadyStatus(deployment); + + deployment + .getStatus() + .getJobStatus() + .setState(org.apache.flink.api.common.JobStatus.FINISHED); + var jobs = 
flinkService.listJobs(); + var oldStatus = jobs.get(0).f1; + jobs.get(0).f1 = + new JobStatusMessage( + oldStatus.getJobId(), + oldStatus.getJobName(), + org.apache.flink.api.common.JobStatus.FINISHED, + oldStatus.getStartTime()); + + flinkService.setThrowJobNotFoundError(true); + observer.observe(deployment, readyContext); + + assertEquals( + 0, + countErrorEvents( + EventRecorder.Reason.CheckpointError.name(), + deployment.getMetadata().getNamespace(), + String.format("Job %s not found", jobs.get(0).f1.getJobId()))); + } }