Skip to content

Commit 9eb3c38

Browse files
[FLINK-37370] [Observer] Finished batch jobs throw ReconciliationException and never reach FINISHED in the CR
1 parent 042c27e commit 9eb3c38

File tree

4 files changed

+42
-1
lines changed

4 files changed

+42
-1
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,6 @@ private long getMaxCountForSnapshotType(
441441
}
442442

443443
private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) {
444-
445444
var status = ctx.getResource().getStatus();
446445
var jobStatus = status.getJobStatus();
447446

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,12 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
546546
try {
547547
latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0;
548548
} catch (Exception e) {
549+
if (e instanceof RestClientException
550+
&& e.getMessage() != null
551+
&& e.getMessage().contains("Checkpointing has not been enabled")) {
552+
LOG.warn("Checkpointing not enabled for job {}", jobId, e);
553+
return Optional.empty();
554+
}
549555
throw new ReconciliationException("Could not observe latest savepoint information", e);
550556
}
551557

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ public class TestingFlinkService extends AbstractFlinkService {
138138
@Getter private final Map<String, Boolean> savepointTriggers = new HashMap<>();
139139
@Getter private final Map<String, Boolean> checkpointTriggers = new HashMap<>();
140140
private final Map<Long, String> checkpointStats = new HashMap<>();
141+
@Setter private boolean throwCheckpointingDisabledError = false;
141142

142143
@Getter private int desiredReplicas = 0;
143144
@Getter private int cancelJobCallCount = 0;
@@ -593,6 +594,10 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
593594
Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,
594595
Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
595596
getCheckpointInfo(JobID jobId, Configuration conf) throws Exception {
597+
if (throwCheckpointingDisabledError) {
598+
throw new RestClientException(
599+
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST);
600+
}
596601

597602
if (checkpointInfo != null) {
598603
return checkpointInfo;

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -898,4 +898,35 @@ public void jobStatusNotOverwrittenWhenTerminal() throws Exception {
898898
org.apache.flink.api.common.JobStatus.FINISHED,
899899
deployment.getStatus().getJobStatus().getState());
900900
}
901+
902+
@Test
903+
public void getLastCheckpointShouldHandleCheckpointingNotEnabled() throws Exception {
904+
Configuration conf =
905+
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
906+
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
907+
bringToReadyStatus(deployment);
908+
909+
deployment
910+
.getStatus()
911+
.getJobStatus()
912+
.setState(org.apache.flink.api.common.JobStatus.FINISHED);
913+
var jobs = flinkService.listJobs();
914+
var oldStatus = jobs.get(0).f1;
915+
jobs.get(0).f1 =
916+
new JobStatusMessage(
917+
oldStatus.getJobId(),
918+
oldStatus.getJobName(),
919+
org.apache.flink.api.common.JobStatus.FINISHED,
920+
oldStatus.getStartTime());
921+
922+
flinkService.setThrowCheckpointingDisabledError(true);
923+
observer.observe(deployment, readyContext);
924+
925+
assertEquals(
926+
0,
927+
countErrorEvents(
928+
EventRecorder.Reason.CheckpointError.name(),
929+
deployment.getMetadata().getNamespace(),
930+
"Checkpointing has not been enabled"));
931+
}
901932
}

0 commit comments

Comments
 (0)