Skip to content

Commit 921bfd0

Browse files
haoxins1996fanrui
andauthored
[hotfix] Using Flink ExceptionUtils#findThrowable for exception checking (#818)
Update flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java Co-authored-by: Rui Fan <[email protected]> * fix --------- Co-authored-by: Rui Fan <[email protected]>
1 parent 9ea69eb commit 921bfd0

File tree

2 files changed

+19
-5
lines changed

2 files changed

+19
-5
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import java.util.Optional;
4444
import java.util.concurrent.ExecutionException;
4545

46+
import static org.apache.flink.util.ExceptionUtils.findThrowable;
47+
4648
/** The reconciler for the {@link FlinkSessionJob}. */
4749
public class SessionJobReconciler
4850
extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> {
@@ -127,14 +129,13 @@ public DeleteControl cleanupInternal(FlinkResourceContext<FlinkSessionJob> ctx)
127129
: UpgradeMode.STATELESS;
128130
cancelJob(ctx, upgradeMode);
129131
} catch (ExecutionException e) {
130-
final var cause = e.getCause();
131-
132-
if (cause instanceof FlinkJobNotFoundException) {
132+
if (findThrowable(e, FlinkJobNotFoundException.class).isPresent()) {
133133
LOG.error("Job {} not found in the Flink cluster.", jobID, e);
134134
return DeleteControl.defaultDelete();
135135
}
136136

137-
if (cause instanceof FlinkJobTerminatedWithoutCancellationException) {
137+
if (findThrowable(e, FlinkJobTerminatedWithoutCancellationException.class)
138+
.isPresent()) {
138139
LOG.error("Job {} already terminated without cancellation.", jobID, e);
139140
return DeleteControl.defaultDelete();
140141
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,12 @@
6767
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
6868
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
6969
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
70+
import org.apache.flink.runtime.rest.util.RestClientException;
7071
import org.apache.flink.util.SerializedThrowable;
7172
import org.apache.flink.util.concurrent.Executors;
7273

74+
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
75+
7376
import io.fabric8.kubernetes.api.model.DeletionPropagation;
7477
import io.fabric8.kubernetes.api.model.HasMetadata;
7578
import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -92,6 +95,7 @@
9295
import java.util.List;
9396
import java.util.Map;
9497
import java.util.Optional;
98+
import java.util.Random;
9599
import java.util.Set;
96100
import java.util.concurrent.CompletableFuture;
97101
import java.util.concurrent.TimeoutException;
@@ -447,7 +451,16 @@ private String cancelJob(FlinkVersion flinkVersion, JobID jobID, boolean savepoi
447451
}
448452

449453
if (isFlinkJobNotFound) {
450-
throw new FlinkJobNotFoundException(jobID);
454+
// Throw different exceptions randomly, see
455+
// https://github.com/apache/flink-kubernetes-operator/pull/818
456+
if (new Random().nextBoolean()) {
457+
throw new RestClientException(
458+
"Job could not be found.",
459+
new FlinkJobNotFoundException(jobID),
460+
HttpResponseStatus.NOT_FOUND);
461+
} else {
462+
throw new FlinkJobNotFoundException(jobID);
463+
}
451464
}
452465

453466
var jobOpt = jobs.stream().filter(js -> js.f1.getJobId().equals(jobID)).findAny();

0 commit comments

Comments
 (0)