Skip to content

Commit 042c27e

Browse files
committed
[FLINK-37372] Fix infinite loop bug in savepoint error handling
1 parent e636a00 commit 042c27e

File tree

3 files changed

+111
-2
lines changed

3 files changed

+111
-2
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
5252
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
5353
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
54+
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
5455
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
5556
import org.apache.flink.runtime.client.JobStatusMessage;
5657
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
@@ -98,7 +99,6 @@
9899
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
99100
import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
100101
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
101-
import org.apache.flink.util.ExceptionUtils;
102102
import org.apache.flink.util.FileUtils;
103103
import org.apache.flink.util.FlinkRuntimeException;
104104
import org.apache.flink.util.Preconditions;
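flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java

Lines changed: 60 additions & 0 deletions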
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.utils;
19+
20+
import org.apache.flink.types.DeserializationException;
21+
import org.apache.flink.util.SerializedThrowable;
22+
23+
import java.util.Optional;
24+
25+
/** Exception utils. * */
26+
public class ExceptionUtils {
27+
28+
/**
29+
* Based on the flink ExceptionUtils#findThrowableSerializedAware but fixes an infinite loop bug
30+
* resulting from SerializedThrowable deserialization errors.
31+
*/
32+
public static <T extends Throwable> Optional<T> findThrowableSerializedAware(
33+
Throwable throwable, Class<T> searchType, ClassLoader classLoader) {
34+
35+
if (throwable == null || searchType == null) {
36+
return Optional.empty();
37+
}
38+
39+
Throwable t = throwable;
40+
while (t != null) {
41+
if (searchType.isAssignableFrom(t.getClass())) {
42+
return Optional.of(searchType.cast(t));
43+
} else if (t instanceof SerializedThrowable) {
44+
var deserialized = ((SerializedThrowable) t).deserializeError(classLoader);
45+
// This is the key part of the fix:
46+
// The deserializeError method returns the same SerializedThrowable if it cannot
47+
// deserialize it. Previously this is what caused the infinite loop.
48+
t =
49+
deserialized == t
50+
? new DeserializationException(
51+
"Could not deserialize SerializedThrowable")
52+
: deserialized;
53+
} else {
54+
t = t.getCause();
55+
}
56+
}
57+
58+
return Optional.empty();
59+
}
60+
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,52 @@ public void cancelJobWithSavepointUpgradeModeTest(boolean deleteAfterSavepoint)
423423
}
424424
}
425425

426+
@ParameterizedTest
427+
@ValueSource(booleans = {true, false})
428+
public void savepointErrorTest(boolean deserializable) throws Exception {
429+
var testingClusterClient =
430+
new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
431+
var savepointPath = "file:///path/of/svp-1";
432+
configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);
433+
434+
var savepointErr = new SerializedThrowable(new Exception("sp test err"));
435+
if (!deserializable) {
436+
var cachedException = SerializedThrowable.class.getDeclaredField("cachedException");
437+
cachedException.setAccessible(true);
438+
cachedException.set(savepointErr, null);
439+
440+
var bytes = SerializedThrowable.class.getDeclaredField("serializedException");
441+
bytes.setAccessible(true);
442+
bytes.set(savepointErr, new byte[] {1, 2, 3});
443+
}
444+
445+
testingClusterClient.setStopWithSavepointFunction(
446+
(jobID, advanceToEndOfEventTime, savepointDir) -> {
447+
CompletableFuture<String> result = new CompletableFuture<>();
448+
result.completeExceptionally(savepointErr);
449+
return result;
450+
});
451+
452+
var flinkService = new TestingService(testingClusterClient);
453+
454+
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
455+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
456+
JobStatus jobStatus = deployment.getStatus().getJobStatus();
457+
jobStatus.setJobId(JobID.generate().toHexString());
458+
jobStatus.setState(RUNNING);
459+
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
460+
461+
assertThrows(
462+
UpgradeFailureException.class,
463+
() ->
464+
flinkService.cancelJob(
465+
deployment,
466+
SuspendMode.SAVEPOINT,
467+
configManager.getObserveConfig(deployment),
468+
true),
469+
"sp test err");
470+
}
471+
426472
@ParameterizedTest
427473
@ValueSource(booleans = {true, false})
428474
public void cancelJobWithDrainOnSavepointUpgradeModeTest(boolean drainOnSavepoint)
@@ -737,11 +783,14 @@ private void runNativeSavepointFormatTest(boolean failAfterSavepointCompletes)
737783
testingClusterClient.setStopWithSavepointFormat(
738784
(id, formatType, savepointDir) -> {
739785
if (failAfterSavepointCompletes) {
740-
stopWithSavepointFuture.completeExceptionally(
786+
CompletableFuture<String> result = new CompletableFuture<>();
787+
stopWithSavepointFuture.completeExceptionally(new Exception());
788+
result.completeExceptionally(
741789
new CompletionException(
742790
new SerializedThrowable(
743791
new StopWithSavepointStoppingException(
744792
savepointPath, jobID))));
793+
return result;
745794
} else {
746795
stopWithSavepointFuture.complete(
747796
new Tuple3<>(id, formatType, savepointDir));

0 commit comments

Comments
 (0)