Skip to content

Commit 46ee95c

Browse files
committed
[hotfix] Make exception reporting compatible with Flink 2.0
1 parent 0679d63 commit 46ee95c

File tree

1 file changed

+367
-0
lines changed

1 file changed

+367
-0
lines changed
Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.rest.messages;
20+
21+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
22+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
23+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
24+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
25+
26+
import javax.annotation.Nullable;
27+
28+
import java.util.Collection;
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Objects;
33+
import java.util.StringJoiner;
34+
35+
import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
36+
import static org.apache.flink.util.Preconditions.checkNotNull;
37+
38+
/** Copied from Flink 2.0 to handle removed changes. */
39+
public class JobExceptionsInfoWithHistory implements ResponseBody {
40+
41+
public static final String FIELD_NAME_EXCEPTION_HISTORY = "exceptionHistory";
42+
43+
@JsonProperty(FIELD_NAME_EXCEPTION_HISTORY)
44+
private final JobExceptionHistory exceptionHistory;
45+
46+
@JsonCreator
47+
public JobExceptionsInfoWithHistory(
48+
@JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) JobExceptionHistory exceptionHistory) {
49+
this.exceptionHistory = exceptionHistory;
50+
}
51+
52+
@JsonIgnore
53+
public JobExceptionHistory getExceptionHistory() {
54+
return exceptionHistory;
55+
}
56+
57+
// hashCode and equals are necessary for the test classes deriving from
58+
// RestResponseMarshallingTestBase
59+
@Override
60+
public boolean equals(Object o) {
61+
if (this == o) {
62+
return true;
63+
}
64+
if (o == null || getClass() != o.getClass()) {
65+
return false;
66+
}
67+
JobExceptionsInfoWithHistory that = (JobExceptionsInfoWithHistory) o;
68+
return Objects.equals(exceptionHistory, that.exceptionHistory);
69+
}
70+
71+
@Override
72+
public int hashCode() {
73+
return Objects.hash(exceptionHistory);
74+
}
75+
76+
@Override
77+
public String toString() {
78+
return new StringJoiner(", ", JobExceptionsInfoWithHistory.class.getSimpleName() + "[", "]")
79+
.add("exceptionHistory=" + exceptionHistory)
80+
.toString();
81+
}
82+
83+
/** {@code JobExceptionHistory} collects all previously caught errors. */
84+
public static final class JobExceptionHistory {
85+
86+
public static final String FIELD_NAME_ENTRIES = "entries";
87+
public static final String FIELD_NAME_TRUNCATED = "truncated";
88+
89+
@JsonProperty(FIELD_NAME_ENTRIES)
90+
private final List<RootExceptionInfo> entries;
91+
92+
@JsonProperty(FIELD_NAME_TRUNCATED)
93+
private final boolean truncated;
94+
95+
@JsonCreator
96+
public JobExceptionHistory(
97+
@JsonProperty(FIELD_NAME_ENTRIES) List<RootExceptionInfo> entries,
98+
@JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) {
99+
this.entries = entries;
100+
this.truncated = truncated;
101+
}
102+
103+
@JsonIgnore
104+
public List<RootExceptionInfo> getEntries() {
105+
return entries;
106+
}
107+
108+
@JsonIgnore
109+
public boolean isTruncated() {
110+
return truncated;
111+
}
112+
113+
// hashCode and equals are necessary for the test classes deriving from
114+
// RestResponseMarshallingTestBase
115+
@Override
116+
public boolean equals(Object o) {
117+
if (this == o) {
118+
return true;
119+
}
120+
if (o == null || getClass() != o.getClass()) {
121+
return false;
122+
}
123+
JobExceptionHistory that = (JobExceptionHistory) o;
124+
return this.isTruncated() == that.isTruncated()
125+
&& Objects.equals(entries, that.entries);
126+
}
127+
128+
@Override
129+
public int hashCode() {
130+
return Objects.hash(entries, truncated);
131+
}
132+
133+
@Override
134+
public String toString() {
135+
return new StringJoiner(", ", JobExceptionHistory.class.getSimpleName() + "[", "]")
136+
.add("entries=" + entries)
137+
.add("truncated=" + truncated)
138+
.toString();
139+
}
140+
}
141+
142+
/**
143+
* Json equivalent of {@link
144+
* org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry}.
145+
*/
146+
public static class ExceptionInfo {
147+
148+
public static final String FIELD_NAME_EXCEPTION_NAME = "exceptionName";
149+
public static final String FIELD_NAME_EXCEPTION_STACKTRACE = "stacktrace";
150+
public static final String FIELD_NAME_EXCEPTION_TIMESTAMP = "timestamp";
151+
public static final String FIELD_NAME_TASK_NAME = "taskName";
152+
public static final String FIELD_NAME_ENDPOINT = "endpoint";
153+
public static final String FIELD_NAME_TASK_MANAGER_ID = "taskManagerId";
154+
public static final String FIELD_NAME_FAILURE_LABELS = "failureLabels";
155+
156+
@JsonProperty(FIELD_NAME_EXCEPTION_NAME)
157+
private final String exceptionName;
158+
159+
@JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE)
160+
private final String stacktrace;
161+
162+
@JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP)
163+
private final long timestamp;
164+
165+
@JsonInclude(NON_NULL)
166+
@JsonProperty(FIELD_NAME_TASK_NAME)
167+
@Nullable
168+
private final String taskName;
169+
170+
@JsonInclude(NON_NULL)
171+
@JsonProperty(FIELD_NAME_ENDPOINT)
172+
@Nullable
173+
private final String endpoint;
174+
175+
@JsonInclude(NON_NULL)
176+
@JsonProperty(FIELD_NAME_TASK_MANAGER_ID)
177+
@Nullable
178+
private final String taskManagerId;
179+
180+
@JsonProperty(FIELD_NAME_FAILURE_LABELS)
181+
private final Map<String, String> failureLabels;
182+
183+
public ExceptionInfo(String exceptionName, String stacktrace, long timestamp) {
184+
this(exceptionName, stacktrace, timestamp, Collections.emptyMap(), null, null, null);
185+
}
186+
187+
@JsonCreator
188+
public ExceptionInfo(
189+
@JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
190+
@JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String stacktrace,
191+
@JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
192+
@JsonProperty(FIELD_NAME_FAILURE_LABELS) Map<String, String> failureLabels,
193+
@JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
194+
@JsonProperty(FIELD_NAME_ENDPOINT) @Nullable String endpoint,
195+
@JsonProperty(FIELD_NAME_TASK_MANAGER_ID) @Nullable String taskManagerId) {
196+
this.exceptionName = checkNotNull(exceptionName);
197+
this.stacktrace = checkNotNull(stacktrace);
198+
this.timestamp = timestamp;
199+
this.failureLabels = checkNotNull(failureLabels);
200+
this.taskName = taskName;
201+
this.endpoint = endpoint;
202+
this.taskManagerId = taskManagerId;
203+
}
204+
205+
@JsonIgnore
206+
public String getExceptionName() {
207+
return exceptionName;
208+
}
209+
210+
@JsonIgnore
211+
public String getStacktrace() {
212+
return stacktrace;
213+
}
214+
215+
@JsonIgnore
216+
public long getTimestamp() {
217+
return timestamp;
218+
}
219+
220+
@JsonIgnore
221+
@Nullable
222+
public String getTaskName() {
223+
return taskName;
224+
}
225+
226+
@JsonIgnore
227+
@Nullable
228+
public String getEndpoint() {
229+
return endpoint;
230+
}
231+
232+
@JsonIgnore
233+
@Nullable
234+
public String getTaskManagerId() {
235+
return taskManagerId;
236+
}
237+
238+
@JsonIgnore
239+
public Map<String, String> getFailureLabels() {
240+
return failureLabels;
241+
}
242+
243+
// hashCode and equals are necessary for the test classes deriving from
244+
// RestResponseMarshallingTestBase
245+
@Override
246+
public boolean equals(Object o) {
247+
if (this == o) {
248+
return true;
249+
}
250+
if (o == null || getClass() != o.getClass()) {
251+
return false;
252+
}
253+
ExceptionInfo that = (ExceptionInfo) o;
254+
return exceptionName.equals(that.exceptionName)
255+
&& stacktrace.equals(that.stacktrace)
256+
&& Objects.equals(timestamp, that.timestamp)
257+
&& Objects.equals(failureLabels, that.failureLabels)
258+
&& Objects.equals(taskName, that.taskName)
259+
&& Objects.equals(endpoint, that.endpoint);
260+
}
261+
262+
@Override
263+
public int hashCode() {
264+
return Objects.hash(
265+
exceptionName, stacktrace, timestamp, failureLabels, taskName, endpoint);
266+
}
267+
268+
@Override
269+
public String toString() {
270+
return new StringJoiner(", ", ExceptionInfo.class.getSimpleName() + "[", "]")
271+
.add("exceptionName='" + exceptionName + "'")
272+
.add("stacktrace='" + stacktrace + "'")
273+
.add("timestamp=" + timestamp)
274+
.add("failureLabels=" + failureLabels)
275+
.add("taskName='" + taskName + "'")
276+
.add("endpoint='" + endpoint + "'")
277+
.toString();
278+
}
279+
}
280+
281+
/**
282+
* Json equivalent of {@link
283+
* org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry}.
284+
*/
285+
public static class RootExceptionInfo extends ExceptionInfo {
286+
287+
public static final String FIELD_NAME_CONCURRENT_EXCEPTIONS = "concurrentExceptions";
288+
289+
@JsonProperty(FIELD_NAME_CONCURRENT_EXCEPTIONS)
290+
private final Collection<ExceptionInfo> concurrentExceptions;
291+
292+
public RootExceptionInfo(
293+
String exceptionName,
294+
String stacktrace,
295+
long timestamp,
296+
Map<String, String> failureLabels,
297+
Collection<ExceptionInfo> concurrentExceptions) {
298+
this(
299+
exceptionName,
300+
stacktrace,
301+
timestamp,
302+
failureLabels,
303+
null,
304+
null,
305+
null,
306+
concurrentExceptions);
307+
}
308+
309+
@JsonCreator
310+
public RootExceptionInfo(
311+
@JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
312+
@JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String stacktrace,
313+
@JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
314+
@JsonProperty(FIELD_NAME_FAILURE_LABELS) Map<String, String> failureLabels,
315+
@JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
316+
@JsonProperty(FIELD_NAME_ENDPOINT) @Nullable String endpoint,
317+
@JsonProperty(FIELD_NAME_TASK_MANAGER_ID) @Nullable String taskManagerId,
318+
@JsonProperty(FIELD_NAME_CONCURRENT_EXCEPTIONS)
319+
Collection<ExceptionInfo> concurrentExceptions) {
320+
super(
321+
exceptionName,
322+
stacktrace,
323+
timestamp,
324+
failureLabels,
325+
taskName,
326+
endpoint,
327+
taskManagerId);
328+
this.concurrentExceptions = concurrentExceptions;
329+
}
330+
331+
@JsonIgnore
332+
public Collection<ExceptionInfo> getConcurrentExceptions() {
333+
return concurrentExceptions;
334+
}
335+
336+
// hashCode and equals are necessary for the test classes deriving from
337+
// RestResponseMarshallingTestBase
338+
@Override
339+
public boolean equals(Object o) {
340+
if (this == o) {
341+
return true;
342+
}
343+
if (o == null || getClass() != o.getClass() || !super.equals(o)) {
344+
return false;
345+
}
346+
RootExceptionInfo that = (RootExceptionInfo) o;
347+
return getConcurrentExceptions().equals(that.getConcurrentExceptions());
348+
}
349+
350+
@Override
351+
public int hashCode() {
352+
return Objects.hash(super.hashCode(), getConcurrentExceptions());
353+
}
354+
355+
@Override
356+
public String toString() {
357+
return new StringJoiner(", ", RootExceptionInfo.class.getSimpleName() + "[", "]")
358+
.add("exceptionName='" + getExceptionName() + "'")
359+
.add("stacktrace='" + getStacktrace() + "'")
360+
.add("timestamp=" + getTimestamp())
361+
.add("taskName='" + getTaskName() + "'")
362+
.add("endpoint='" + getEndpoint() + "'")
363+
.add("concurrentExceptions=" + getConcurrentExceptions())
364+
.toString();
365+
}
366+
}
367+
}

0 commit comments

Comments
 (0)