diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java index 4255b09a1b..8f079415a2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java @@ -441,7 +441,6 @@ private long getMaxCountForSnapshotType( } private void observeLatestCheckpoint(FlinkResourceContext ctx, String jobId) { - var status = ctx.getResource().getStatus(); var jobStatus = status.getJobStatus(); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 4d7634ea4c..42e85a1ac2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -546,6 +546,12 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { try { latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0; } catch (Exception e) { + if (e instanceof RestClientException + && e.getMessage() != null + && e.getMessage().contains("Checkpointing has not been enabled")) { + LOG.warn("Checkpointing not enabled for job {}", jobId, e); + return Optional.empty(); + } throw new ReconciliationException("Could not observe latest savepoint information", e); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 42e3ca243a..fec1dfa64e 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -138,6 +138,7 @@ public class TestingFlinkService extends AbstractFlinkService { @Getter private final Map savepointTriggers = new HashMap<>(); @Getter private final Map checkpointTriggers = new HashMap<>(); private final Map checkpointStats = new HashMap<>(); + @Setter private boolean throwCheckpointingDisabledError = false; @Getter private int desiredReplicas = 0; @Getter private int cancelJobCallCount = 0; @@ -593,6 +594,10 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { Optional, Optional> getCheckpointInfo(JobID jobId, Configuration conf) throws Exception { + if (throwCheckpointingDisabledError) { + throw new RestClientException( + "Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST); + } if (checkpointInfo != null) { return checkpointInfo; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java index 83e7fad5fe..aa4898d827 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java @@ -898,4 +898,35 @@ public void jobStatusNotOverwrittenWhenTerminal() throws Exception { org.apache.flink.api.common.JobStatus.FINISHED, deployment.getStatus().getJobStatus().getState()); } + + @Test + public void getLastCheckpointShouldHandleCheckpointingNotEnabled() throws Exception { + Configuration conf = + configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec()); + flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false); + bringToReadyStatus(deployment); + + deployment + .getStatus() + .getJobStatus() + .setState(org.apache.flink.api.common.JobStatus.FINISHED); + var jobs = flinkService.listJobs(); + var oldStatus = jobs.get(0).f1; + jobs.get(0).f1 = + new JobStatusMessage( + oldStatus.getJobId(), + oldStatus.getJobName(), + org.apache.flink.api.common.JobStatus.FINISHED, + oldStatus.getStartTime()); + + flinkService.setThrowCheckpointingDisabledError(true); + observer.observe(deployment, readyContext); + + assertEquals( + 0, + countErrorEvents( + EventRecorder.Reason.CheckpointError.name(), + deployment.getMetadata().getNamespace(), + "Checkpointing has not been enabled")); + } }