Skip to content

Commit ff26986

Browse files
author
nishita-pattanayak
committed
[FLINK-28648][Flink-Kubernetes-Operator] Changed getUnManagedJobs -> getNonTerminated + annotated @VisibleForTesting
1 parent 8ed2ea4 commit ff26986

File tree

2 files changed

+54
-79
lines changed

2 files changed

+54
-79
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java

Lines changed: 13 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.kubernetes.operator.reconciler.deployment;
1919

20+
import org.apache.flink.annotation.VisibleForTesting;
2021
import org.apache.flink.api.common.JobID;
2122
import org.apache.flink.autoscaler.NoopJobAutoscaler;
2223
import org.apache.flink.configuration.Configuration;
@@ -130,11 +131,9 @@ private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Ex
130131
* Detects unmanaged jobs running in the session cluster. Unmanaged jobs are jobs that exist in
131132
* the Flink cluster but are not managed by FlinkSessionJob resources.
132133
*/
133-
private Set<JobID> getUnmanagedJobs(
134-
FlinkResourceContext<FlinkDeployment> ctx, Set<FlinkSessionJob> sessionJobs) {
135-
LOG.info(
136-
"Starting unmanaged job detection for session cluster: {}",
137-
ctx.getResource().getMetadata().getName());
134+
@VisibleForTesting
135+
Set<JobID> getNonTerminalJobs(FlinkResourceContext<FlinkDeployment> ctx) {
136+
LOG.debug("Starting nonTerminal jobs detection for session cluster");
138137
try {
139138
// Get all jobs running in the Flink cluster
140139
var flinkService = ctx.getFlinkService();
@@ -148,35 +147,17 @@ private Set<JobID> getUnmanagedJobs(
148147
.get()
149148
.getJobs();
150149

151-
// Get job IDs managed by FlinkSessionJob resources
152-
Set<JobID> managedJobIds =
153-
sessionJobs.stream()
154-
.map(job -> job.getStatus().getJobStatus().getJobId())
155-
.filter(jobId -> jobId != null)
156-
.map(JobID::fromHexString)
157-
.collect(Collectors.toSet());
158-
159-
// Get job IDs that are not managed by FlinkSessionJob resources and are currently
160150
// running
161-
// FAILED(TerminalState.GLOBALLY)
162-
// CANCELED(TerminalState.GLOBALLY)
163-
// FINISHED(TerminalState.GLOBALLY)
164151
// Above terminal states are not considered since they are no longer active jobs
165-
Set<JobID> unmanagedJobIds =
152+
Set<JobID> nonTerminalJobIds =
166153
allJobs.stream()
167-
.filter(
168-
job ->
169-
!job.getStatus()
170-
.isGloballyTerminalState()) // Only consider
171-
// running jobs
154+
.filter(job -> !job.getStatus().isGloballyTerminalState())
172155
.map(JobDetails::getJobId)
173-
.filter(jobId -> !managedJobIds.contains(jobId))
174156
.collect(Collectors.toSet());
175157

176-
LOG.info("Detected {} unmanaged job IDs: {}", unmanagedJobIds.size(), unmanagedJobIds);
177-
return unmanagedJobIds;
158+
return nonTerminalJobIds;
178159
} catch (Exception e) {
179-
LOG.warn("Failed to detect unmanaged jobs in session cluster", e);
160+
LOG.warn("Failed to detect nonTerminal jobs in session cluster", e);
180161
return Set.of();
181162
}
182163
}
@@ -206,20 +187,19 @@ public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx)
206187
.rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
207188
}
208189

209-
// Check for unmanaged jobs if the option is enabled (Enabled by default)
190+
// Check for unmanaged non-terminated jobs if the option is enabled (Enabled by default)
210191
boolean blockOnUnmanagedJobs =
211192
ctx.getObserveConfig()
212193
.getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS);
213194
if (blockOnUnmanagedJobs) {
214-
Set<JobID> unmanagedJobs = getUnmanagedJobs(ctx, sessionJobs);
215-
if (!unmanagedJobs.isEmpty()) {
195+
Set<JobID> nonTerminalJobs = getNonTerminalJobs(ctx);
196+
if (!nonTerminalJobs.isEmpty()) {
216197
var error =
217198
String.format(
218-
"The session cluster has unmanaged jobs %s that should be cancelled first",
219-
unmanagedJobs.stream()
199+
"The session cluster has non terminated jobs %s that should be cancelled first",
200+
nonTerminalJobs.stream()
220201
.map(JobID::toHexString)
221202
.collect(Collectors.toList()));
222-
LOG.warn(error);
223203
if (eventRecorder.triggerEvent(
224204
deployment,
225205
EventRecorder.Type.Warning,

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java

Lines changed: 41 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
3333
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
3434
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
35-
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
3635
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
3736
import org.apache.flink.runtime.client.JobStatusMessage;
3837

@@ -49,7 +48,6 @@
4948
import java.util.Optional;
5049
import java.util.Set;
5150
import java.util.concurrent.atomic.AtomicInteger;
52-
import java.util.stream.Collectors;
5351

5452
import static org.junit.jupiter.api.Assertions.assertEquals;
5553
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -151,8 +149,7 @@ public void testSetOwnerReference() throws Exception {
151149
}
152150

153151
@Test
154-
public void testGetUnmanagedJobs() throws Exception {
155-
// Create a session cluster deployment with blocking enabled
152+
public void testGetNonTerminalJobs() throws Exception {
156153
FlinkDeployment deployment = TestUtils.buildSessionCluster();
157154
deployment
158155
.getSpec()
@@ -166,7 +163,6 @@ public void testGetUnmanagedJobs() throws Exception {
166163
.getFlinkConfiguration()
167164
.get(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key()));
168165

169-
// First, deploy the session cluster to set up proper state
170166
reconciler.reconcile(deployment, flinkService.getContext());
171167

172168
// Verify deployment is in DEPLOYED state
@@ -181,7 +177,7 @@ public void testGetUnmanagedJobs() throws Exception {
181177
JobID unmanagedTerminatedJobId = new JobID();
182178
JobID unmanagedRunningJobId2 = new JobID();
183179

184-
// Add a mix of managed and unmanaged jobs to the testing service
180+
// Add jobs to the testing service
185181
flinkService
186182
.listJobs()
187183
.add(
@@ -253,60 +249,59 @@ public void testGetUnmanagedJobs() throws Exception {
253249
sessionJobs.add(managedSessionJob1);
254250
sessionJobs.add(managedSessionJob2);
255251

256-
// Test with blocking enabled - should identify running unmanaged jobs correctly
252+
// Test with blocking enabled - should identify all non-terminal jobs
257253
var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
258254
var resourceContext = getResourceContext(deployment, context);
259255

260-
// Use reflection to access the private getUnmanagedJobs method
261-
var sessionReconciler = reconciler.getReconciler();
262-
var getUnmanagedJobsMethod =
263-
sessionReconciler
264-
.getClass()
265-
.getDeclaredMethod(
266-
"getUnmanagedJobs", FlinkResourceContext.class, Set.class);
267-
getUnmanagedJobsMethod.setAccessible(true);
268-
269-
@SuppressWarnings("unchecked")
270-
var unmanagedJobs =
271-
(java.util.Set<String>)
272-
getUnmanagedJobsMethod.invoke(
273-
sessionReconciler, resourceContext, sessionJobs);
274-
275-
// Verify only RUNNING unmanaged jobs are identified - should be 2
276-
assertEquals(2, unmanagedJobs.size(), "Should identify exactly 2 running unmanaged jobs");
277-
278-
// Verify that none of the managed job IDs are in the unmanaged list
279-
Set<String> managedJobIds =
280-
sessionJobs.stream()
281-
.map(job -> job.getStatus().getJobStatus().getJobId())
282-
.collect(Collectors.toSet());
283-
284-
for (String managedJobId : managedJobIds) {
285-
assertFalse(
286-
unmanagedJobs.contains(managedJobId),
287-
"Managed job " + managedJobId + " should NOT be in unmanaged jobs");
288-
}
256+
var sessionReconciler = (SessionReconciler) reconciler.getReconciler();
257+
Set<JobID> nonTerminalJobs = sessionReconciler.getNonTerminalJobs(resourceContext);
258+
259+
// Verify all non-terminal jobs are identified - should be 4 (2 managed + 2 unmanaged
260+
// running)
261+
assertEquals(4, nonTerminalJobs.size(), "Should identify exactly 4 non-terminal jobs");
262+
263+
assertTrue(
264+
nonTerminalJobs.contains(unmanagedRunningJobId1),
265+
"Should contain unmanagedRunningJobId1");
266+
assertTrue(
267+
nonTerminalJobs.contains(unmanagedRunningJobId2),
268+
"Should contain unmanagedRunningJobId2");
289269

290-
// Verify managed jobs are properly set up (we should have 2 managed jobs)
291-
assertEquals(2, managedJobIds.size(), "Should have 2 managed jobs configured");
270+
// Verify terminated job is not included
271+
assertFalse(
272+
nonTerminalJobs.contains(unmanagedTerminatedJobId),
273+
"Should not contain terminated job");
274+
275+
// Test scenario with only unmanaged jobs
276+
flinkService
277+
.listJobs()
278+
.removeIf(
279+
job ->
280+
job.f1.getJobId().equals(managedJobId1)
281+
|| job.f1.getJobId().equals(managedJobId2));
282+
283+
Set<JobID> nonTerminalJobsAfterSessionJobsRemoval =
284+
sessionReconciler.getNonTerminalJobs(resourceContext);
285+
286+
assertEquals(
287+
2,
288+
nonTerminalJobsAfterSessionJobsRemoval.size(),
289+
"Should have 2 non-terminal jobs when sessionjobs are deleted");
292290

293-
// Test scenario with no running unmanaged jobs
291+
// Test scenario with no running jobs
294292
flinkService
295293
.listJobs()
296294
.removeIf(
297295
job ->
298296
job.f1.getJobId().equals(unmanagedRunningJobId1)
299297
|| job.f1.getJobId().equals(unmanagedRunningJobId2));
300298

301-
@SuppressWarnings("unchecked")
302-
var unmanagedJobsAfterRemoval =
303-
(java.util.Set<String>)
304-
getUnmanagedJobsMethod.invoke(
305-
sessionReconciler, resourceContext, sessionJobs);
299+
Set<JobID> nonTerminalJobsAfterRemoval =
300+
sessionReconciler.getNonTerminalJobs(resourceContext);
306301

307302
assertEquals(
308303
0,
309-
unmanagedJobsAfterRemoval.size(),
310-
"Should have no unmanaged jobs when only managed and terminated jobs exist");
304+
nonTerminalJobsAfterRemoval.size(),
305+
"Should have no non-terminal jobs when only terminated jobs exist");
311306
}
312307
}

0 commit comments

Comments
 (0)