Skip to content

Conversation

@nishita-09
Copy link
Contributor

@nishita-09 nishita-09 commented Jul 10, 2025

What is the purpose of the change

This pull request adds a configuration to allow session cluster cleanup blocking in case unmanaged jobs are present. That is the FlinkDeployment deletion is blocked in presence of FlinkSessionJobs as well as if jobs submitted through CLI are in non_terminal states.

Brief change log

(for example:)

  • Added getUnmanagedJobs() method to detect CLI-submitted jobs not managed by FlinkSessionJob resources
  • Modified cleanupInternal() to check for unmanaged jobs when blocking is enabled
  • Improved error messages and logging.

Verifying this change

This change added tests and can be verified as follows:

  • Added testGetUnmanagedJobs test for validating getUnmanagedJobs

    • This test would validate if the function correctly identifies jobs which are not controlled by SessionJob and are in non-terminal state.
  • Manually verified the change by running a cluster with 2 JobManagers and submitted a SessionJob as well as CLI Jobs.

    • Config: session.block-on-unmanaged-jobs: true

    • Deleted flinkDeployment -> Generates Event CleanupFailed in flinkDeployment due to presence of sessionjobs

    • Deleted flinkSessionJob -> Generates Event CleanupFailed in flinkDeployment due to presence of unmanaged Jobs -> flinkSessionJob was deleted.

    • Cancelled CLI submitted job -> Generates Event Cleanup after ReconcileInterval -> CLI job was cancelled and then the flinkDeployment was cleaned up.

Screenshot 2025-07-10 at 10 04 51 PM Screenshot 2025-07-10 at 10 06 54 PM Screenshot 2025-07-14 at 2 15 32 PM
  • Config: session.block-on-unmanaged-jobs: false
  • Manually verified the change by running a cluster with 2 JobManagers and submitted a SessionJob as well as CLI Jobs.
  • Deleted flinkDeployment -> Generates Event CleanupFailed in flinkDeployment due to presence of sessionjobs
  • Deleted flinkSessionJob -> Generates Event Cleanup inspite of running CLI jobs being present. -> SessionJob is deleted , followed by flinkDeployment .

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: yes

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? not documented

@nishita-09 nishita-09 changed the title [FLINK-28648][operator] Fix Cleanup Process for Session Cluster [FLINK-28648][operator] Allow session deletion to block on any running job Jul 10, 2025
@nishita-09 nishita-09 changed the title [FLINK-28648][operator] Allow session deletion to block on any running job [FLINK-28648][Kubernetes Operator] Allow session deletion to block on any running job Jul 10, 2025
@nishita-09
Copy link
Contributor Author

@gyfora The build was failing due to missing documentation about added configuration. I have added those in the new commit now.

var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
var resourceContext = getResourceContext(deployment, context);

// Use reflection to access the private getUnmanagedJobs method
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not use reflection for this, we can make the method protected with the @visiblefor testing annotation

* the Flink cluster but are not managed by FlinkSessionJob resources.
*/
private Set<JobID> getUnmanagedJobs(
FlinkResourceContext<FlinkDeployment> ctx, Set<FlinkSessionJob> sessionJobs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that we need to pass sessionJobs here. if the flag is enabled, any running job should simply block it. We can simplify this logic a lot

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can simply replace this method with something like getNonTerminalJobIds() or boolean anyNonTerminalJobs()

That would be enough for this feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will simplify this further. Thanks for the review

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gyfora I have pushed another commit to address the comments here. I have stuck with getNonTerminalJobIds() to ensure the Event contains the list of job IDs that are not terminated for better observability for the user.

Comment on lines 135 to 137
LOG.info(
"Starting unmanaged job detection for session cluster: {}",
ctx.getResource().getMetadata().getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be on debug level, also no need to include resource name/info in the log message. Its in the MDC already

Comment on lines 222 to 230
LOG.warn(error);
if (eventRecorder.triggerEvent(
deployment,
EventRecorder.Type.Warning,
EventRecorder.Reason.CleanupFailed,
EventRecorder.Component.Operator,
error,
ctx.getKubernetesClient())) {
LOG.warn(error);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are logging the error twice, you don't need to log it at all as the event triggering already logs it.

@nishita-09 nishita-09 requested a review from gyfora July 14, 2025 09:30
@nishita-09
Copy link
Contributor Author

@gyfora I ran these tests on my local, they seem to be passing. I am not sure what is causing the issue here. Can you have a look at it if possible?

Copy link
Contributor

@gyfora gyfora left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added 2 minor comments, otherwise looks good!

Comment on lines 200 to 208
if (eventRecorder.triggerEvent(
deployment,
EventRecorder.Type.Warning,
EventRecorder.Reason.CleanupFailed,
EventRecorder.Component.Operator,
error,
ctx.getKubernetesClient())) {
LOG.warn(error);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can remove the if branch and the logging. Event triggering already creates logs we don't need both I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gyfora I see this for sessionjob event as well , should we remove here too.

Comment on lines 215 to 218
var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
ctx.getFlinkService()
.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), conf, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a bug in this existing logic, instead of getting the deployconfig, here we should just use ctx.getObserveConfig()

@nishita-09 nishita-09 requested a review from gyfora July 21, 2025 10:09
@nishita-09
Copy link
Contributor Author

Added 2 minor comments, otherwise looks good!

@gyfora Thank you for reviewing, I have made the changes. Please do trigger the workflows on your end. Also do let me know if we should also remove the LOG.warn() statement in sessionjob section too?

@gyfora
Copy link
Contributor

gyfora commented Jul 21, 2025

Added 2 minor comments, otherwise looks good!

@gyfora Thank you for reviewing, I have made the changes. Please do trigger the workflows on your end. Also do let me know if we should also remove the LOG.warn() statement in sessionjob section too?

We can leave it as is for now :)

@gyfora gyfora merged commit ef02fa8 into apache:main Jul 21, 2025
121 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants