diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index 730e16847c..8a2fd2651d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -238,6 +238,7 @@ public FlinkOperatorConfiguration getOperatorConfiguration( * * @param baseConfMap The configuration map that should be searched for relevant Flink version * prefixes. + * @param flinkVersion The FlinkVersion to be used * @return A list of relevant Flink version prefixes in order of ascending Flink version. */ protected static List getRelevantVersionPrefixes( @@ -381,6 +382,7 @@ private void applyConfigsFromCurrentSpec( * Get configuration for interacting with session jobs. Similar to the observe configuration for * FlinkDeployments. * + * @param name The name of the job * @param deployment FlinkDeployment for the session cluster * @param sessionJobSpec Session job spec * @return Session job config diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index ac0f7356e1..bae03cba2d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -33,6 +33,7 @@ import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.EventSourceUtils; +import org.apache.flink.kubernetes.operator.utils.ExceptionUtils; import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import org.apache.flink.kubernetes.operator.utils.ValidatorUtils; @@ -162,7 +163,7 @@ public UpdateControl reconcile(FlinkDeployment flinkApp, Contex flinkApp, EventRecorder.Type.Warning, "ClusterDeploymentException", - e.getMessage(), + ExceptionUtils.getExceptionMessage(e), EventRecorder.Component.JobManagerDeployment, josdkContext.getClient()); throw new ReconciliationException(e); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java index 1e818d6659..4838dea865 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java @@ -30,6 +30,7 @@ import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.EventSourceUtils; +import org.apache.flink.kubernetes.operator.utils.ExceptionUtils; import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import org.apache.flink.kubernetes.operator.utils.ValidatorUtils; @@ -124,7 +125,7 @@ public UpdateControl reconcile( flinkSessionJob, EventRecorder.Type.Warning, "SessionJobException", - e.getMessage(), + ExceptionUtils.getExceptionMessage(e), EventRecorder.Component.Job, josdkContext.getClient()); throw new ReconciliationException(e); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/mutator/FlinkResourceMutator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/mutator/FlinkResourceMutator.java index 606d243b51..47e185c1ce 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/mutator/FlinkResourceMutator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/mutator/FlinkResourceMutator.java @@ -31,6 +31,7 @@ public interface FlinkResourceMutator extends Plugin { * Mutate deployment and return the mutated Object. * * @param deployment A Flink application or session cluster deployment. + * @return the mutated Flink application or session cluster deployment. */ FlinkDeployment mutateDeployment(FlinkDeployment deployment); @@ -39,6 +40,7 @@ public interface FlinkResourceMutator extends Plugin { * * @param sessionJob the session job to be mutated. * @param session the target session cluster of the session job to be Mutated. + * @return the mutated session job. */ FlinkSessionJob mutateSessionJob(FlinkSessionJob sessionJob, Optional session); @@ -46,6 +48,7 @@ public interface FlinkResourceMutator extends Plugin { * Mutate snapshot and return the mutated Object. * * @param stateSnapshot the snapshot to be mutated. + * @return the mutated snapshot. */ FlinkStateSnapshot mutateStateSnapshot(FlinkStateSnapshot stateSnapshot); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java index 9fd9f59687..1baf68f60a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java @@ -107,6 +107,7 @@ protected boolean isResourceReadyToBeObserved(FlinkResourceContext ctx) { * DEPLOYED state. * * @param ctx Context for resource. + * @return true if the resource was already upgraded, false otherwise. */ protected abstract boolean checkIfAlreadyUpgraded(FlinkResourceContext ctx); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java index 21cad5e62b..b206840372 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java @@ -53,6 +53,7 @@ public class SnapshotTriggerTimestampStore { * updated with this value. * * @param resource Flink resource + * @param snapshotType the snapshot type * @param snapshotsSupplier supplies related snapshot resources * @return instant of last trigger */ @@ -103,6 +104,7 @@ public Instant getLastPeriodicTriggerInstant( * Updates the time a periodic snapshot was last triggered for this resource. * * @param resource Kubernetes resource + * @param snapshotType the snapshot type * @param instant new timestamp */ public void updateLastPeriodicTriggerTimestamp( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index d88aca65be..48a70b3a71 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -954,7 +954,15 @@ private void deleteJar(Configuration conf, String jarId) { } } - /** Wait until Deployment is removed, return remaining timeout. */ + /** + * Wait until Deployment is removed, return remaining timeout. + * + * @param name name of the deployment + * @param deployment The deployment resource + * @param propagation DeletePropagation + * @param timeout Timeout to wait + * @return remaining timeout after deletion + */ @VisibleForTesting protected Duration deleteDeploymentBlocking( String name, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java index 02f73fcc17..4af3f1906e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java @@ -122,10 +122,16 @@ public boolean triggerEvent( } /** + * @param resource The resource + * @param type The type + * @param reason the reason + * @param message the message + * @param component the component + * @param messageKey the message key + * @param client the client * @param interval Interval for dedupe. Null mean no dedupe. - * @return */ - public boolean triggerEventWithInterval( + public void triggerEventWithInterval( AbstractFlinkResource resource, Type type, String reason, @@ -134,7 +140,7 @@ public boolean triggerEventWithInterval( String messageKey, KubernetesClient client, @Nullable Duration interval) { - return EventUtils.createOrUpdateEventWithInterval( + EventUtils.createOrUpdateEventWithInterval( client, resource, type, @@ -166,12 +172,18 @@ public boolean triggerEventOnce( } /** + * @param resource The resource + * @param type The type + * @param reason the reason + * @param message the message + * @param component the component + * @param messageKey the message key + * @param client the client * @param interval Interval for dedupe. Null mean no dedupe. * @param dedupePredicate Predicate for dedupe algorithm.. * @param labels Labels to store in meta data for dedupe. Do nothing if null. - * @return */ - public boolean triggerEventWithLabels( + public void triggerEventWithLabels( AbstractFlinkResource resource, Type type, String reason, @@ -182,7 +194,7 @@ public boolean triggerEventWithLabels( @Nullable Duration interval, @Nullable Predicate> dedupePredicate, @Nullable Map labels) { - return EventUtils.createOrUpdateEventWithLabels( + EventUtils.createOrUpdateEventWithLabels( client, resource, type, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java index 98a9393b4c..ad7bd6be92 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java @@ -25,9 +25,17 @@ /** Exception utils. * */ public class ExceptionUtils { + private static final int EXCEPTION_LIMIT_FOR_EVENT_MESSAGE = 3; + /** * Based on the flink ExceptionUtils#findThrowableSerializedAware but fixes an infinite loop bug * resulting from SerializedThrowable deserialization errors. + * + * @param throwable the throwable to be processed + * @param searchType the type of the exception to search for + * @param classLoader the classloader to use for deserialization + * @param the exception type + * @return the found exception, or empty if it is not found. */ public static Optional findThrowableSerializedAware( Throwable throwable, Class searchType, ClassLoader classLoader) { @@ -57,4 +65,56 @@ public static Optional findThrowableSerializedAware( return Optional.empty(); } + + /** + * traverse the throwable and extract useful information for up to the first 3 possible + * exceptions in the hierarchy. + * + * @param throwable the throwable to be processed + * @return the exception message, which will have a format similar to "cause1 → cause2 + * → cause3" + */ + public static String getExceptionMessage(Throwable throwable) { + return getExceptionMessage(throwable, 0); + } + + /** + * Helper for recursion for `getExceptionMessage`. + * + * @param throwable the throwable to be processed + * @param level the level we are in. The caller will set this value to 0, and we will be + * incrementing it with each recursive call + * @return the exception message, which will have a format similar to "cause1 -> cause2 -> + * cause3" + */ + private static String getExceptionMessage(Throwable throwable, int level) { + if (throwable == null) { + return null; + } + + if (throwable instanceof SerializedThrowable) { + var deserialized = + ((SerializedThrowable) throwable) + .deserializeError(Thread.currentThread().getContextClassLoader()); + if (deserialized == throwable) { + return "Unknown Error (SerializedThrowable)"; + } else { + return getExceptionMessage(deserialized, level); + } + } + + var msg = + Optional.ofNullable(throwable.getMessage()) + .orElse(throwable.getClass().getSimpleName()); + + if (level >= EXCEPTION_LIMIT_FOR_EVENT_MESSAGE) { + return msg; + } + + if (throwable.getCause() == null) { + return msg; + } else { + return msg + " -> " + getExceptionMessage(throwable.getCause(), level + 1); + } + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java index f8ce07f44c..7ce3db3193 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java @@ -134,6 +134,7 @@ public static SnapshotStatus getLastSnapshotStatus( * @param resource The resource to be snapshotted. * @param conf The observe configuration of the resource. * @param snapshotType The type of the snapshot. + * @param lastTrigger the last time the snapshot was triggered. * @return An optional {@link SnapshotTriggerType}. */ @VisibleForTesting diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java index 8b4fc630d4..e06aa11193 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java @@ -85,6 +85,7 @@ public void notifyListeners(CR resource, STATUS prevStatus) { * operator behavior. * * @param resource Resource for which status update should be performed + * @param client Kubernetes client to use for the update */ @SneakyThrows public void patchAndCacheStatus(CR resource, KubernetesClient client) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java index 33f30d9965..8f2a37103e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/FlinkResourceValidator.java @@ -50,6 +50,7 @@ Optional validateSessionJob( * Validate and return optional error. * * @param savepoint the savepoint to be validated. + * @param target the target resource of the savepoint to be validated. * @return Optional error string, should be present iff validation resulted in an error */ Optional validateStateSnapshot( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index c18e8f34c5..3dfc34a909 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -130,6 +130,7 @@ public class TestingFlinkService extends AbstractFlinkService { @Setter private boolean checkpointAvailable = true; @Setter private boolean jobManagerReady = true; @Setter private boolean deployFailure = false; + @Setter private Exception makeItFailWith; @Setter private boolean triggerSavepointFailure = false; @Setter private boolean disposeSavepointFailure = false; @Setter private Runnable sessionJobSubmittedCallback; @@ -212,6 +213,9 @@ public void submitApplicationCluster( } protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception { + if (makeItFailWith != null) { + throw makeItFailWith; + } if (deployFailure) { throw new Exception("Deployment failure"); } @@ -270,6 +274,10 @@ public JobID submitJobToSessionCluster( @Nullable String savepoint) throws Exception { + if (makeItFailWith != null) { + throw makeItFailWith; + } + if (deployFailure) { throw new Exception("Deployment failure"); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 0354a1256e..18273c6372 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -41,6 +41,7 @@ import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.util.SerializedThrowable; import io.fabric8.kubernetes.api.model.Event; import io.fabric8.kubernetes.api.model.EventBuilder; @@ -984,6 +985,33 @@ public void testEventOfNonDeploymentFailedException() throws Exception { assertEquals("Deployment failure", event.getMessage()); } + @Test + public void testEventOfNonDeploymentFailedChainedException() { + assertTrue(testController.flinkResourceEvents().isEmpty()); + var flinkDeployment = TestUtils.buildApplicationCluster(); + + flinkService.setMakeItFailWith( + new RuntimeException( + "Deployment Failure", + new IllegalStateException( + null, + new SerializedThrowable(new Exception("actual failure reason"))))); + try { + testController.reconcile(flinkDeployment, context); + fail(); + } catch (Exception expected) { + } + assertEquals(2, testController.flinkResourceEvents().size()); + + var event = testController.flinkResourceEvents().remove(); + assertEquals("Submit", event.getReason()); + event = testController.flinkResourceEvents().remove(); + assertEquals("ClusterDeploymentException", event.getReason()); + assertEquals( + "Deployment Failure -> IllegalStateException -> actual failure reason", + event.getMessage()); + } + @Test public void cleanUpNewDeployment() { FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster(); @@ -1064,6 +1092,32 @@ public void testInitialSavepointOnError() throws Exception { assertEquals("msp", flinkService.listJobs().get(0).f0); } + @Test + public void testErrorOnReconcileWithChainedExceptions() throws Exception { + FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster(); + flinkDeployment.getSpec().getJob().setInitialSavepointPath("msp"); + flinkService.setMakeItFailWith( + new RuntimeException( + "Deployment Failure", + new IllegalStateException( + null, + new SerializedThrowable(new Exception("actual failure reason"))))); + try { + testController.reconcile(flinkDeployment, context); + fail(); + } catch (Exception expected) { + } + assertEquals(2, testController.flinkResourceEvents().size()); + + var event = testController.flinkResourceEvents().remove(); + assertEquals("Submit", event.getReason()); + event = testController.flinkResourceEvents().remove(); + assertEquals("ClusterDeploymentException", event.getReason()); + assertEquals( + "Deployment Failure -> IllegalStateException -> actual failure reason", + event.getMessage()); + } + @Test public void testInitialHaError() throws Exception { var appCluster = TestUtils.buildApplicationCluster(FlinkVersion.v1_20); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java index 0a5490890f..b7ad6f1352 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java @@ -35,6 +35,7 @@ import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.util.SerializedThrowable; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; @@ -615,6 +616,31 @@ public void testInitialSavepointOnError() throws Exception { assertEquals("msp", flinkService.listJobs().get(0).f0); } + @Test + public void testErrorOnReconcileWithChainedExceptions() throws Exception { + sessionJob.getSpec().getJob().setInitialSavepointPath("msp"); + flinkService.setMakeItFailWith( + new RuntimeException( + "Deployment Failure", + new IllegalStateException( + null, + new SerializedThrowable(new Exception("actual failure reason"))))); + try { + testController.reconcile(sessionJob, context); + fail(); + } catch (Exception expected) { + } + assertEquals(2, testController.events().size()); + + var event = testController.events().remove(); + assertEquals("Submit", event.getReason()); + event = testController.events().remove(); + assertEquals("SessionJobException", event.getReason()); + assertEquals( + "Deployment Failure -> IllegalStateException -> actual failure reason", + event.getMessage()); + } + @Test public void verifyCanaryHandling() throws Exception { var canary = TestUtils.createCanaryJob(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java new file mode 100644 index 0000000000..1bd8873d36 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.utils; + +import org.apache.flink.util.SerializedThrowable; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link ExceptionUtils}. */ +public class ExceptionUtilsTest { + + @Test + void testGetExceptionMessage_nullThrowable() { + assertThat(ExceptionUtils.getExceptionMessage(null)).isNull(); + } + + @Test + void testGetExceptionMessage_serializedThrowable() { + var serializedException = new SerializedThrowable(new Exception("Serialized Exception")); + assertThat(ExceptionUtils.getExceptionMessage(serializedException)) + .isEqualTo("Serialized Exception"); + } + + @Test + void testGetExceptionMessage_differentKindsOfExceptions() { + var ex4 = new RuntimeException("Cause 4"); + var ex3 = new RuntimeException("Cause 3", ex4); + var ex2 = new RuntimeException("Cause 2", new SerializedThrowable(ex3)); + var ex = new RuntimeException("Cause 1", ex2); + assertThat(ExceptionUtils.getExceptionMessage(ex)) + .isEqualTo("Cause 1 -> Cause 2 -> Cause 3 -> Cause 4"); + } + + @Test + void testSerializedThrowableError() { + var serializedException = new SerializedThrowable(new NonSerializableException()); + assertThat(ExceptionUtils.getExceptionMessage(serializedException)) + .isEqualTo("Unknown Error (SerializedThrowable)"); + } + + private static class NonSerializableException extends Exception { + private void writeObject(java.io.ObjectOutputStream stream) throws IOException { + throw new IOException(); + } + } +}