
Conversation

@qinf qinf commented Aug 2, 2025

What is the purpose of the change

Fix finished bounded stream jobs not being cleaned up by the Flink Kubernetes Operator (FLINK-37869)

Background

  1. When a job is observed by the observer, the observer() method in AbstractFlinkResourceObserver is triggered
  2. Once the JM deployment is ready, AbstractFlinkDeploymentObserver#observeFlinkCluster() observes the Flink cluster
  3. SnapshotObserver#observeSavepointStatus() monitors the savepoint status
  4. SnapshotObserver#observeLatestCheckpoint() tracks the last checkpoint of jobs with a globally terminal state:
    • A "Job not found" exception occurs when calling getLastCheckpoint() for a FINISHED job.
    • In reality, the job can still be retrieved via the GET /jobs/:jobid request, but the GET /jobs/:jobid/checkpoints request fails with a "Job not found" exception (see the probe sketch after this list).
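
To make the mismatch concrete, the small probe below hits both endpoints for the same job id. It is a sketch only; the REST address and job id are placeholders, the job id being the FINISHED job from the example response later in this thread.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class CheckpointEndpointProbe {

    public static void main(String[] args) throws IOException {
        // Placeholder values: a locally reachable JobManager REST endpoint and
        // the FINISHED job id from the example response further down this thread.
        String restAddress = "http://localhost:8081";
        String jobId = "6f4257aaf163d19a6fa1519479795c11";

        // GET /jobs/:jobid still returns the job details with "state": "FINISHED".
        System.out.println("GET /jobs/:jobid             -> HTTP "
                + statusOf(restAddress + "/jobs/" + jobId));

        // GET /jobs/:jobid/checkpoints fails with the "Job ... not found" error.
        System.out.println("GET /jobs/:jobid/checkpoints -> HTTP "
                + statusOf(restAddress + "/jobs/" + jobId + "/checkpoints"));
    }

    private static int statusOf(String url) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("GET");
        return connection.getResponseCode();
    }
}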

Brief change log

  • Catch the exception and add a log message when observing the Flink cluster for a FINISHED bounded stream job (see the sketch below this list)
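
A minimal sketch of that handling is shown below. The JobNotFoundHandling class and its methods are placeholders for illustration (the real change lives in the operator's Flink service), but the message matching mirrors the diff discussed later in this thread.

import java.util.concurrent.ExecutionException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Placeholder class for illustration; the real change lives in the operator's Flink service. */
public class JobNotFoundHandling {

    private static final Logger LOG = LoggerFactory.getLogger(JobNotFoundHandling.class);

    /**
     * True if the failure is the REST "Job <id> not found" error that a FINISHED
     * bounded job produces on GET /jobs/:jobid/checkpoints.
     */
    public static boolean isJobNotFound(Throwable e, String jobId) {
        return e instanceof ExecutionException
                && e.getMessage() != null
                && e.getMessage().contains(String.format("Job %s not found", jobId));
    }

    /** Log-and-ignore path used when the job is already FINISHED. */
    public static void logIgnored(Throwable e, String jobId) {
        LOG.warn("Job {} not found while fetching the last checkpoint", jobId, e);
    }
}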

Verifying this change

This change added tests and can be verified as follows:

  • Added unit test getLastCheckpointShouldHandleJobNotFound (an illustrative sketch of its shape follows this list)
  • Manually verified the change by running a bounded stream job on a Kubernetes cluster; the Flink cluster resource was deleted when the job finished
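
For illustration, such a test could take roughly the shape below; it exercises only the message-matching logic against the hypothetical JobNotFoundHandling helper sketched earlier, not the real operator service.

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.ExecutionException;

import org.junit.jupiter.api.Test;

class JobNotFoundHandlingTest {

    @Test
    void getLastCheckpointShouldHandleJobNotFound() {
        String jobId = "6f4257aaf163d19a6fa1519479795c11";

        // The wrapped cause's toString contains "Job <id> not found",
        // matching the REST error produced for a FINISHED job.
        Exception notFound =
                new ExecutionException(new RuntimeException("Job " + jobId + " not found"));
        assertTrue(JobNotFoundHandling.isJobNotFound(notFound, jobId));

        // Any other failure should not be swallowed.
        Exception other = new ExecutionException(new RuntimeException("connection refused"));
        assertFalse(JobNotFoundHandling.isJobNotFound(other, jobId));
    }
}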

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., any changes to the CustomResourceDescriptors: (no)
  • Core observer or reconciler logic that is regularly executed: (yes)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

qinf commented Aug 3, 2025

Hi @1996fanrui , could you help take a look at this PR in your free time?

The Flink cluster resource will be deleted after kubernetes.operator.jm-deployment.shutdown-ttl has passed (org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler#cleanupTerminalJmAfterTtl).

Comment on lines +563 to +567
} else if (e instanceof ExecutionException
        && e.getMessage() != null
        && e.getMessage()
                .contains(String.format("Job %s not found", jobId.toString()))) {
    LOG.warn("Job {} not found", jobId, e);
Member (1996fanrui):

Thanks @qinf for the PR.

I'm not sure if "Job %s not found" means the job is finished. Is it possible that the job failed?

Or could you please add some background in the PR description to explain the relationship between getLastCheckpoint and job cleanup?

Author (qinf):

@1996fanrui Thanks for the quick feedback, I have added the background. I also added an example here.
For FINISHED job 6f4257aaf163d19a6fa1519479795c11:

  1. When requesting GET /jobs/:jobid -> GET /jobs/6f4257aaf163d19a6fa1519479795c11, the response is:
{
  "jid": "6f4257aaf163d19a6fa1519479795c11",
  "name": "app442315instance1204699",
  "isStoppable": false,
  "state": "FINISHED",
  "start-time": 1754445330065,
  "end-time": 1754445366256,
  "duration": 36191,
  "maxParallelism": -1,
  "now": 1754445409626,
  "timestamps": {
    "CREATED": 1754445338012,
    "RUNNING": 1754445338616,
    "CANCELED": 0,
    "INITIALIZING": 1754445330065,
    "RECONCILING": 0,
    "FINISHED": 1754445366256,
    "FAILED": 0,
    "CANCELLING": 0,
    "FAILING": 0,
    "RESTARTING": 0,
    "SUSPENDED": 0
  },
  ......
}
  2. When requesting GET /jobs/:jobid/checkpoints -> GET /jobs/6f4257aaf163d19a6fa1519479795c11/checkpoints, the response is:
{
  "errors": [
    "org.apache.flink.runtime.rest.NotFoundException: Job 6f4257aaf163d19a6fa1519479795c11 not found\n\tat org.apache.flink.runtime.rest.handler.job.checkpoints.AbstractCheckpointStatsHandler.lambda$handleRequest$2(AbstractCheckpointStatsHandler.java:102)\n\tat java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)\n\tat java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)\n\tat ..."
  ]
}

Maybe the current solution of catching the exception in getLastCheckpoint() is somewhat unintuitive here. Another solution is catching the "Job not found" exception in observeLatestCheckpoint():

private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) {
        var status = ctx.getResource().getStatus();
        var jobStatus = status.getJobStatus();
        ctx.getFlinkService()
                .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
                .ifPresentOrElse(
                        snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
                        () -> {
                            if (ReconciliationUtils.isJobCancelled(status)) {
                                // For cancelled jobs the observed savepoint is always definite,
                                // so if empty we know the job doesn't have any
                                // checkpoints/savepoints
                                jobStatus.setUpgradeSavepointPath(null);
                            }
                        });
    }

And we can check whether the job is FINISHED via the following (a combined sketch of this alternative is shown below):

JobStatus.FINISHED == resource.getStatus().getJobStatus().getState();
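
A combined sketch of that alternative might look as follows; the try/catch and the isJobNotFound(...) helper are hypothetical additions, while the rest mirrors the method shown above. This keeps the catch next to the state check, so the job-not-found case is only tolerated for FINISHED jobs.

private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) {
    var status = ctx.getResource().getStatus();
    var jobStatus = status.getJobStatus();
    try {
        ctx.getFlinkService()
                .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
                .ifPresentOrElse(
                        snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
                        () -> {
                            if (ReconciliationUtils.isJobCancelled(status)) {
                                jobStatus.setUpgradeSavepointPath(null);
                            }
                        });
    } catch (Exception e) {
        // isJobNotFound(...) is a hypothetical helper matching the "Job <id> not found" message.
        if (JobStatus.FINISHED == jobStatus.getState() && isJobNotFound(e, jobId)) {
            // FINISHED bounded job: /jobs/:jobid/checkpoints has no data anymore,
            // so log and let the observe loop continue instead of failing.
            LOG.warn("Job {} not found while observing the latest checkpoint", jobId, e);
            return;
        }
        throw new RuntimeException(e);
    }
}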

Do you have any suggestions?

Member (1996fanrui):

I have added the background

Sorry, I did not see any change in the PR description. Is there anything I missed?

Author (qinf):

@1996fanrui Sorry for that, I have re-submitted the background.

Member (1996fanrui):

Background

  • When a job is observed by the observer, the observer() method in AbstractFlinkResourceObserver is triggered
  • Once the JM deployment is ready, AbstractFlinkDeploymentObserver#observeFlinkCluster() observes the Flink cluster
  • SnapshotObserver#observeSavepointStatus() monitors the savepoint status
  • SnapshotObserver#observeLatestCheckpoint() tracks the last checkpoint of jobs with a globally terminal state:
    • A "Job not found" exception occurs when calling getLastCheckpoint() for a FINISHED job.
    • In reality, the job can be retrieved via the GET /jobs/:jobid request, but the GET /jobs/:jobid/checkpoints request will throw a "Job not found" exception.

When the job is finished, the control loop is stopped due to the "Job not found" exception, which means the cleanup is not called, right?

Author (qinf):

@1996fanrui Yes, due to the "Job not found" exception, the Flink cluster resource remains in the cluster.

Member (1996fanrui):

Thanks @qinf for the explanation!

It sounds good to me.

@1996fanrui 1996fanrui left a comment

Thanks for the fix, LGTM

Hey @gyfora , would you mind reviewing this PR when you are available? Thanks!

FYI: https://github.com/apache/flink-kubernetes-operator/pull/1003/files#r2257115396 is some background.

gyfora commented Aug 10, 2025

@qinf @1996fanrui , wouldn't it cause a problem that we cannot observe the latest checkpoint of bounded jobs? As far as I understand, bounded jobs can also take checkpoints, so there may be a "latest checkpoint".

If the error is ignored and the checkpoint is not recorded, wouldn't that cause an inconsistency during the next upgrade (we would start from an earlier checkpoint)?

qinf commented Aug 11, 2025

@qinf @1996fanrui , wouldn't it cause a problem that we cannot observe the latest checkpoint of bounded jobs? As far as I understand bounded jobs can also take checkpoints so there may be a "latest checkpoint".

If the error is ignored and the checkpoint is not recorded wouldn't that cause an inconsistency during the next upgrade (we start from an earlier checkpoint)?

@gyfora Thanks for your feedback! You mentioned "the latest checkpoint for a bounded stream job"; are you referring to the "Checkpointing with parts of the graph finished" feature? IIUC, in that case parts of the job graph have finished but the job itself is not FINISHED, so we can still request checkpoint information for a job that hasn't finished.

Please feel free to correct me if I've misunderstood anything.

gyfora commented Aug 11, 2025

@qinf , I am not talking about that feature; I am talking in general.
If a bounded streaming job has checkpointing enabled, a final checkpoint is taken when the inputs are finished, and then the job goes into the FINISHED state. How are we going to observe this checkpoint for a correct stateful upgrade?

qinf commented Aug 12, 2025

@qinf , I am not talking about that feature. I am talking in general. If a bounded streaming job has checkpointing enabled, when inputs are finished a final checkpoint is taken, then the job goes into FINISHED state. How are we going to observe this checkpoint for a correct stateful upgrade?

@gyfora , I get your meaning. But it seems we can't get checkpoint info for a FINISHED job, because the job has been unregistered (unregister(jobId)) from the JobManagerRunnerRegistry. If the operator needs the checkpoint for a correct stateful upgrade, the Flink JobManager should return the checkpoint info for a FINISHED job.

@gyfora gyfora left a comment

I think in this case this is a bug on the Flink side, not an operator-side issue. Merging this PR would basically introduce a logic error in the operator which would lead to state loss during upgrades of bounded jobs.

So overall, -1 for merging this.

qinf commented Aug 12, 2025

@gyfora Got it, thanks for your response.

@qinf qinf closed this Aug 12, 2025
gyfora commented Aug 12, 2025

@qinf , I think we should cache the last checkpoint info somewhere even after unregister so that the JM can return this information. I would be happy to review if you are interested in working on this :)
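
Purely as an illustration of that idea (not actual Flink code), the JobManager-side holder could be as small as an LRU map that keeps the last completed checkpoint path per job id for a while after the job is unregistered:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative only: a tiny LRU map keeping the last checkpoint path per job id. */
public class LastCheckpointCache {

    private final Map<String, String> pathsByJobId;

    public LastCheckpointCache(int maxEntries) {
        // Access-ordered LinkedHashMap evicting the eldest entry beyond maxEntries.
        this.pathsByJobId =
                new LinkedHashMap<String, String>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                        return size() > maxEntries;
                    }
                };
    }

    /** Record the latest completed checkpoint path, e.g. when the job finishes. */
    public synchronized void put(String jobId, String checkpointPath) {
        pathsByJobId.put(jobId, checkpointPath);
    }

    /** Look up the cached path even after the job has been unregistered. */
    public synchronized Optional<String> get(String jobId) {
        return Optional.ofNullable(pathsByJobId.get(jobId));
    }
}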

qinf commented Aug 13, 2025

@gyfora Thanks for your kind reminder, I'd like to work on this. I will create a new issue to follow up on it.

@1996fanrui

Thanks @gyfora for the advice, and thanks @qinf for the follow-up!
