Skip to content

Commit 7a65e02

Browse files
authored
[FLINK-37430] Operator hides the actual error on deployment issues
1 parent 6859683 commit 7a65e02

File tree

16 files changed

+255
-9
lines changed

16 files changed

+255
-9
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ public FlinkOperatorConfiguration getOperatorConfiguration(
238238
*
239239
* @param baseConfMap The configuration map that should be searched for relevant Flink version
240240
* prefixes.
241+
* @param flinkVersion The FlinkVersion to be used
241242
* @return A list of relevant Flink version prefixes in order of ascending Flink version.
242243
*/
243244
protected static List<String> getRelevantVersionPrefixes(
@@ -381,6 +382,7 @@ private void applyConfigsFromCurrentSpec(
381382
* Get configuration for interacting with session jobs. Similar to the observe configuration for
382383
* FlinkDeployments.
383384
*
385+
* @param name The name of the job
384386
* @param deployment FlinkDeployment for the session cluster
385387
* @param sessionJobSpec Session job spec
386388
* @return Session job config

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
3434
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3535
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
36+
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
3637
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
3738
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
3839
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -162,7 +163,7 @@ public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Contex
162163
flinkApp,
163164
EventRecorder.Type.Warning,
164165
"ClusterDeploymentException",
165-
e.getMessage(),
166+
ExceptionUtils.getExceptionMessage(e),
166167
EventRecorder.Component.JobManagerDeployment,
167168
josdkContext.getClient());
168169
throw new ReconciliationException(e);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
3131
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3232
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
33+
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
3334
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
3435
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
3536
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -124,7 +125,7 @@ public UpdateControl<FlinkSessionJob> reconcile(
124125
flinkSessionJob,
125126
EventRecorder.Type.Warning,
126127
"SessionJobException",
127-
e.getMessage(),
128+
ExceptionUtils.getExceptionMessage(e),
128129
EventRecorder.Component.Job,
129130
josdkContext.getClient());
130131
throw new ReconciliationException(e);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/mutator/FlinkResourceMutator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public interface FlinkResourceMutator extends Plugin {
3131
* Mutate deployment and return the mutated Object.
3232
*
3333
* @param deployment A Flink application or session cluster deployment.
34+
* @return the mutated Flink application or session cluster deployment.
3435
*/
3536
FlinkDeployment mutateDeployment(FlinkDeployment deployment);
3637

@@ -39,13 +40,15 @@ public interface FlinkResourceMutator extends Plugin {
3940
*
4041
* @param sessionJob the session job to be mutated.
4142
* @param session the target session cluster of the session job to be mutated.
43+
* @return the mutated session job.
4244
*/
4345
FlinkSessionJob mutateSessionJob(FlinkSessionJob sessionJob, Optional<FlinkDeployment> session);
4446

4547
/**
4648
* Mutate snapshot and return the mutated Object.
4749
*
4850
* @param stateSnapshot the snapshot to be mutated.
51+
* @return the mutated snapshot.
4952
*/
5053
FlinkStateSnapshot mutateStateSnapshot(FlinkStateSnapshot stateSnapshot);
5154
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ protected boolean isResourceReadyToBeObserved(FlinkResourceContext<CR> ctx) {
107107
* DEPLOYED state.
108108
*
109109
* @param ctx Context for resource.
110+
* @return true if the resource was already upgraded, false otherwise.
110111
*/
111112
protected abstract boolean checkIfAlreadyUpgraded(FlinkResourceContext<CR> ctx);
112113
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SnapshotTriggerTimestampStore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class SnapshotTriggerTimestampStore {
5353
* updated with this value.
5454
*
5555
* @param resource Flink resource
56+
* @param snapshotType the snapshot type
5657
* @param snapshotsSupplier supplies related snapshot resources
5758
* @return instant of last trigger
5859
*/
@@ -103,6 +104,7 @@ public Instant getLastPeriodicTriggerInstant(
103104
* Updates the time a periodic snapshot was last triggered for this resource.
104105
*
105106
* @param resource Kubernetes resource
107+
* @param snapshotType the snapshot type
106108
* @param instant new timestamp
107109
*/
108110
public void updateLastPeriodicTriggerTimestamp(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -954,7 +954,15 @@ private void deleteJar(Configuration conf, String jarId) {
954954
}
955955
}
956956

957-
/** Wait until Deployment is removed, return remaining timeout. */
957+
/**
958+
* Wait until Deployment is removed, return remaining timeout.
959+
*
960+
* @param name name of the deployment
961+
* @param deployment The deployment resource
962+
* @param propagation DeletePropagation
963+
* @param timeout Timeout to wait
964+
* @return remaining timeout after deletion
965+
*/
958966
@VisibleForTesting
959967
protected Duration deleteDeploymentBlocking(
960968
String name,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,16 @@ public boolean triggerEvent(
122122
}
123123

124124
/**
125+
* @param resource The resource
126+
* @param type The type
127+
* @param reason the reason
128+
* @param message the message
129+
* @param component the component
130+
* @param messageKey the message key
131+
* @param client the client
125132
* @param interval Interval for dedupe. Null means no dedupe.
126-
* @return
127133
*/
128-
public boolean triggerEventWithInterval(
134+
public void triggerEventWithInterval(
129135
AbstractFlinkResource<?, ?> resource,
130136
Type type,
131137
String reason,
@@ -134,7 +140,7 @@ public boolean triggerEventWithInterval(
134140
String messageKey,
135141
KubernetesClient client,
136142
@Nullable Duration interval) {
137-
return EventUtils.createOrUpdateEventWithInterval(
143+
EventUtils.createOrUpdateEventWithInterval(
138144
client,
139145
resource,
140146
type,
@@ -166,12 +172,18 @@ public boolean triggerEventOnce(
166172
}
167173

168174
/**
175+
* @param resource The resource
176+
* @param type The type
177+
* @param reason the reason
178+
* @param message the message
179+
* @param component the component
180+
* @param messageKey the message key
181+
* @param client the client
169182
* @param interval Interval for dedupe. Null means no dedupe.
170183
* @param dedupePredicate Predicate for dedupe algorithm.
171184
* @param labels Labels to store in meta data for dedupe. Do nothing if null.
172-
* @return
173185
*/
174-
public boolean triggerEventWithLabels(
186+
public void triggerEventWithLabels(
175187
AbstractFlinkResource<?, ?> resource,
176188
Type type,
177189
String reason,
@@ -182,7 +194,7 @@ public boolean triggerEventWithLabels(
182194
@Nullable Duration interval,
183195
@Nullable Predicate<Map<String, String>> dedupePredicate,
184196
@Nullable Map<String, String> labels) {
185-
return EventUtils.createOrUpdateEventWithLabels(
197+
EventUtils.createOrUpdateEventWithLabels(
186198
client,
187199
resource,
188200
type,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,17 @@
2525
/** Exception utils. */
2626
public class ExceptionUtils {
2727

28+
private static final int EXCEPTION_LIMIT_FOR_EVENT_MESSAGE = 3;
29+
2830
/**
2931
* Based on the flink ExceptionUtils#findThrowableSerializedAware but fixes an infinite loop bug
3032
* resulting from SerializedThrowable deserialization errors.
33+
*
34+
* @param throwable the throwable to be processed
35+
* @param searchType the type of the exception to search for
36+
* @param classLoader the classloader to use for deserialization
37+
* @param <T> the exception type
38+
* @return the found exception, or empty if it is not found.
3139
*/
3240
public static <T extends Throwable> Optional<T> findThrowableSerializedAware(
3341
Throwable throwable, Class<T> searchType, ClassLoader classLoader) {
@@ -57,4 +65,56 @@ public static <T extends Throwable> Optional<T> findThrowableSerializedAware(
5765

5866
return Optional.empty();
5967
}
68+
69+
/**
70+
* traverse the throwable and extract useful information for up to the first 3 possible
71+
* exceptions in the hierarchy.
72+
*
73+
* @param throwable the throwable to be processed
74+
* @return the exception message, which will have a format similar to "cause1 &rarr; cause2
75+
* &rarr; cause3"
76+
*/
77+
public static String getExceptionMessage(Throwable throwable) {
78+
return getExceptionMessage(throwable, 0);
79+
}
80+
81+
/**
82+
* Helper for recursion for `getExceptionMessage`.
83+
*
84+
* @param throwable the throwable to be processed
85+
* @param level the level we are in. The caller will set this value to 0, and we will be
86+
* incrementing it with each recursive call
87+
* @return the exception message, which will have a format similar to "cause1 -> cause2 ->
88+
* cause3"
89+
*/
90+
private static String getExceptionMessage(Throwable throwable, int level) {
91+
if (throwable == null) {
92+
return null;
93+
}
94+
95+
if (throwable instanceof SerializedThrowable) {
96+
var deserialized =
97+
((SerializedThrowable) throwable)
98+
.deserializeError(Thread.currentThread().getContextClassLoader());
99+
if (deserialized == throwable) {
100+
return "Unknown Error (SerializedThrowable)";
101+
} else {
102+
return getExceptionMessage(deserialized, level);
103+
}
104+
}
105+
106+
var msg =
107+
Optional.ofNullable(throwable.getMessage())
108+
.orElse(throwable.getClass().getSimpleName());
109+
110+
if (level >= EXCEPTION_LIMIT_FOR_EVENT_MESSAGE) {
111+
return msg;
112+
}
113+
114+
if (throwable.getCause() == null) {
115+
return msg;
116+
} else {
117+
return msg + " -> " + getExceptionMessage(throwable.getCause(), level + 1);
118+
}
119+
}
60120
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ public static SnapshotStatus getLastSnapshotStatus(
134134
* @param resource The resource to be snapshotted.
135135
* @param conf The observe configuration of the resource.
136136
* @param snapshotType The type of the snapshot.
137+
* @param lastTrigger the last time the snapshot was triggered.
137138
* @return An optional {@link SnapshotTriggerType}.
138139
*/
139140
@VisibleForTesting

0 commit comments

Comments
 (0)