Skip to content
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/dynamic_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@
<td>Duration</td>
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@
<td>Duration</td>
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,14 @@ public static String operatorConfigKey(String key) {
.withDescription(
"Indicate whether the job should be drained when stopping with savepoint.");

/**
 * Controls whether deletion of a session FlinkDeployment is blocked while unmanaged jobs (jobs
 * that are not managed by a FlinkSessionJob resource, e.g. jobs submitted via the CLI) are
 * still running in the session cluster.
 *
 * <p>Boolean option that defaults to {@code true}; it is listed in the dynamic section of the
 * configuration documentation.
 *
 * <p>NOTE(review): the description text below also appears verbatim in the generated docs
 * tables, so the docs presumably need to be regenerated whenever it changes - confirm against
 * the docs build.
 */
@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> BLOCK_ON_UNMANAGED_JOBS =
operatorConfig("session.block-on-unmanaged-jobs")
.booleanType()
.defaultValue(true)
.withDescription(
"Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.");

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> REFRESH_CLUSTER_RESOURCE_VIEW =
operatorConfig("cluster.resource-view.refresh-interval")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.kubernetes.operator.reconciler.deployment;

import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
Expand All @@ -25,11 +26,16 @@
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;

import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import org.slf4j.Logger;
Expand Down Expand Up @@ -120,6 +126,61 @@ private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Ex
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}

/**
* Detects unmanaged jobs running in the session cluster. Unmanaged jobs are jobs that exist in
* the Flink cluster but are not managed by FlinkSessionJob resources.
*/
private Set<JobID> getUnmanagedJobs(
FlinkResourceContext<FlinkDeployment> ctx, Set<FlinkSessionJob> sessionJobs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that we need to pass sessionJobs here. If the flag is enabled, any running job should simply block the deletion. We can simplify this logic a lot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can simply replace this method with something like getNonTerminalJobIds() or boolean anyNonTerminalJobs()

That would be enough for this feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will simplify this further. Thanks for the review

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gyfora I have pushed another commit to address the comments here. I have stuck with getNonTerminalJobIds() to ensure the Event contains the list of job IDs that are not terminated for better observability for the user.

LOG.info(
"Starting unmanaged job detection for session cluster: {}",
ctx.getResource().getMetadata().getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be at debug level. Also, there is no need to include the resource name/info in the log message; it's in the MDC already.

try {
// Get all jobs running in the Flink cluster
var flinkService = ctx.getFlinkService();
var clusterClient = flinkService.getClusterClient(ctx.getObserveConfig());
var allJobs =
clusterClient
.sendRequest(
JobsOverviewHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.get()
.getJobs();

// Get job IDs managed by FlinkSessionJob resources
Set<JobID> managedJobIds =
sessionJobs.stream()
.map(job -> job.getStatus().getJobStatus().getJobId())
.filter(jobId -> jobId != null)
.map(JobID::fromHexString)
.collect(Collectors.toSet());

// Get job IDs that are not managed by FlinkSessionJob resources and are currently
// running
// FAILED(TerminalState.GLOBALLY)
// CANCELED(TerminalState.GLOBALLY)
// FINISHED(TerminalState.GLOBALLY)
// Above terminal states are not considered since they are no longer active jobs
Set<JobID> unmanagedJobIds =
allJobs.stream()
.filter(
job ->
!job.getStatus()
.isGloballyTerminalState()) // Only consider
// running jobs
.map(JobDetails::getJobId)
.filter(jobId -> !managedJobIds.contains(jobId))
.collect(Collectors.toSet());

LOG.info("Detected {} unmanaged job IDs: {}", unmanagedJobIds.size(), unmanagedJobIds);
return unmanagedJobIds;
} catch (Exception e) {
LOG.warn("Failed to detect unmanaged jobs in session cluster", e);
return Set.of();
}
}

@Override
public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx) {
Set<FlinkSessionJob> sessionJobs =
Expand All @@ -143,13 +204,41 @@ public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx)
}
return DeleteControl.noFinalizerRemoval()
.rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
} else {
LOG.info("Stopping session cluster");
var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
ctx.getFlinkService()
.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), conf, true);
return DeleteControl.defaultDelete();
}

// Check for unmanaged jobs if the option is enabled (Enabled by default)
boolean blockOnUnmanagedJobs =
ctx.getObserveConfig()
.getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS);
if (blockOnUnmanagedJobs) {
Set<JobID> unmanagedJobs = getUnmanagedJobs(ctx, sessionJobs);
if (!unmanagedJobs.isEmpty()) {
var error =
String.format(
"The session cluster has unmanaged jobs %s that should be cancelled first",
unmanagedJobs.stream()
.map(JobID::toHexString)
.collect(Collectors.toList()));
LOG.warn(error);
if (eventRecorder.triggerEvent(
deployment,
EventRecorder.Type.Warning,
EventRecorder.Reason.CleanupFailed,
EventRecorder.Component.Operator,
error,
ctx.getKubernetesClient())) {
LOG.warn(error);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are logging the error twice. You don't need to log it at all, as the event triggering already logs it.

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the if branch and the logging. Event triggering already creates logs, so I don't think we need both.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gyfora I see this for the session job event as well. Should we remove it there too?

return DeleteControl.noFinalizerRemoval()
.rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
}
}

LOG.info("Stopping session cluster");
var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
ctx.getFlinkService()
.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), conf, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a bug in this existing logic: instead of getting the deploy config, here we should just use ctx.getObserveConfig().

return DeleteControl.defaultDelete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,24 @@

package org.apache.flink.kubernetes.operator.reconciler.deployment;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
import org.apache.flink.runtime.client.JobStatusMessage;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
Expand All @@ -36,13 +43,16 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -139,4 +149,164 @@ public void testSetOwnerReference() throws Exception {
deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
Assertions.assertEquals(expectedOwnerReferences, or);
}

@Test
public void testGetUnmanagedJobs() throws Exception {
// Create a session cluster deployment with blocking enabled
FlinkDeployment deployment = TestUtils.buildSessionCluster();
deployment
.getSpec()
.getFlinkConfiguration()
.put(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key(), "true");

assertEquals(
"true",
deployment
.getSpec()
.getFlinkConfiguration()
.get(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key()));

// First, deploy the session cluster to set up proper state
reconciler.reconcile(deployment, flinkService.getContext());

// Verify deployment is in DEPLOYED state
assertEquals(
ReconciliationState.DEPLOYED,
deployment.getStatus().getReconciliationStatus().getState());

// Create different types of jobs
JobID managedJobId1 = new JobID();
JobID managedJobId2 = new JobID();
JobID unmanagedRunningJobId1 = new JobID();
JobID unmanagedTerminatedJobId = new JobID();
JobID unmanagedRunningJobId2 = new JobID();

// Add a mix of managed and unmanaged jobs to the testing service
flinkService
.listJobs()
.add(
Tuple3.of(
null,
new JobStatusMessage(
managedJobId1,
"managed-job-1",
JobStatus.RUNNING,
System.currentTimeMillis()),
new Configuration()));
flinkService
.listJobs()
.add(
Tuple3.of(
null,
new JobStatusMessage(
managedJobId2,
"managed-job-2",
JobStatus.RUNNING,
System.currentTimeMillis()),
new Configuration()));
flinkService
.listJobs()
.add(
Tuple3.of(
null,
new JobStatusMessage(
unmanagedRunningJobId1,
"unmanaged-running-job-1",
JobStatus.RUNNING,
System.currentTimeMillis()),
new Configuration()));
flinkService
.listJobs()
.add(
Tuple3.of(
null,
new JobStatusMessage(
unmanagedTerminatedJobId,
"unmanaged-terminated-job",
JobStatus.CANCELED,
System.currentTimeMillis()),
new Configuration()));
flinkService
.listJobs()
.add(
Tuple3.of(
null,
new JobStatusMessage(
unmanagedRunningJobId2,
"unmanaged-running-job-2",
JobStatus.RUNNING,
System.currentTimeMillis()),
new Configuration()));

// Create FlinkSessionJob resources for the managed jobs
FlinkSessionJob managedSessionJob1 = TestUtils.buildSessionJob();
managedSessionJob1.getMetadata().setName("managed-session-job-1");
managedSessionJob1.getStatus().getJobStatus().setJobId(managedJobId1.toHexString());
kubernetesClient.resource(managedSessionJob1).createOrReplace();

FlinkSessionJob managedSessionJob2 = TestUtils.buildSessionJob();
managedSessionJob2.getMetadata().setName("managed-session-job-2");
managedSessionJob2.getStatus().getJobStatus().setJobId(managedJobId2.toHexString());
kubernetesClient.resource(managedSessionJob2).createOrReplace();

Set<FlinkSessionJob> sessionJobs = new HashSet<>();
sessionJobs.add(managedSessionJob1);
sessionJobs.add(managedSessionJob2);

// Test with blocking enabled - should identify running unmanaged jobs correctly
var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
var resourceContext = getResourceContext(deployment, context);

// Use reflection to access the private getUnmanagedJobs method
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not use reflection for this. We can make the method protected and annotate it with @VisibleForTesting.

var sessionReconciler = reconciler.getReconciler();
var getUnmanagedJobsMethod =
sessionReconciler
.getClass()
.getDeclaredMethod(
"getUnmanagedJobs", FlinkResourceContext.class, Set.class);
getUnmanagedJobsMethod.setAccessible(true);

@SuppressWarnings("unchecked")
var unmanagedJobs =
(java.util.Set<String>)
getUnmanagedJobsMethod.invoke(
sessionReconciler, resourceContext, sessionJobs);

// Verify only RUNNING unmanaged jobs are identified - should be 2
assertEquals(2, unmanagedJobs.size(), "Should identify exactly 2 running unmanaged jobs");

// Verify that none of the managed job IDs are in the unmanaged list
Set<String> managedJobIds =
sessionJobs.stream()
.map(job -> job.getStatus().getJobStatus().getJobId())
.collect(Collectors.toSet());

for (String managedJobId : managedJobIds) {
assertFalse(
unmanagedJobs.contains(managedJobId),
"Managed job " + managedJobId + " should NOT be in unmanaged jobs");
}

// Verify managed jobs are properly set up (we should have 2 managed jobs)
assertEquals(2, managedJobIds.size(), "Should have 2 managed jobs configured");

// Test scenario with no running unmanaged jobs
flinkService
.listJobs()
.removeIf(
job ->
job.f1.getJobId().equals(unmanagedRunningJobId1)
|| job.f1.getJobId().equals(unmanagedRunningJobId2));

@SuppressWarnings("unchecked")
var unmanagedJobsAfterRemoval =
(java.util.Set<String>)
getUnmanagedJobsMethod.invoke(
sessionReconciler, resourceContext, sessionJobs);

assertEquals(
0,
unmanagedJobsAfterRemoval.size(),
"Should have no unmanaged jobs when only managed and terminated jobs exist");
}
}