Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ private long getMaxCountForSnapshotType(
}

private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) {

var status = ctx.getResource().getStatus();
var jobStatus = status.getJobStatus();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,12 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
try {
latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0;
} catch (Exception e) {
if (e instanceof RestClientException
&& e.getMessage() != null
&& e.getMessage().contains("Checkpointing has not been enabled")) {
LOG.warn("Checkpointing not enabled for job {}", jobId, e);
return Optional.empty();
}
throw new ReconciliationException("Could not observe latest savepoint information", e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class TestingFlinkService extends AbstractFlinkService {
@Getter private final Map<String, Boolean> savepointTriggers = new HashMap<>();
@Getter private final Map<String, Boolean> checkpointTriggers = new HashMap<>();
private final Map<Long, String> checkpointStats = new HashMap<>();
@Setter private boolean throwCheckpointingDisabledError = false;

@Getter private int desiredReplicas = 0;
@Getter private int cancelJobCallCount = 0;
Expand Down Expand Up @@ -593,6 +594,10 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>,
Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
getCheckpointInfo(JobID jobId, Configuration conf) throws Exception {
if (throwCheckpointingDisabledError) {
throw new RestClientException(
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST);
}

if (checkpointInfo != null) {
return checkpointInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,4 +898,35 @@ public void jobStatusNotOverwrittenWhenTerminal() throws Exception {
org.apache.flink.api.common.JobStatus.FINISHED,
deployment.getStatus().getJobStatus().getState());
}

@Test
public void getLastCheckpointShouldHandleCheckpointingNotEnabled() throws Exception {
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
bringToReadyStatus(deployment);

deployment
.getStatus()
.getJobStatus()
.setState(org.apache.flink.api.common.JobStatus.FINISHED);
var jobs = flinkService.listJobs();
var oldStatus = jobs.get(0).f1;
jobs.get(0).f1 =
new JobStatusMessage(
oldStatus.getJobId(),
oldStatus.getJobName(),
org.apache.flink.api.common.JobStatus.FINISHED,
oldStatus.getStartTime());

flinkService.setThrowCheckpointingDisabledError(true);
observer.observe(deployment, readyContext);

assertEquals(
0,
countErrorEvents(
EventRecorder.Reason.CheckpointError.name(),
deployment.getMetadata().getNamespace(),
"Checkpointing has not been enabled"));
}
}
Loading