6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/dynamic_section.html
@@ -194,6 +194,12 @@
<td>Duration</td>
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: jobs submitted via the CLI.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
@@ -416,6 +416,12 @@
<td>Duration</td>
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: jobs submitted via the CLI.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
@@ -647,6 +647,14 @@ public static String operatorConfigKey(String key) {
.withDescription(
"Indicate whether the job should be drained when stopping with savepoint.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> BLOCK_ON_UNMANAGED_JOBS =
operatorConfig("session.block-on-unmanaged-jobs")
.booleanType()
.defaultValue(true)
.withDescription(
"Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.");

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> REFRESH_CLUSTER_RESOURCE_VIEW =
operatorConfig("cluster.resource-view.refresh-interval")
@@ -17,6 +17,8 @@

package org.apache.flink.kubernetes.operator.reconciler.deployment;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -25,11 +27,16 @@
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;

import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import org.slf4j.Logger;
@@ -120,6 +127,37 @@ private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Exception
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}

// Detects jobs that are not in a globally terminal state
@VisibleForTesting
Set<JobID> getNonTerminalJobs(FlinkResourceContext<FlinkDeployment> ctx) {
LOG.debug("Detecting non-terminal jobs in the session cluster");
try {
// Fetch the overview of all jobs in the Flink cluster
var flinkService = ctx.getFlinkService();
var clusterClient = flinkService.getClusterClient(ctx.getObserveConfig());
var allJobs =
clusterClient
.sendRequest(
JobsOverviewHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.get()
.getJobs();

// Collect the IDs of all jobs that have not reached a globally terminal state
return allJobs.stream()
.filter(job -> !job.getStatus().isGloballyTerminalState())
.map(JobDetails::getJobId)
.collect(Collectors.toSet());
} catch (Exception e) {
LOG.warn("Failed to detect nonTerminal jobs in session cluster", e);
return Set.of();
}
}
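For reference, JobStatus#isGloballyTerminalState() is true only for FINISHED, CANCELED and FAILED; SUSPENDED is locally but not globally terminal, so suspended jobs also count as non-terminal here and keep blocking deletion. A runnable sketch of the filter semantics:

import org.apache.flink.api.common.JobStatus;

public class TerminalStates {
    public static void main(String[] args) {
        for (JobStatus status : JobStatus.values()) {
            // Only FINISHED, CANCELED and FAILED print true here; anything
            // else (including SUSPENDED) would block session cluster deletion.
            System.out.printf(
                    "%s -> globally terminal: %s%n", status, status.isGloballyTerminalState());
        }
    }
}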

@Override
public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx) {
Set<FlinkSessionJob> sessionJobs =
@@ -143,13 +181,41 @@ public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx)
}
return DeleteControl.noFinalizerRemoval()
.rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
} else {
LOG.info("Stopping session cluster");
var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
ctx.getFlinkService()
.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), conf, true);
return DeleteControl.defaultDelete();
}

// After the managed session jobs are deleted, check for remaining
// non-terminal jobs if the option is enabled (enabled by default)
boolean blockOnUnmanagedJobs =
ctx.getObserveConfig()
.getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS);
if (blockOnUnmanagedJobs) {
Set<JobID> nonTerminalJobs = getNonTerminalJobs(ctx);
if (!nonTerminalJobs.isEmpty()) {
var error =
String.format(
"The session cluster has non terminated jobs %s that should be cancelled first",
nonTerminalJobs.stream()
.map(JobID::toHexString)
.collect(Collectors.toList()));
if (eventRecorder.triggerEvent(
deployment,
EventRecorder.Type.Warning,
EventRecorder.Reason.CleanupFailed,
EventRecorder.Component.Operator,
error,
ctx.getKubernetesClient())) {
LOG.warn(error);
}
Contributor: we can remove the if branch and the logging. Event triggering already creates logs, we don't need both I think.

Contributor (author): @gyfora I see this for the sessionjob event as well; should we remove it here too?
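If the guard and the duplicate log line were dropped as suggested, the call would reduce to the following (a sketch of the proposed simplification, not part of this diff):

// Sketch only: trigger the event unconditionally and rely on its own logging.
eventRecorder.triggerEvent(
        deployment,
        EventRecorder.Type.Warning,
        EventRecorder.Reason.CleanupFailed,
        EventRecorder.Component.Operator,
        error,
        ctx.getKubernetesClient());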

return DeleteControl.noFinalizerRemoval()
.rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
}
}

LOG.info("Stopping session cluster");
var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
ctx.getFlinkService()
.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), conf, true);
Contributor: I think there is a bug in this existing logic: instead of getting the deploy config, we should just use ctx.getObserveConfig() here.

return DeleteControl.defaultDelete();
}
}
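If the reviewer's suggestion above were adopted, the stop path would read roughly as follows (a sketch of the proposed follow-up, not the code in this diff):

LOG.info("Stopping session cluster");
// Use the configuration already observed from the running cluster instead of
// re-deriving the deploy config from the spec, per the review comment.
var conf = ctx.getObserveConfig();
ctx.getFlinkService()
        .deleteClusterDeployment(deployment.getMetadata(), deployment.getStatus(), conf, true);
return DeleteControl.defaultDelete();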
@@ -17,17 +17,23 @@

package org.apache.flink.kubernetes.operator.reconciler.deployment;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
import org.apache.flink.runtime.client.JobStatusMessage;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -36,13 +42,15 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@@ -139,4 +147,161 @@ public void testSetOwnerReference() throws Exception {
deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
Assertions.assertEquals(expectedOwnerReferences, or);
}

@Test
public void testGetNonTerminalJobs() throws Exception {
FlinkDeployment deployment = TestUtils.buildSessionCluster();
deployment
.getSpec()
.getFlinkConfiguration()
.put(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key(), "true");

assertEquals(
"true",
deployment
.getSpec()
.getFlinkConfiguration()
.get(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key()));

reconciler.reconcile(deployment, flinkService.getContext());

// Verify deployment is in DEPLOYED state
assertEquals(
ReconciliationState.DEPLOYED,
deployment.getStatus().getReconciliationStatus().getState());

// Create different types of jobs
JobID managedJobId1 = new JobID();
JobID managedJobId2 = new JobID();
JobID unmanagedRunningJobId1 = new JobID();
JobID unmanagedTerminatedJobId = new JobID();
JobID unmanagedRunningJobId2 = new JobID();

// Add jobs to the testing service
flinkService
.listJobs()
.add(
Tuple3.of(
null,
new JobStatusMessage(
managedJobId1,
"managed-job-1",
JobStatus.RUNNING,
System.currentTimeMillis()),
new Configuration()));
flinkService
.listJobs()
.add(
Tuple3.of(
null,
new JobStatusMessage(
managedJobId2,
"managed-job-2",
JobStatus.RUNNING,
System.currentTimeMillis()),
new Configuration()));
flinkService
.listJobs()
.add(
Tuple3.of(
null,
new JobStatusMessage(
unmanagedRunningJobId1,
"unmanaged-running-job-1",
JobStatus.RUNNING,
System.currentTimeMillis()),
new Configuration()));
flinkService
.listJobs()
.add(
Tuple3.of(
null,
new JobStatusMessage(
unmanagedTerminatedJobId,
"unmanaged-terminated-job",
JobStatus.CANCELED,
System.currentTimeMillis()),
new Configuration()));
flinkService
.listJobs()
.add(
Tuple3.of(
null,
new JobStatusMessage(
unmanagedRunningJobId2,
"unmanaged-running-job-2",
JobStatus.RUNNING,
System.currentTimeMillis()),
new Configuration()));

// Create FlinkSessionJob resources for the managed jobs
FlinkSessionJob managedSessionJob1 = TestUtils.buildSessionJob();
managedSessionJob1.getMetadata().setName("managed-session-job-1");
managedSessionJob1.getStatus().getJobStatus().setJobId(managedJobId1.toHexString());
kubernetesClient.resource(managedSessionJob1).createOrReplace();

FlinkSessionJob managedSessionJob2 = TestUtils.buildSessionJob();
managedSessionJob2.getMetadata().setName("managed-session-job-2");
managedSessionJob2.getStatus().getJobStatus().setJobId(managedJobId2.toHexString());
kubernetesClient.resource(managedSessionJob2).createOrReplace();

Set<FlinkSessionJob> sessionJobs = new HashSet<>();
sessionJobs.add(managedSessionJob1);
sessionJobs.add(managedSessionJob2);

// Test with blocking enabled - should identify all non-terminal jobs
var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
var resourceContext = getResourceContext(deployment, context);

var sessionReconciler = (SessionReconciler) reconciler.getReconciler();
Set<JobID> nonTerminalJobs = sessionReconciler.getNonTerminalJobs(resourceContext);

// Verify all non-terminal jobs are identified - should be 4 (2 managed + 2 unmanaged
// running)
assertEquals(4, nonTerminalJobs.size(), "Should identify exactly 4 non-terminal jobs");

assertTrue(
nonTerminalJobs.contains(unmanagedRunningJobId1),
"Should contain unmanagedRunningJobId1");
assertTrue(
nonTerminalJobs.contains(unmanagedRunningJobId2),
"Should contain unmanagedRunningJobId2");

// Verify terminated job is not included
assertFalse(
nonTerminalJobs.contains(unmanagedTerminatedJobId),
"Should not contain terminated job");

// Test scenario with only unmanaged jobs
flinkService
.listJobs()
.removeIf(
job ->
job.f1.getJobId().equals(managedJobId1)
|| job.f1.getJobId().equals(managedJobId2));

Set<JobID> nonTerminalJobsAfterSessionJobsRemoval =
sessionReconciler.getNonTerminalJobs(resourceContext);

assertEquals(
2,
nonTerminalJobsAfterSessionJobsRemoval.size(),
"Should have 2 non-terminal jobs when sessionjobs are deleted");

// Test scenario with no running jobs
flinkService
.listJobs()
.removeIf(
job ->
job.f1.getJobId().equals(unmanagedRunningJobId1)
|| job.f1.getJobId().equals(unmanagedRunningJobId2));

Set<JobID> nonTerminalJobsAfterRemoval =
sessionReconciler.getNonTerminalJobs(resourceContext);

assertEquals(
0,
nonTerminalJobsAfterRemoval.size(),
"Should have no non-terminal jobs when only terminated jobs exist");
}
}