Skip to content
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/dynamic_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@
<td>Duration</td>
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@
<td>Duration</td>
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,14 @@ public static String operatorConfigKey(String key) {
.withDescription(
"Indicate whether the job should be drained when stopping with savepoint.");

/**
 * When enabled (the default), the operator refuses to delete a session {@code FlinkDeployment}
 * while jobs that are not managed by {@code FlinkSessionJob} resources — e.g. jobs submitted
 * directly via the CLI — are still running in the session cluster.
 */
@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> BLOCK_ON_UNMANAGED_JOBS =
        operatorConfig("session.block-on-unmanaged-jobs")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.");

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> REFRESH_CLUSTER_RESOURCE_VIEW =
operatorConfig("cluster.resource-view.refresh-interval")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.kubernetes.operator.reconciler.deployment;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
Expand All @@ -25,11 +27,16 @@
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;

import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import org.slf4j.Logger;
Expand Down Expand Up @@ -120,6 +127,37 @@ private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Ex
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}

/**
 * Queries the session cluster's job overview and returns the ids of all jobs that have not yet
 * reached a globally terminal state.
 *
 * <p>NOTE(review): this method fails open — if the cluster cannot be queried (e.g. the REST
 * endpoint is unreachable), an empty set is returned, so callers will behave as if no jobs are
 * running. Confirm this is the intended trade-off for cleanup.
 *
 * @param ctx resource context of the session cluster deployment
 * @return ids of jobs that are not globally terminal; empty set if the lookup fails
 */
@VisibleForTesting
Set<JobID> getNonTerminalJobs(FlinkResourceContext<FlinkDeployment> ctx) {
    LOG.debug("Starting nonTerminal jobs detection for session cluster");
    try {
        var clusterClient = ctx.getFlinkService().getClusterClient(ctx.getObserveConfig());
        // Fetch the overview of every job known to the cluster
        var overview =
                clusterClient
                        .sendRequest(
                                JobsOverviewHeaders.getInstance(),
                                EmptyMessageParameters.getInstance(),
                                EmptyRequestBody.getInstance())
                        .get();
        // Keep only jobs whose status is not globally terminal
        return overview.getJobs().stream()
                .filter(details -> !details.getStatus().isGloballyTerminalState())
                .map(JobDetails::getJobId)
                .collect(Collectors.toSet());
    } catch (Exception e) {
        LOG.warn("Failed to detect nonTerminal jobs in session cluster", e);
        return Set.of();
    }
}

@Override
public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx) {
Set<FlinkSessionJob> sessionJobs =
Expand All @@ -143,13 +181,39 @@ public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx)
}
return DeleteControl.noFinalizerRemoval()
.rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
} else {
LOG.info("Stopping session cluster");
var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
ctx.getFlinkService()
.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), conf, true);
return DeleteControl.defaultDelete();
}

// Check for non-terminated jobs if the option is enabled (Enabled by default) , after
// sessionJobs are deleted
boolean blockOnUnmanagedJobs =
ctx.getObserveConfig()
.getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS);
if (blockOnUnmanagedJobs) {
Set<JobID> nonTerminalJobs = getNonTerminalJobs(ctx);
if (!nonTerminalJobs.isEmpty()) {
var error =
String.format(
"The session cluster has non terminated jobs %s that should be cancelled first",
nonTerminalJobs.stream()
.map(JobID::toHexString)
.collect(Collectors.toList()));
eventRecorder.triggerEvent(
deployment,
EventRecorder.Type.Warning,
EventRecorder.Reason.CleanupFailed,
EventRecorder.Component.Operator,
error,
ctx.getKubernetesClient());
return DeleteControl.noFinalizerRemoval()
.rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
}
}

LOG.info("Stopping session cluster");
var conf = ctx.getObserveConfig();
ctx.getFlinkService()
.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), conf, true);
return DeleteControl.defaultDelete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@

package org.apache.flink.kubernetes.operator.reconciler.deployment;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
import org.apache.flink.runtime.client.JobStatusMessage;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
Expand All @@ -36,13 +42,15 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -139,4 +147,161 @@ public void testSetOwnerReference() throws Exception {
deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
Assertions.assertEquals(expectedOwnerReferences, or);
}

/**
 * Verifies that {@link SessionReconciler#getNonTerminalJobs} reports exactly the jobs that have
 * not reached a globally terminal state, regardless of whether they are managed by
 * FlinkSessionJob resources, and excludes terminated jobs.
 */
@Test
public void testGetNonTerminalJobs() throws Exception {
    FlinkDeployment deployment = TestUtils.buildSessionCluster();
    deployment
            .getSpec()
            .getFlinkConfiguration()
            .put(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key(), "true");

    assertEquals(
            "true",
            deployment
                    .getSpec()
                    .getFlinkConfiguration()
                    .get(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key()));

    reconciler.reconcile(deployment, flinkService.getContext());

    // Verify deployment is in DEPLOYED state
    assertEquals(
            ReconciliationState.DEPLOYED,
            deployment.getStatus().getReconciliationStatus().getState());

    // Create different types of jobs
    JobID managedJobId1 = new JobID();
    JobID managedJobId2 = new JobID();
    JobID unmanagedRunningJobId1 = new JobID();
    JobID unmanagedTerminatedJobId = new JobID();
    JobID unmanagedRunningJobId2 = new JobID();

    // Register the jobs with the testing service
    addJobToService(managedJobId1, "managed-job-1", JobStatus.RUNNING);
    addJobToService(managedJobId2, "managed-job-2", JobStatus.RUNNING);
    addJobToService(unmanagedRunningJobId1, "unmanaged-running-job-1", JobStatus.RUNNING);
    addJobToService(unmanagedTerminatedJobId, "unmanaged-terminated-job", JobStatus.CANCELED);
    addJobToService(unmanagedRunningJobId2, "unmanaged-running-job-2", JobStatus.RUNNING);

    // Create FlinkSessionJob resources for the managed jobs
    createManagedSessionJob("managed-session-job-1", managedJobId1);
    createManagedSessionJob("managed-session-job-2", managedJobId2);

    // Test with blocking enabled - should identify all non-terminal jobs
    var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
    var resourceContext = getResourceContext(deployment, context);

    var sessionReconciler = (SessionReconciler) reconciler.getReconciler();
    Set<JobID> nonTerminalJobs = sessionReconciler.getNonTerminalJobs(resourceContext);

    // 2 managed + 2 unmanaged running jobs; the cancelled job must be excluded
    assertEquals(4, nonTerminalJobs.size(), "Should identify exactly 4 non-terminal jobs");
    assertTrue(nonTerminalJobs.contains(managedJobId1), "Should contain managedJobId1");
    assertTrue(nonTerminalJobs.contains(managedJobId2), "Should contain managedJobId2");
    assertTrue(
            nonTerminalJobs.contains(unmanagedRunningJobId1),
            "Should contain unmanagedRunningJobId1");
    assertTrue(
            nonTerminalJobs.contains(unmanagedRunningJobId2),
            "Should contain unmanagedRunningJobId2");

    // Verify terminated job is not included
    assertFalse(
            nonTerminalJobs.contains(unmanagedTerminatedJobId),
            "Should not contain terminated job");

    // Scenario: only unmanaged jobs remain after the managed jobs are removed
    removeJobsFromService(managedJobId1, managedJobId2);
    Set<JobID> nonTerminalJobsAfterSessionJobsRemoval =
            sessionReconciler.getNonTerminalJobs(resourceContext);
    assertEquals(
            2,
            nonTerminalJobsAfterSessionJobsRemoval.size(),
            "Should have 2 non-terminal jobs when sessionjobs are deleted");

    // Scenario: only the terminated job remains
    removeJobsFromService(unmanagedRunningJobId1, unmanagedRunningJobId2);
    Set<JobID> nonTerminalJobsAfterRemoval =
            sessionReconciler.getNonTerminalJobs(resourceContext);
    assertEquals(
            0,
            nonTerminalJobsAfterRemoval.size(),
            "Should have no non-terminal jobs when only terminated jobs exist");
}

/** Registers a job with the given id, name and status in the testing Flink service. */
private void addJobToService(JobID jobId, String jobName, JobStatus status) {
    flinkService
            .listJobs()
            .add(
                    Tuple3.of(
                            null,
                            new JobStatusMessage(
                                    jobId, jobName, status, System.currentTimeMillis()),
                            new Configuration()));
}

/** Creates a FlinkSessionJob resource whose status points at the given job id. */
private void createManagedSessionJob(String name, JobID jobId) {
    FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
    sessionJob.getMetadata().setName(name);
    sessionJob.getStatus().getJobStatus().setJobId(jobId.toHexString());
    kubernetesClient.resource(sessionJob).createOrReplace();
}

/** Removes the jobs with the given ids from the testing Flink service. */
private void removeJobsFromService(JobID... jobIds) {
    Set<JobID> idsToRemove = Set.of(jobIds);
    flinkService.listJobs().removeIf(job -> idsToRemove.contains(job.f1.getJobId()));
}
}