diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 4d7634ea4c..93cb1262fe 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -51,6 +51,7 @@ import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EnvUtils; import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.kubernetes.operator.utils.ExceptionUtils; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; @@ -98,7 +99,6 @@ import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java new file mode 100644 index 0000000000..98a9393b4c --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.utils; + +import org.apache.flink.types.DeserializationException; +import org.apache.flink.util.SerializedThrowable; + +import java.util.Optional; + +/** Exception utils. * */ +public class ExceptionUtils { + + /** + * Based on the flink ExceptionUtils#findThrowableSerializedAware but fixes an infinite loop bug + * resulting from SerializedThrowable deserialization errors. + */ + public static Optional findThrowableSerializedAware( + Throwable throwable, Class searchType, ClassLoader classLoader) { + + if (throwable == null || searchType == null) { + return Optional.empty(); + } + + Throwable t = throwable; + while (t != null) { + if (searchType.isAssignableFrom(t.getClass())) { + return Optional.of(searchType.cast(t)); + } else if (t instanceof SerializedThrowable) { + var deserialized = ((SerializedThrowable) t).deserializeError(classLoader); + // This is the key part of the fix: + // The deserializeError method returns the same SerializedThrowable if it cannot + // deserialize it. Previously this is what caused the infinite loop. + t = + deserialized == t + ? new DeserializationException( + "Could not deserialize SerializedThrowable") + : deserialized; + } else { + t = t.getCause(); + } + } + + return Optional.empty(); + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java index a26cbb461f..9082ff358a 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java @@ -423,6 +423,52 @@ public void cancelJobWithSavepointUpgradeModeTest(boolean deleteAfterSavepoint) } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void savepointErrorTest(boolean deserializable) throws Exception { + var testingClusterClient = + new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); + var savepointPath = "file:///path/of/svp-1"; + configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath); + + var savepointErr = new SerializedThrowable(new Exception("sp test err")); + if (!deserializable) { + var cachedException = SerializedThrowable.class.getDeclaredField("cachedException"); + cachedException.setAccessible(true); + cachedException.set(savepointErr, null); + + var bytes = SerializedThrowable.class.getDeclaredField("serializedException"); + bytes.setAccessible(true); + bytes.set(savepointErr, new byte[] {1, 2, 3}); + } + + testingClusterClient.setStopWithSavepointFunction( + (jobID, advanceToEndOfEventTime, savepointDir) -> { + CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally(savepointErr); + return result; + }); + + var flinkService = new TestingService(testingClusterClient); + + FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); + JobStatus jobStatus = deployment.getStatus().getJobStatus(); + jobStatus.setJobId(JobID.generate().toHexString()); + jobStatus.setState(RUNNING); + ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); + + assertThrows( + UpgradeFailureException.class, + () -> + flinkService.cancelJob( + deployment, + SuspendMode.SAVEPOINT, + configManager.getObserveConfig(deployment), + true), + "sp test err"); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void cancelJobWithDrainOnSavepointUpgradeModeTest(boolean drainOnSavepoint) @@ -737,11 +783,14 @@ private void runNativeSavepointFormatTest(boolean failAfterSavepointCompletes) testingClusterClient.setStopWithSavepointFormat( (id, formatType, savepointDir) -> { if (failAfterSavepointCompletes) { - stopWithSavepointFuture.completeExceptionally( + CompletableFuture result = new CompletableFuture<>(); + stopWithSavepointFuture.completeExceptionally(new Exception()); + result.completeExceptionally( new CompletionException( new SerializedThrowable( new StopWithSavepointStoppingException( savepointPath, jobID)))); + return result; } else { stopWithSavepointFuture.complete( new Tuple3<>(id, formatType, savepointDir));