@@ -560,6 +560,12 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
&& e.getMessage().contains("Checkpointing has not been enabled")) {
LOG.warn("Checkpointing not enabled for job {}", jobId, e);
return Optional.empty();
} else if (e instanceof ExecutionException
&& e.getMessage() != null
&& e.getMessage()
.contains(String.format("Job %s not found", jobId.toString()))) {
LOG.warn("Job {} not found", jobId, e);
Comment on lines +563 to +567

Member

Thanks @qinf for the PR.

I'm not sure if "Job %s not found" means the job is finished. Is it possible that the job failed?

Or could you please add some background to the PR description to explain the relationship between getLastCheckpoint and job cleanup?

Author

@1996fanrui Thanks for the quick feedback, I have added the background. I have also added an example here.
For FINISHED job 6f4257aaf163d19a6fa1519479795c11:

  1. When requesting GET /jobs/:jobid -> GET /jobs/6f4257aaf163d19a6fa1519479795c11, the response is:
{
  "jid": "6f4257aaf163d19a6fa1519479795c11",
  "name": "app442315instance1204699",
  "isStoppable": false,
  "state": "FINISHED",
  "start-time": 1754445330065,
  "end-time": 1754445366256,
  "duration": 36191,
  "maxParallelism": -1,
  "now": 1754445409626,
  "timestamps": {
    "CREATED": 1754445338012,
    "RUNNING": 1754445338616,
    "CANCELED": 0,
    "INITIALIZING": 1754445330065,
    "RECONCILING": 0,
    "FINISHED": 1754445366256,
    "FAILED": 0,
    "CANCELLING": 0,
    "FAILING": 0,
    "RESTARTING": 0,
    "SUSPENDED": 0
  },
  ......
}
  2. When requesting GET /jobs/:jobid/checkpoints -> GET /jobs/6f4257aaf163d19a6fa1519479795c11/checkpoints, the response is (see the probe sketch after this list):
{
  "errors": [
    "org.apache.flink.runtime.rest.NotFoundException: Job 6f4257aaf163d19a6fa1519479795c11 not found\n\tat org.apache.flink.runtime.rest.handler.job.checkpoints.AbstractCheckpointStatsHandler.lambda$handleRequest$2(AbstractCheckpointStatsHandler.java:102)\n\tat java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)\n\tat java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)\n\tat ..."
  ]
}
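
To reproduce this outside the operator, here is a minimal probe (illustrative only, not part of this PR; it assumes the JobManager REST endpoint is reachable at localhost:8081 and reuses the job id from the example above) that issues both requests with java.net.http:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FinishedJobRestProbe {
    public static void main(String[] args) throws Exception {
        // Assumed values for illustration: adjust the REST address to your JobManager.
        String base = "http://localhost:8081";
        String jobId = "6f4257aaf163d19a6fa1519479795c11";
        HttpClient client = HttpClient.newHttpClient();

        for (String path : new String[] {"/jobs/" + jobId, "/jobs/" + jobId + "/checkpoints"}) {
            HttpResponse<String> response =
                    client.send(
                            HttpRequest.newBuilder(URI.create(base + path)).GET().build(),
                            HttpResponse.BodyHandlers.ofString());
            // Observed behaviour for a FINISHED job: /jobs/:jobid answers 200 with
            // "state": "FINISHED", while /jobs/:jobid/checkpoints answers 404 "Job ... not found".
            System.out.println(path + " -> HTTP " + response.statusCode());
        }
    }
}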

Maybe the current solution of catching the exception in getLastCheckpoint() is somewhat unintuitive here. Another option is to catch the "Job not found" exception in observeLatestCheckpoint():

private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) {
        var status = ctx.getResource().getStatus();
        var jobStatus = status.getJobStatus();
        ctx.getFlinkService()
                .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
                .ifPresentOrElse(
                        snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
                        () -> {
                            if (ReconciliationUtils.isJobCancelled(status)) {
                                // For cancelled jobs the observed savepoint is always definite,
                                // so if empty we know the job doesn't have any
                                // checkpoints/savepoints
                                jobStatus.setUpgradeSavepointPath(null);
                            }
                        });
    }

And we can check whether the job is FINISHED with:

JobStatus.FINISHED == resource.getStatus().getJobStatus().getState();
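
For illustration, a rough sketch of that alternative (hypothetical, not what this PR implements; it assumes the "Job not found" error surfaces as the ReconciliationException thrown by getLastCheckpoint()) could look like:

    private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) {
        var status = ctx.getResource().getStatus();
        var jobStatus = status.getJobStatus();
        try {
            ctx.getFlinkService()
                    .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
                    .ifPresentOrElse(
                            snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
                            () -> {
                                if (ReconciliationUtils.isJobCancelled(status)) {
                                    // For cancelled jobs the observed savepoint is definite,
                                    // so an empty result means there is none to record
                                    jobStatus.setUpgradeSavepointPath(null);
                                }
                            });
        } catch (ReconciliationException e) {
            // Hypothetical handling: for a FINISHED job the checkpoint endpoint may answer
            // "Job ... not found"; skip updating the savepoint path instead of failing the loop.
            if (JobStatus.FINISHED == jobStatus.getState()
                    && e.getCause() != null
                    && e.getCause().getMessage() != null
                    && e.getCause().getMessage().contains("not found")) {
                LOG.warn("Last checkpoint not available for finished job {}", jobId, e);
            } else {
                throw e;
            }
        }
    }

The trade-off is that both the FINISHED check and the error-message matching would then live in the observer instead of next to the REST call in getLastCheckpoint().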

Do you have any suggestions?

Member

I have added the background

Sorry, I did not see any change in the PR description. Is there anything I missed?

Author

@1996fanrui Sorry about that, I have re-submitted the background.

Member

Background

  • When a job is observed by the observer, the observer() method in AbstractFlinkResourceObserver is triggered
  • Once the JM deployment is ready, AbstractFlinkDeploymentObserver#observeFlinkCluster() observes the Flink cluster
  • SnapshotObserver#observeSavepointStatus() monitors the savepoint status
  • SnapshotObserver#observeLatestCheckpoint() tracks the last checkpoint of jobs with a globally terminal state:
    • A "Job not found" exception occurs when calling getLastCheckpoint() for a FINISHED job.
    • In reality, the job can be retrieved via the GET /jobs/:jobid request, but the GET /jobs/:jobid/checkpoints request will throw a "Job not found" exception.

When the job is finished, the control loop is stopped due to the "Job not found" exception, which means the cleanup is never called, right?

Author

@1996fanrui Yes, due to the "Job not found" exception, the Flink cluster resource remains in the cluster.

Member

Thanks @qinf for the explanation!

It sounds good to me.

return Optional.empty();
}
throw new ReconciliationException("Could not observe latest savepoint information", e);
}
@@ -148,6 +148,7 @@ public class TestingFlinkService extends AbstractFlinkService {
@Getter private final Map<String, Boolean> checkpointTriggers = new HashMap<>();
private final Map<Long, String> checkpointStats = new HashMap<>();
@Setter private boolean throwCheckpointingDisabledError = false;
@Setter private boolean throwJobNotFoundError = false;
@Setter private Throwable jobFailedErr;

@Getter private int desiredReplicas = 0;
@@ -646,6 +647,13 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST));
}

if (throwJobNotFoundError) {
throw new ExecutionException(
new RestClientException(
String.format("Job %s not found", jobId.toString()),
HttpResponseStatus.NOT_FOUND));
}

if (checkpointInfo != null) {
return checkpointInfo;
}
@@ -944,4 +944,35 @@ public void getLastCheckpointShouldHandleCheckpointingNotEnabled() throws Except
deployment.getMetadata().getNamespace(),
"Checkpointing has not been enabled"));
}

@Test
public void getLastCheckpointShouldHandleJobNotFound() throws Exception {
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
bringToReadyStatus(deployment);

deployment
.getStatus()
.getJobStatus()
.setState(org.apache.flink.api.common.JobStatus.FINISHED);
var jobs = flinkService.listJobs();
var oldStatus = jobs.get(0).f1;
jobs.get(0).f1 =
new JobStatusMessage(
oldStatus.getJobId(),
oldStatus.getJobName(),
org.apache.flink.api.common.JobStatus.FINISHED,
oldStatus.getStartTime());

flinkService.setThrowJobNotFoundError(true);
observer.observe(deployment, readyContext);

assertEquals(
0,
countErrorEvents(
EventRecorder.Reason.CheckpointError.name(),
deployment.getMetadata().getNamespace(),
String.format("Job {} not found", jobs.get(0).f1.getJobId())));
}
}