diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html
index 54c1eb6946..f81afa4f2a 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -194,6 +194,12 @@
            <td>Duration</td>
            <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Block FlinkDeployment deletion while unmanaged jobs (jobs not managed by FlinkSessionJob resources, e.g. jobs submitted via the CLI) are still running in the session cluster.</td>
+        </tr>
        <tr>
            <td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
            <td style="word-wrap: break-word;">true</td>
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 4a72d8b766..2ba0de9ed5 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -416,6 +416,12 @@
            <td>Duration</td>
            <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Block FlinkDeployment deletion while unmanaged jobs (jobs not managed by FlinkSessionJob resources, e.g. jobs submitted via the CLI) are still running in the session cluster.</td>
+        </tr>
        <tr>
            <td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
            <td style="word-wrap: break-word;">true</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 6f5891cc5d..bd5e0a46e8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -647,6 +647,14 @@ public static String operatorConfigKey(String key) {
.withDescription(
"Indicate whether the job should be drained when stopping with savepoint.");
+ @Documentation.Section(SECTION_DYNAMIC)
+ public static final ConfigOption<Boolean> BLOCK_ON_UNMANAGED_JOBS =
+ operatorConfig("session.block-on-unmanaged-jobs")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.");
+
@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> REFRESH_CLUSTER_RESOURCE_VIEW =
operatorConfig("cluster.resource-view.refresh-interval")
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index c628bf940e..809fbfbd12 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -17,6 +17,8 @@
package org.apache.flink.kubernetes.operator.reconciler.deployment;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -25,11 +27,16 @@
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import org.slf4j.Logger;
@@ -120,6 +127,37 @@ private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Ex
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}
+ // Detects jobs that are not in a globally terminal state
+ @VisibleForTesting
+ Set<JobID> getNonTerminalJobs(FlinkResourceContext<FlinkDeployment> ctx) {
+ LOG.debug("Starting non-terminal job detection for session cluster");
+ try {
+ // Get all jobs running in the Flink cluster
+ var flinkService = ctx.getFlinkService();
+ var clusterClient = flinkService.getClusterClient(ctx.getObserveConfig());
+ var allJobs =
+ clusterClient
+ .sendRequest(
+ JobsOverviewHeaders.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance())
+ .get()
+ .getJobs();
+
+ // IDs of jobs that are not yet globally terminal
+ Set<JobID> nonTerminalJobIds =
+ allJobs.stream()
+ .filter(job -> !job.getStatus().isGloballyTerminalState())
+ .map(JobDetails::getJobId)
+ .collect(Collectors.toSet());
+
+ return nonTerminalJobIds;
+ } catch (Exception e) {
+ LOG.warn("Failed to detect nonTerminal jobs in session cluster", e);
+ return Set.of();
+ }
+ }
+
@Override
public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx) {
Set<FlinkSessionJob> sessionJobs =
@@ -143,13 +181,39 @@ public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx)
}
return DeleteControl.noFinalizerRemoval()
.rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
- } else {
- LOG.info("Stopping session cluster");
- var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
- ctx.getFlinkService()
- .deleteClusterDeployment(
- deployment.getMetadata(), deployment.getStatus(), conf, true);
- return DeleteControl.defaultDelete();
}
+
+ // After the managed session jobs have been removed, check for remaining non-terminal
+ // jobs if blocking on unmanaged jobs is enabled (the default)
+ boolean blockOnUnmanagedJobs =
+ ctx.getObserveConfig()
+ .getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS);
+ if (blockOnUnmanagedJobs) {
+ Set<JobID> nonTerminalJobs = getNonTerminalJobs(ctx);
+ if (!nonTerminalJobs.isEmpty()) {
+ var error =
+ String.format(
+ "The session cluster has non terminated jobs %s that should be cancelled first",
+ nonTerminalJobs.stream()
+ .map(JobID::toHexString)
+ .collect(Collectors.toList()));
+ eventRecorder.triggerEvent(
+ deployment,
+ EventRecorder.Type.Warning,
+ EventRecorder.Reason.CleanupFailed,
+ EventRecorder.Component.Operator,
+ error,
+ ctx.getKubernetesClient());
+ return DeleteControl.noFinalizerRemoval()
+ .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
+ }
+ }
+
+ LOG.info("Stopping session cluster");
+ var conf = ctx.getObserveConfig();
+ ctx.getFlinkService()
+ .deleteClusterDeployment(
+ deployment.getMetadata(), deployment.getStatus(), conf, true);
+ return DeleteControl.defaultDelete();
}
}
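
Reviewer note (not part of the patch): getNonTerminalJobs() filters on JobStatus#isGloballyTerminalState(), so only jobs in FINISHED, CANCELED or FAILED are ignored; a SUSPENDED job is terminal but not globally terminal and would therefore still block cleanup. The sketch below uses only the public JobStatus API to print that classification.

import org.apache.flink.api.common.JobStatus;

// Illustrative only: the classification applied by the filter in getNonTerminalJobs().
public class TerminalStateSketch {
    public static void main(String[] args) {
        for (JobStatus status : JobStatus.values()) {
            // Statuses that are not globally terminal keep blocking session cluster deletion.
            System.out.printf("%s -> globallyTerminal=%s%n", status, status.isGloballyTerminalState());
        }
    }
}
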
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 54484065fa..9f84f2c113 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -17,17 +17,23 @@
package org.apache.flink.kubernetes.operator.reconciler.deployment;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
+import org.apache.flink.runtime.client.JobStatusMessage;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -36,6 +42,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -43,6 +50,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -139,4 +147,161 @@ public void testSetOwnerReference() throws Exception {
deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
Assertions.assertEquals(expectedOwnerReferences, or);
}
+
+ @Test
+ public void testGetNonTerminalJobs() throws Exception {
+ FlinkDeployment deployment = TestUtils.buildSessionCluster();
+ deployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key(), "true");
+
+ assertEquals(
+ "true",
+ deployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .get(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key()));
+
+ reconciler.reconcile(deployment, flinkService.getContext());
+
+ // Verify deployment is in DEPLOYED state
+ assertEquals(
+ ReconciliationState.DEPLOYED,
+ deployment.getStatus().getReconciliationStatus().getState());
+
+ // Create different types of jobs
+ JobID managedJobId1 = new JobID();
+ JobID managedJobId2 = new JobID();
+ JobID unmanagedRunningJobId1 = new JobID();
+ JobID unmanagedTerminatedJobId = new JobID();
+ JobID unmanagedRunningJobId2 = new JobID();
+
+ // Add jobs to the testing service
+ flinkService
+ .listJobs()
+ .add(
+ Tuple3.of(
+ null,
+ new JobStatusMessage(
+ managedJobId1,
+ "managed-job-1",
+ JobStatus.RUNNING,
+ System.currentTimeMillis()),
+ new Configuration()));
+ flinkService
+ .listJobs()
+ .add(
+ Tuple3.of(
+ null,
+ new JobStatusMessage(
+ managedJobId2,
+ "managed-job-2",
+ JobStatus.RUNNING,
+ System.currentTimeMillis()),
+ new Configuration()));
+ flinkService
+ .listJobs()
+ .add(
+ Tuple3.of(
+ null,
+ new JobStatusMessage(
+ unmanagedRunningJobId1,
+ "unmanaged-running-job-1",
+ JobStatus.RUNNING,
+ System.currentTimeMillis()),
+ new Configuration()));
+ flinkService
+ .listJobs()
+ .add(
+ Tuple3.of(
+ null,
+ new JobStatusMessage(
+ unmanagedTerminatedJobId,
+ "unmanaged-terminated-job",
+ JobStatus.CANCELED,
+ System.currentTimeMillis()),
+ new Configuration()));
+ flinkService
+ .listJobs()
+ .add(
+ Tuple3.of(
+ null,
+ new JobStatusMessage(
+ unmanagedRunningJobId2,
+ "unmanaged-running-job-2",
+ JobStatus.RUNNING,
+ System.currentTimeMillis()),
+ new Configuration()));
+
+ // Create FlinkSessionJob resources for the managed jobs
+ FlinkSessionJob managedSessionJob1 = TestUtils.buildSessionJob();
+ managedSessionJob1.getMetadata().setName("managed-session-job-1");
+ managedSessionJob1.getStatus().getJobStatus().setJobId(managedJobId1.toHexString());
+ kubernetesClient.resource(managedSessionJob1).createOrReplace();
+
+ FlinkSessionJob managedSessionJob2 = TestUtils.buildSessionJob();
+ managedSessionJob2.getMetadata().setName("managed-session-job-2");
+ managedSessionJob2.getStatus().getJobStatus().setJobId(managedJobId2.toHexString());
+ kubernetesClient.resource(managedSessionJob2).createOrReplace();
+
+ Set<FlinkSessionJob> sessionJobs = new HashSet<>();
+ sessionJobs.add(managedSessionJob1);
+ sessionJobs.add(managedSessionJob2);
+
+ // Test with blocking enabled - should identify all non-terminal jobs
+ var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
+ var resourceContext = getResourceContext(deployment, context);
+
+ var sessionReconciler = (SessionReconciler) reconciler.getReconciler();
+ Set<JobID> nonTerminalJobs = sessionReconciler.getNonTerminalJobs(resourceContext);
+
+ // Verify all non-terminal jobs are identified - should be 4 (2 managed + 2 unmanaged
+ // running)
+ assertEquals(4, nonTerminalJobs.size(), "Should identify exactly 4 non-terminal jobs");
+
+ assertTrue(
+ nonTerminalJobs.contains(unmanagedRunningJobId1),
+ "Should contain unmanagedRunningJobId1");
+ assertTrue(
+ nonTerminalJobs.contains(unmanagedRunningJobId2),
+ "Should contain unmanagedRunningJobId2");
+
+ // Verify terminated job is not included
+ assertFalse(
+ nonTerminalJobs.contains(unmanagedTerminatedJobId),
+ "Should not contain terminated job");
+
+ // Test scenario with only unmanaged jobs
+ flinkService
+ .listJobs()
+ .removeIf(
+ job ->
+ job.f1.getJobId().equals(managedJobId1)
+ || job.f1.getJobId().equals(managedJobId2));
+
+ Set<JobID> nonTerminalJobsAfterSessionJobsRemoval =
+ sessionReconciler.getNonTerminalJobs(resourceContext);
+
+ assertEquals(
+ 2,
+ nonTerminalJobsAfterSessionJobsRemoval.size(),
+ "Should have 2 non-terminal jobs when sessionjobs are deleted");
+
+ // Test scenario with no running jobs
+ flinkService
+ .listJobs()
+ .removeIf(
+ job ->
+ job.f1.getJobId().equals(unmanagedRunningJobId1)
+ || job.f1.getJobId().equals(unmanagedRunningJobId2));
+
+ Set<JobID> nonTerminalJobsAfterRemoval =
+ sessionReconciler.getNonTerminalJobs(resourceContext);
+
+ assertEquals(
+ 0,
+ nonTerminalJobsAfterRemoval.size(),
+ "Should have no non-terminal jobs when only terminated jobs exist");
+ }
}