Skip to content

Commit 0abaa1f

Browse files
Move exception handling to getLastCheckpoint in the service
1 parent 71f53d8 commit 0abaa1f

File tree

4 files changed

+23
-36
lines changed

4 files changed

+23
-36
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@
3737
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3838
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
3939
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
40-
import org.apache.flink.runtime.rest.util.RestClientException;
4140
import org.apache.flink.util.CollectionUtil;
42-
import org.apache.flink.util.ExceptionUtils;
4341

4442
import org.slf4j.Logger;
4543
import org.slf4j.LoggerFactory;
@@ -446,30 +444,17 @@ private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId)
446444
var status = ctx.getResource().getStatus();
447445
var jobStatus = status.getJobStatus();
448446

449-
try {
450-
ctx.getFlinkService()
451-
.getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
452-
.ifPresentOrElse(
453-
snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
454-
() -> {
455-
if (ReconciliationUtils.isJobCancelled(status)) {
456-
// For cancelled jobs the observed savepoint is always definite,
457-
// so if empty we know the job doesn't have any
458-
// checkpoints/savepoints
459-
jobStatus.setUpgradeSavepointPath(null);
460-
}
461-
});
462-
} catch (Exception e) {
463-
if (ExceptionUtils.findThrowable(e, RestClientException.class)
464-
.map(ex -> ex.getMessage().contains("Checkpointing has not been enabled"))
465-
.orElse(false)) {
466-
LOG.warn(
467-
"Checkpointing not enabled for job {}, skipping checkpoint observation",
468-
jobId,
469-
e);
470-
return;
471-
}
472-
throw e;
473-
}
447+
ctx.getFlinkService()
448+
.getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
449+
.ifPresentOrElse(
450+
snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
451+
() -> {
452+
if (ReconciliationUtils.isJobCancelled(status)) {
453+
// For cancelled jobs the observed savepoint is always definite,
454+
// so if empty we know the job doesn't have any
455+
// checkpoints/savepoints
456+
jobStatus.setUpgradeSavepointPath(null);
457+
}
458+
});
474459
}
475460
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,12 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
546546
try {
547547
latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0;
548548
} catch (Exception e) {
549+
if (e instanceof RestClientException
550+
&& e.getMessage() != null
551+
&& e.getMessage().contains("Checkpointing has not been enabled")) {
552+
LOG.warn("Checkpointing not enabled for job {}", jobId, e);
553+
return Optional.empty();
554+
}
549555
throw new ReconciliationException("Could not observe latest savepoint information", e);
550556
}
551557

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
4444
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
4545
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
46-
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
4746
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
4847
import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
4948
import org.apache.flink.kubernetes.operator.observer.CheckpointStatsResult;
@@ -576,13 +575,6 @@ public void disposeSavepoint(String savepointPath, Configuration conf) throws Ex
576575

577576
@Override
578577
public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
579-
if (throwCheckpointingDisabledError) {
580-
throw new ReconciliationException(
581-
"Could not observe latest savepoint information",
582-
new RestClientException(
583-
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST));
584-
}
585-
586578
jobs.stream()
587579
.filter(js -> js.f1.getJobId().equals(jobId))
588580
.findAny()
@@ -602,6 +594,10 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
602594
Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,
603595
Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
604596
getCheckpointInfo(JobID jobId, Configuration conf) throws Exception {
597+
if (throwCheckpointingDisabledError) {
598+
throw new RestClientException(
599+
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST);
600+
}
605601

606602
if (checkpointInfo != null) {
607603
return checkpointInfo;

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -900,7 +900,7 @@ public void jobStatusNotOverwrittenWhenTerminal() throws Exception {
900900
}
901901

902902
@Test
903-
public void observeLatestCheckpointShouldSkipWhenCheckpointingDisabled() throws Exception {
903+
public void getLastCheckpointShouldHandleCheckpointingNotEnabled() throws Exception {
904904
Configuration conf =
905905
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
906906
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);

0 commit comments

Comments
 (0)