Skip to content

Commit 6532e39

Browse files
committed
[FLINK-37430] Operator hides the actual error on deployment issues
1 parent c4d460b commit 6532e39

File tree

7 files changed

+276
-2
lines changed

7 files changed

+276
-2
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
3434
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3535
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
36+
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
3637
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
3738
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
3839
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -162,7 +163,7 @@ public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Contex
162163
flinkApp,
163164
EventRecorder.Type.Warning,
164165
"ClusterDeploymentException",
165-
e.getMessage(),
166+
ExceptionUtils.getExceptionMessage(e),
166167
EventRecorder.Component.JobManagerDeployment,
167168
josdkContext.getClient());
168169
throw new ReconciliationException(e);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
3131
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3232
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
33+
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
3334
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
3435
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
3536
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -124,7 +125,7 @@ public UpdateControl<FlinkSessionJob> reconcile(
124125
flinkSessionJob,
125126
EventRecorder.Type.Warning,
126127
"SessionJobException",
127-
e.getMessage(),
128+
ExceptionUtils.getExceptionMessage(e),
128129
EventRecorder.Component.Job,
129130
josdkContext.getClient());
130131
throw new ReconciliationException(e);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
/** Exception utils. * */
2626
public class ExceptionUtils {
2727

28+
/** Maximum number of exceptions from a cause chain that are rendered into an event message. */
private static final int EXCEPTION_LIMIT_FOR_EVENT_MESSAGE = 3;
29+
2830
/**
2931
* Based on the flink ExceptionUtils#findThrowableSerializedAware but fixes an infinite loop bug
3032
* resulting from SerializedThrowable deserialization errors.
@@ -57,4 +59,56 @@ public static <T extends Throwable> Optional<T> findThrowableSerializedAware(
5759

5860
return Optional.empty();
5961
}
62+
63+
/**
64+
* traverse the throwable and extract useful information for up to the first 3 possible
65+
* exceptions in the hierarchy.
66+
*
67+
* @param throwable the throwable to be processed
68+
* @return the exception message, which will have a format similar to "cause1 -> cause2 ->
69+
* cause3"
70+
*/
71+
public static String getExceptionMessage(Throwable throwable) {
72+
return getExceptionMessage(throwable, 0);
73+
}
74+
75+
/**
76+
* Helper for recursion for `getExceptionMessage`.
77+
*
78+
* @param throwable the throwable to be processed
79+
* @param level the level we are in. The caller will set this value to 0, and we will be
80+
* incrementing it with each recursive call
81+
* @return the exception message, which will have a format similar to "cause1 -> cause2 ->
82+
* cause3"
83+
*/
84+
private static String getExceptionMessage(Throwable throwable, int level) {
85+
if (throwable == null) {
86+
return null;
87+
}
88+
89+
if (throwable instanceof SerializedThrowable) {
90+
var deserialized =
91+
((SerializedThrowable) throwable)
92+
.deserializeError(Thread.currentThread().getContextClassLoader());
93+
if (deserialized == throwable) {
94+
return "Unknown Error (SerializedThrowable)";
95+
} else {
96+
return getExceptionMessage(deserialized, level);
97+
}
98+
}
99+
100+
var msg =
101+
Optional.ofNullable(throwable.getMessage())
102+
.orElse(throwable.getClass().getSimpleName());
103+
level++;
104+
if (level == EXCEPTION_LIMIT_FOR_EVENT_MESSAGE) {
105+
return msg;
106+
}
107+
108+
if (throwable.getCause() == null) {
109+
return msg;
110+
} else {
111+
return msg + " -> " + getExceptionMessage(throwable.getCause(), level);
112+
}
113+
}
60114
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public class TestingFlinkService extends AbstractFlinkService {
130130
@Setter private boolean checkpointAvailable = true;
131131
@Setter private boolean jobManagerReady = true;
132132
@Setter private boolean deployFailure = false;
133+
@Setter private Exception makeItFailWith;
133134
@Setter private boolean triggerSavepointFailure = false;
134135
@Setter private boolean disposeSavepointFailure = false;
135136
@Setter private Runnable sessionJobSubmittedCallback;
@@ -212,6 +213,9 @@ public void submitApplicationCluster(
212213
}
213214

214215
protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
216+
if (makeItFailWith != null) {
217+
throw makeItFailWith;
218+
}
215219
if (deployFailure) {
216220
throw new Exception("Deployment failure");
217221
}
@@ -270,6 +274,10 @@ public JobID submitJobToSessionCluster(
270274
@Nullable String savepoint)
271275
throws Exception {
272276

277+
if (makeItFailWith != null) {
278+
throw makeItFailWith;
279+
}
280+
273281
if (deployFailure) {
274282
throw new Exception("Deployment failure");
275283
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
4242
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
4343
import org.apache.flink.runtime.client.JobStatusMessage;
44+
import org.apache.flink.util.SerializedThrowable;
4445

4546
import io.fabric8.kubernetes.api.model.Event;
4647
import io.fabric8.kubernetes.api.model.EventBuilder;
@@ -984,6 +985,33 @@ public void testEventOfNonDeploymentFailedException() throws Exception {
984985
assertEquals("Deployment failure", event.getMessage());
985986
}
986987

988+
@Test
989+
public void testEventOfNonDeploymentFailedChainedException() {
990+
assertTrue(testController.flinkResourceEvents().isEmpty());
991+
var flinkDeployment = TestUtils.buildApplicationCluster();
992+
993+
flinkService.setMakeItFailWith(
994+
new RuntimeException(
995+
"Deployment Failure",
996+
new IllegalStateException(
997+
null,
998+
new SerializedThrowable(new Exception("actual failure reason")))));
999+
try {
1000+
testController.reconcile(flinkDeployment, context);
1001+
fail();
1002+
} catch (Exception expected) {
1003+
}
1004+
assertEquals(2, testController.flinkResourceEvents().size());
1005+
1006+
var event = testController.flinkResourceEvents().remove();
1007+
assertEquals("Submit", event.getReason());
1008+
event = testController.flinkResourceEvents().remove();
1009+
assertEquals("ClusterDeploymentException", event.getReason());
1010+
assertEquals(
1011+
"Deployment Failure -> IllegalStateException -> actual failure reason",
1012+
event.getMessage());
1013+
}
1014+
9871015
@Test
9881016
public void cleanUpNewDeployment() {
9891017
FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
@@ -1064,6 +1092,32 @@ public void testInitialSavepointOnError() throws Exception {
10641092
assertEquals("msp", flinkService.listJobs().get(0).f0);
10651093
}
10661094

1095+
@Test
1096+
public void testErrorOnReconcileWithChainedExceptions() throws Exception {
1097+
FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
1098+
flinkDeployment.getSpec().getJob().setInitialSavepointPath("msp");
1099+
flinkService.setMakeItFailWith(
1100+
new RuntimeException(
1101+
"Deployment Failure",
1102+
new IllegalStateException(
1103+
null,
1104+
new SerializedThrowable(new Exception("actual failure reason")))));
1105+
try {
1106+
testController.reconcile(flinkDeployment, context);
1107+
fail();
1108+
} catch (Exception expected) {
1109+
}
1110+
assertEquals(2, testController.flinkResourceEvents().size());
1111+
1112+
var event = testController.flinkResourceEvents().remove();
1113+
assertEquals("Submit", event.getReason());
1114+
event = testController.flinkResourceEvents().remove();
1115+
assertEquals("ClusterDeploymentException", event.getReason());
1116+
assertEquals(
1117+
"Deployment Failure -> IllegalStateException -> actual failure reason",
1118+
event.getMessage());
1119+
}
1120+
10671121
@Test
10681122
public void testInitialHaError() throws Exception {
10691123
var appCluster = TestUtils.buildApplicationCluster(FlinkVersion.v1_20);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
3636
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3737
import org.apache.flink.runtime.client.JobStatusMessage;
38+
import org.apache.flink.util.SerializedThrowable;
3839

3940
import io.fabric8.kubernetes.client.KubernetesClient;
4041
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
@@ -615,6 +616,31 @@ public void testInitialSavepointOnError() throws Exception {
615616
assertEquals("msp", flinkService.listJobs().get(0).f0);
616617
}
617618

619+
@Test
620+
public void testErrorOnReconcileWithChainedExceptions() throws Exception {
621+
sessionJob.getSpec().getJob().setInitialSavepointPath("msp");
622+
flinkService.setMakeItFailWith(
623+
new RuntimeException(
624+
"Deployment Failure",
625+
new IllegalStateException(
626+
null,
627+
new SerializedThrowable(new Exception("actual failure reason")))));
628+
try {
629+
testController.reconcile(sessionJob, context);
630+
fail();
631+
} catch (Exception expected) {
632+
}
633+
assertEquals(2, testController.events().size());
634+
635+
var event = testController.events().remove();
636+
assertEquals("Submit", event.getReason());
637+
event = testController.events().remove();
638+
assertEquals("SessionJobException", event.getReason());
639+
assertEquals(
640+
"Deployment Failure -> IllegalStateException -> actual failure reason",
641+
event.getMessage());
642+
}
643+
618644
@Test
619645
public void verifyCanaryHandling() throws Exception {
620646
var canary = TestUtils.createCanaryJob();
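flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java

Lines changed: 130 additions & 0 deletions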
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.utils;
19+
20+
import org.apache.flink.util.SerializedThrowable;
21+
22+
import org.junit.jupiter.api.Test;
23+
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
26+
/** Test for {@link ExceptionUtils}. */
27+
public class ExceptionUtilsTest {
28+
29+
@Test
30+
void testGetExceptionMessage_nullThrowable() {
31+
assertThat(ExceptionUtils.getExceptionMessage(null)).isNull();
32+
}
33+
34+
@Test
35+
void testGetExceptionMessage_simpleException() {
36+
var ex = new RuntimeException("My Exception");
37+
assertThat(ExceptionUtils.getExceptionMessage(ex)).isEqualTo("My Exception");
38+
}
39+
40+
@Test
41+
void testGetExceptionMessage_simpleExceptionNullCause() {
42+
var ex = new OutOfMemoryError(null);
43+
assertThat(ExceptionUtils.getExceptionMessage(ex)).isEqualTo("OutOfMemoryError");
44+
}
45+
46+
@Test
47+
void testGetExceptionMessage_exceptionWithCause() {
48+
var ex = new RuntimeException("Root Cause");
49+
var exWrapped = new RuntimeException("Wrapped Exception", ex);
50+
assertThat(ExceptionUtils.getExceptionMessage(exWrapped))
51+
.isEqualTo("Wrapped Exception -> Root Cause");
52+
}
53+
54+
@Test
55+
void testGetExceptionMessage_exactlyThree() {
56+
var ex3 = new IllegalAccessException(null);
57+
var ex2 = new RuntimeException("Cause 2", ex3);
58+
var ex = new RuntimeException("Cause 1", ex2);
59+
assertThat(ExceptionUtils.getExceptionMessage(ex))
60+
.isEqualTo("Cause 1 -> Cause 2 -> IllegalAccessException");
61+
}
62+
63+
@Test
64+
void testGetExceptionMessage_onlyTwo() {
65+
var ex2 = new RuntimeException("Cause 2");
66+
var ex = new RuntimeException("Cause 1", ex2);
67+
assertThat(ExceptionUtils.getExceptionMessage(ex)).isEqualTo("Cause 1 -> Cause 2");
68+
}
69+
70+
@Test
71+
void testGetExceptionMessage_moreThanThree() {
72+
var ex4 = new RuntimeException("Cause 4");
73+
var ex3 = new RuntimeException("Cause 3", ex4);
74+
var ex2 = new IllegalStateException(null, ex3);
75+
var ex = new RuntimeException("Cause 1", ex2);
76+
assertThat(ExceptionUtils.getExceptionMessage(ex))
77+
.isEqualTo("Cause 1 -> IllegalStateException -> Cause 3");
78+
}
79+
80+
@Test
81+
void testGetExceptionMessage_serializedThrowable() {
82+
var serializedException = new SerializedThrowable(new Exception("Serialized Exception"));
83+
assertThat(ExceptionUtils.getExceptionMessage(serializedException))
84+
.isEqualTo("Serialized Exception");
85+
}
86+
87+
@Test
88+
void testGetExceptionMessage_serializedThrowableWithDoubleSerializedException() {
89+
var firstSerialized = new SerializedThrowable(new IndexOutOfBoundsException("4>3"));
90+
var serializedException = new SerializedThrowable(firstSerialized);
91+
assertThat(ExceptionUtils.getExceptionMessage(serializedException)).isEqualTo("4>3");
92+
}
93+
94+
@Test
95+
void testGetExceptionMessage_serializedThrowableWithRegularException() {
96+
var serializedException = new SerializedThrowable(new Exception("Serialized Exception"));
97+
var ex = new RuntimeException("Cause 1", serializedException);
98+
assertThat(ExceptionUtils.getExceptionMessage(ex))
99+
.isEqualTo("Cause 1 -> Serialized Exception");
100+
}
101+
102+
@Test
103+
void testGetExceptionMessage_serializedThrowableAndAnotherCause() {
104+
var ex = new RuntimeException("Cause 1");
105+
var serializedException =
106+
new SerializedThrowable(new RuntimeException("Serialized Exception", ex));
107+
assertThat(ExceptionUtils.getExceptionMessage(serializedException))
108+
.isEqualTo("Serialized Exception -> Cause 1");
109+
}
110+
111+
@Test
112+
void testGetExceptionMessage_moreThanThreeWithSerializedAnnRegularOnes() {
113+
var ex4 = new RuntimeException("Cause 4");
114+
var ex3 = new RuntimeException("Cause 3", ex4);
115+
var ex2 = new RuntimeException("Cause 2", new SerializedThrowable(ex3));
116+
var ex = new RuntimeException("Cause 1", ex2);
117+
assertThat(ExceptionUtils.getExceptionMessage(ex))
118+
.isEqualTo("Cause 1 -> Cause 2 -> Cause 3");
119+
}
120+
121+
@Test
122+
void testGetExceptionMessage_moreThanThreeWithSerializedAndNotWithDifferentOrdering() {
123+
var ex4 = new RuntimeException("Cause 4");
124+
var ex3 = new RuntimeException("Cause 3", ex4);
125+
var ex2 = new RuntimeException("Cause 2", new SerializedThrowable(ex3));
126+
var ex = new RuntimeException("Cause 1", new SerializedThrowable(ex2));
127+
assertThat(ExceptionUtils.getExceptionMessage(ex))
128+
.isEqualTo("Cause 1 -> Cause 2 -> Cause 3");
129+
}
130+
}

0 commit comments

Comments
 (0)