
Commit ef02fa8

[FLINK-28648] Allow session deletion to block on any running job
1 parent b03ebae commit ef02fa8

File tree

5 files changed: +256 / -7 lines changed


docs/layouts/shortcodes/generated/dynamic_section.html

Lines changed: 6 additions & 0 deletions
@@ -194,6 +194,12 @@
 <td>Duration</td>
 <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
 </tr>
+<tr>
+<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
+<td style="word-wrap: break-word;">true</td>
+<td>Boolean</td>
+<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td>
+</tr>
 <tr>
 <td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
 <td style="word-wrap: break-word;">true</td>

docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html

Lines changed: 6 additions & 0 deletions
@@ -416,6 +416,12 @@
 <td>Duration</td>
 <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
 </tr>
+<tr>
+<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
+<td style="word-wrap: break-word;">true</td>
+<td>Boolean</td>
+<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td>
+</tr>
 <tr>
 <td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
 <td style="word-wrap: break-word;">true</td>
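Both generated reference pages document the same new option. For orientation, here is a minimal Java sketch, mirroring the pattern the new test in this commit uses, of how a single session cluster could opt out of the blocking behaviour by setting the key in its FlinkDeployment spec; TestUtils.buildSessionCluster() is the repository's test helper and the snippet is illustrative rather than part of the commit:

import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;

// Sketch: disable blocking for one session cluster by overriding the option
// (string form, because the spec's flinkConfiguration is a Map<String, String>).
FlinkDeployment deployment = TestUtils.buildSessionCluster();
deployment
        .getSpec()
        .getFlinkConfiguration()
        .put(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key(), "false");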

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 8 additions & 0 deletions
@@ -647,6 +647,14 @@ public static String operatorConfigKey(String key) {
                     .withDescription(
                             "Indicate whether the job should be drained when stopping with savepoint.");

+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean> BLOCK_ON_UNMANAGED_JOBS =
+            operatorConfig("session.block-on-unmanaged-jobs")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.");
+
     @Documentation.Section(SECTION_ADVANCED)
     public static final ConfigOption<Duration> REFRESH_CLUSTER_RESOURCE_VIEW =
             operatorConfig("cluster.resource-view.refresh-interval")
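Because the option is created through operatorConfig(...), its full key is kubernetes.operator.session.block-on-unmanaged-jobs and it defaults to true. Below is a hedged sketch of reading and overriding it through Flink's standard Configuration/ConfigOption API; this is not code from the commit:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;

Configuration conf = new Configuration();
// Unset, the option resolves to its declared default (true).
boolean blocking = conf.get(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS);
// Typed override ...
conf.set(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS, false);
// ... or the equivalent string form under the full key.
conf.setString("kubernetes.operator.session.block-on-unmanaged-jobs", "false");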

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java

Lines changed: 71 additions & 7 deletions
@@ -17,6 +17,8 @@

 package org.apache.flink.kubernetes.operator.reconciler.deployment;

+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.autoscaler.NoopJobAutoscaler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -25,11 +27,16 @@
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;

 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
 import org.slf4j.Logger;
@@ -120,6 +127,37 @@ private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Ex
                 .setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
     }

+    // Detects jobs which are not in globally terminated states
+    @VisibleForTesting
+    Set<JobID> getNonTerminalJobs(FlinkResourceContext<FlinkDeployment> ctx) {
+        LOG.debug("Starting nonTerminal jobs detection for session cluster");
+        try {
+            // Get all jobs running in the Flink cluster
+            var flinkService = ctx.getFlinkService();
+            var clusterClient = flinkService.getClusterClient(ctx.getObserveConfig());
+            var allJobs =
+                    clusterClient
+                            .sendRequest(
+                                    JobsOverviewHeaders.getInstance(),
+                                    EmptyMessageParameters.getInstance(),
+                                    EmptyRequestBody.getInstance())
+                            .get()
+                            .getJobs();
+
+            // running job Ids
+            Set<JobID> nonTerminalJobIds =
+                    allJobs.stream()
+                            .filter(job -> !job.getStatus().isGloballyTerminalState())
+                            .map(JobDetails::getJobId)
+                            .collect(Collectors.toSet());
+
+            return nonTerminalJobIds;
+        } catch (Exception e) {
+            LOG.warn("Failed to detect nonTerminal jobs in session cluster", e);
+            return Set.of();
+        }
+    }
+
     @Override
     public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx) {
         Set<FlinkSessionJob> sessionJobs =
@@ -143,13 +181,39 @@ public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx)
             }
             return DeleteControl.noFinalizerRemoval()
                     .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
-        } else {
-            LOG.info("Stopping session cluster");
-            var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
-            ctx.getFlinkService()
-                    .deleteClusterDeployment(
-                            deployment.getMetadata(), deployment.getStatus(), conf, true);
-            return DeleteControl.defaultDelete();
         }
+
+        // Check for non-terminated jobs if the option is enabled (Enabled by default) , after
+        // sessionJobs are deleted
+        boolean blockOnUnmanagedJobs =
+                ctx.getObserveConfig()
+                        .getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS);
+        if (blockOnUnmanagedJobs) {
+            Set<JobID> nonTerminalJobs = getNonTerminalJobs(ctx);
+            if (!nonTerminalJobs.isEmpty()) {
+                var error =
+                        String.format(
+                                "The session cluster has non terminated jobs %s that should be cancelled first",
+                                nonTerminalJobs.stream()
+                                        .map(JobID::toHexString)
+                                        .collect(Collectors.toList()));
+                eventRecorder.triggerEvent(
+                        deployment,
+                        EventRecorder.Type.Warning,
+                        EventRecorder.Reason.CleanupFailed,
+                        EventRecorder.Component.Operator,
+                        error,
+                        ctx.getKubernetesClient());
+                return DeleteControl.noFinalizerRemoval()
+                        .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
+            }
+        }
+
+        LOG.info("Stopping session cluster");
+        var conf = ctx.getObserveConfig();
+        ctx.getFlinkService()
+                .deleteClusterDeployment(
+                        deployment.getMetadata(), deployment.getStatus(), conf, true);
+        return DeleteControl.defaultDelete();
     }
 }
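With the flag enabled, cleanupInternal keeps the finalizer and reschedules the cleanup until every job reported by getNonTerminalJobs reaches a globally terminal state; the JobsOverviewHeaders request corresponds to the cluster's /jobs/overview REST endpoint. The following is an illustrative sketch, not part of the commit, of cancelling such leftover jobs through the same cluster client so that a later reconcile loop can fall through to DeleteControl.defaultDelete(); it assumes Flink's standard ClusterClient#cancel API:

// Illustrative sketch: cancel every non-terminal job so that deletion can proceed.
var clusterClient = ctx.getFlinkService().getClusterClient(ctx.getObserveConfig());
for (JobID jobId : getNonTerminalJobs(ctx)) {
    // ClusterClient#cancel returns a CompletableFuture<Acknowledge>; get() waits for the ack.
    clusterClient.cancel(jobId).get();
}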

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java

Lines changed: 165 additions & 0 deletions
@@ -17,17 +17,23 @@

 package org.apache.flink.kubernetes.operator.reconciler.deployment;

+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.OperatorTestBase;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
+import org.apache.flink.runtime.client.JobStatusMessage;

 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -36,13 +42,15 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;

+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;

 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;

@@ -139,4 +147,161 @@ public void testSetOwnerReference() throws Exception {
                 deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
         Assertions.assertEquals(expectedOwnerReferences, or);
     }
+
+    @Test
+    public void testGetNonTerminalJobs() throws Exception {
+        FlinkDeployment deployment = TestUtils.buildSessionCluster();
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key(), "true");
+
+        assertEquals(
+                "true",
+                deployment
+                        .getSpec()
+                        .getFlinkConfiguration()
+                        .get(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key()));
+
+        reconciler.reconcile(deployment, flinkService.getContext());
+
+        // Verify deployment is in DEPLOYED state
+        assertEquals(
+                ReconciliationState.DEPLOYED,
+                deployment.getStatus().getReconciliationStatus().getState());
+
+        // Create different types of jobs
+        JobID managedJobId1 = new JobID();
+        JobID managedJobId2 = new JobID();
+        JobID unmanagedRunningJobId1 = new JobID();
+        JobID unmanagedTerminatedJobId = new JobID();
+        JobID unmanagedRunningJobId2 = new JobID();
+
+        // Add jobs to the testing service
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        managedJobId1,
+                                        "managed-job-1",
+                                        JobStatus.RUNNING,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        managedJobId2,
+                                        "managed-job-2",
+                                        JobStatus.RUNNING,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        unmanagedRunningJobId1,
+                                        "unmanaged-running-job-1",
+                                        JobStatus.RUNNING,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        unmanagedTerminatedJobId,
+                                        "unmanaged-terminated-job",
+                                        JobStatus.CANCELED,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        unmanagedRunningJobId2,
+                                        "unmanaged-running-job-2",
+                                        JobStatus.RUNNING,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+
+        // Create FlinkSessionJob resources for the managed jobs
+        FlinkSessionJob managedSessionJob1 = TestUtils.buildSessionJob();
+        managedSessionJob1.getMetadata().setName("managed-session-job-1");
+        managedSessionJob1.getStatus().getJobStatus().setJobId(managedJobId1.toHexString());
+        kubernetesClient.resource(managedSessionJob1).createOrReplace();
+
+        FlinkSessionJob managedSessionJob2 = TestUtils.buildSessionJob();
+        managedSessionJob2.getMetadata().setName("managed-session-job-2");
+        managedSessionJob2.getStatus().getJobStatus().setJobId(managedJobId2.toHexString());
+        kubernetesClient.resource(managedSessionJob2).createOrReplace();
+
+        Set<FlinkSessionJob> sessionJobs = new HashSet<>();
+        sessionJobs.add(managedSessionJob1);
+        sessionJobs.add(managedSessionJob2);
+
+        // Test with blocking enabled - should identify all non-terminal jobs
+        var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
+        var resourceContext = getResourceContext(deployment, context);
+
+        var sessionReconciler = (SessionReconciler) reconciler.getReconciler();
+        Set<JobID> nonTerminalJobs = sessionReconciler.getNonTerminalJobs(resourceContext);
+
+        // Verify all non-terminal jobs are identified - should be 4 (2 managed + 2 unmanaged
+        // running)
+        assertEquals(4, nonTerminalJobs.size(), "Should identify exactly 4 non-terminal jobs");
+
+        assertTrue(
+                nonTerminalJobs.contains(unmanagedRunningJobId1),
+                "Should contain unmanagedRunningJobId1");
+        assertTrue(
+                nonTerminalJobs.contains(unmanagedRunningJobId2),
+                "Should contain unmanagedRunningJobId2");
+
+        // Verify terminated job is not included
+        assertFalse(
+                nonTerminalJobs.contains(unmanagedTerminatedJobId),
+                "Should not contain terminated job");
+
+        // Test scenario with only unmanaged jobs
+        flinkService
+                .listJobs()
+                .removeIf(
+                        job ->
+                                job.f1.getJobId().equals(managedJobId1)
+                                        || job.f1.getJobId().equals(managedJobId2));
+
+        Set<JobID> nonTerminalJobsAfterSessionJobsRemoval =
+                sessionReconciler.getNonTerminalJobs(resourceContext);
+
+        assertEquals(
+                2,
+                nonTerminalJobsAfterSessionJobsRemoval.size(),
+                "Should have 2 non-terminal jobs when sessionjobs are deleted");
+
+        // Test scenario with no running jobs
+        flinkService
+                .listJobs()
+                .removeIf(
+                        job ->
+                                job.f1.getJobId().equals(unmanagedRunningJobId1)
+                                        || job.f1.getJobId().equals(unmanagedRunningJobId2));
+
+        Set<JobID> nonTerminalJobsAfterRemoval =
+                sessionReconciler.getNonTerminalJobs(resourceContext);
+
+        assertEquals(
+                0,
+                nonTerminalJobsAfterRemoval.size(),
+                "Should have no non-terminal jobs when only terminated jobs exist");
+    }
 }
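The new test covers only the detection helper. A natural follow-up assertion, sketched here as an assumption rather than part of the commit, would drive cleanupInternal directly and check that the finalizer is retained while an unmanaged job is still running (DeleteControl#isRemoveFinalizer comes from the Java Operator SDK):

// Hypothetical extra check: deletion must be blocked while unmanaged jobs are running.
var deleteControl = sessionReconciler.cleanupInternal(resourceContext);
assertFalse(
        deleteControl.isRemoveFinalizer(),
        "Finalizer should be kept while unmanaged jobs are running");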
