-
Notifications
You must be signed in to change notification settings - Fork 498
[FLINK-37869][Observer] Fix finished bounded stream jobs can't be cle… #1003
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…anup by the Flink Kubernetes Operator
|
Hi @1996fanrui , could you help take a look this PR in your free time? |
The flink cluster resource will be deleted after |
| } else if (e instanceof ExecutionException | ||
| && e.getMessage() != null | ||
| && e.getMessage() | ||
| .contains(String.format("Job %s not found", jobId.toString()))) { | ||
| LOG.warn("Job {} not found", jobId, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @qinf for the PR.
I'm not sure if "Job %s not found" means the job is finished. Is it possible that the job failed?
Or could you please add some background in PR description to explain what the relationship is between getLastCheckpoint and clean job?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@1996fanrui Thanks for the quick feedback, I have added the background. I also add a example here.
For FINISHED job 6f4257aaf163d19a6fa1519479795c11:
- when request
GET /jobs/:jobid->GET /jobs/6f4257aaf163d19a6fa1519479795c11, the response is:
{
"jid": "6f4257aaf163d19a6fa1519479795c11",
"name": "app442315instance1204699",
"isStoppable": false,
"state": "FINISHED",
"start-time": 1754445330065,
"end-time": 1754445366256,
"duration": 36191,
"maxParallelism": -1,
"now": 1754445409626,
"timestamps": {
"CREATED": 1754445338012,
"RUNNING": 1754445338616,
"CANCELED": 0,
"INITIALIZING": 1754445330065,
"RECONCILING": 0,
"FINISHED": 1754445366256,
"FAILED": 0,
"CANCELLING": 0,
"FAILING": 0,
"RESTARTING": 0,
"SUSPENDED": 0
},
......
}- when request
GET /jobs/:jobid/checkpoints->GET /jobs/6f4257aaf163d19a6fa1519479795c11/checkpoints, the response is:
{
"errors": [
"org.apache.flink.runtime.rest.NotFoundException: Job 6f4257aaf163d19a6fa1519479795c11 not found\n\tat org.apache.flink.runtime.rest.handler.job.checkpoints.AbstractCheckpointStatsHandler.lambda$handleRequest$2(AbstractCheckpointStatsHandler.java:102)\n\tat java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)\n\tat java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)\n\tat ..."
}Maybe the current solution catching the exception in getLastCheckpoint() is somewhat unintuitive here. Another solution is catching the exception Job not found in observeLatestCheckpoint()
private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) {
var status = ctx.getResource().getStatus();
var jobStatus = status.getJobStatus();
ctx.getFlinkService()
.getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
.ifPresentOrElse(
snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
() -> {
if (ReconciliationUtils.isJobCancelled(status)) {
// For cancelled jobs the observed savepoint is always definite,
// so if empty we know the job doesn't have any
// checkpoints/savepoints
jobStatus.setUpgradeSavepointPath(null);
}
});
}And can check the job is FINISHED by
JobStatus.FINISHED == resource.getStatus().getJobStatus().getState();Do you have any suggestions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added the background
Sorry, I did not see any change in PR description. Is there anything I missed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@1996fanrui Sorry for that, I re-submit the background.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Background
- When a job is observed by the observer, the observer() method in AbstractFlinkResourceObserver is triggered
- Once the JM deployment is ready, AbstractFlinkDeploymentObserver#observeFlinkCluster() observes the Flink cluster
- SnapshotObserver#observeSavepointStatus() monitors the savepoint status
- SnapshotObserver#observeLatestCheckpoint() tracks the last checkpoint of jobs with a globally terminal state:
- A "Job not found" exception occurs when calling getLastCheckpoint() for a FINISHED job.
- In reality, the job can be retrieved via the GET /jobs/:jobid request, but the GET /jobs/:jobid/checkpoints request will throw a "Job not found" exception.
When job is finished, the control loop will be stopped due to "Job not found" exception, it caused the clean up is not called, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@1996fanrui Yes, due to the "Job not found" exception, the flink cluster resouce remains in the cluster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @qinf for the explanation!
It sounds good to me.
1996fanrui
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix, LGTM
Hey @gyfora , would you mind reviewing this PR when you are available? thanks
FYI: https://github.com/apache/flink-kubernetes-operator/pull/1003/files#r2257115396 is some background.
| } else if (e instanceof ExecutionException | ||
| && e.getMessage() != null | ||
| && e.getMessage() | ||
| .contains(String.format("Job %s not found", jobId.toString()))) { | ||
| LOG.warn("Job {} not found", jobId, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @qinf for the explanation!
It sounds good to me.
|
@qinf @1996fanrui , wouldn't it cause a problem that we cannot observe the latest checkpoint of bounded jobs? As far as I understand bounded jobs can also take checkpoints so there may be a "latest checkpoint". If the error is ignored and the checkpoint is not recorded wouldn't that cause an inconsistency during the next upgrade (we start from an earlier checkpoint)? |
@gyfora Thanks for your feedback! You mentioned "the latest checkpoint for a bounded stream job", are you referring to this feature Checkpointing with parts of the graph finished? IIUC parts of the job graph have finished but the job is not FINISHED, we can request checkpoint information for a job that hasn't finished. Please feel free to correct me if I've misunderstood anything. |
|
@qinf , I am not talking about that feature. I am talking in general. |
@gyfora , get your meaning. But seems we cann't get checkpoint info for a FINISHED job, the job is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in this case this is a bug on the Flink side, not an operator side issue. Merging this PR would basically introduce a logic error on the operator which would lead to state loss during upgrades of bounded jobs.
So overall -1 for merging this
|
@gyfora Got it, thanks for your response. |
|
@qinf , I think we should cache the last checkpoint info somewhere even after |
|
@gyfora Thanks for your kind reminder, I'd like working on this. And I will create a new issue to following this. |
What is the purpose of the change
Fix finished bounded stream jobs can't be cleanup by the Flink Kubernetes Operator FLINK-37869
background
observer()method inAbstractFlinkResourceObserveris triggeredAbstractFlinkDeploymentObserver#observeFlinkCluster()observes the Flink clusterSnapshotObserver#observeSavepointStatus()monitors the savepoint statusSnapshotObserver#observeLatestCheckpoint()tracks the last checkpoint of jobs with a globally terminal state:getLastCheckpoint()for a FINISHED job.GET /jobs/:jobidrequest, but theGET /jobs/:jobid/checkpointsrequest will throw a "Job not found" exception.Brief change log
Verifying this change
This change added tests and can be verified as follows:
getLastCheckpointShouldHandleJobNotFoundDoes this pull request potentially affect one of the following parts:
CustomResourceDescriptors: (no)Documentation