Skip to content

Commit f11956a

Browse files
[FLINK-38148] Support Flink v1.17 when listing job exceptions
1 parent 7880a11 commit f11956a

File tree

2 files changed

+38
-3
lines changed

2 files changed

+38
-3
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@
3535
import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
3636
import static org.apache.flink.util.Preconditions.checkNotNull;
3737

38-
/** Copied from Flink 2.0 to handle removed changes. */
38+
/**
39+
* Copied from Flink 2.0 to handle removed changes, with small modifications like: FLINK-38148 that
40+
* allows nullable `failureLabels` to support Flink v1.17
41+
*/
3942
public class JobExceptionsInfoWithHistory implements ResponseBody {
4043

4144
public static final String FIELD_NAME_EXCEPTION_HISTORY = "exceptionHistory";
@@ -189,14 +192,15 @@ public ExceptionInfo(
189192
@JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
190193
@JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String stacktrace,
191194
@JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
192-
@JsonProperty(FIELD_NAME_FAILURE_LABELS) Map<String, String> failureLabels,
195+
@JsonProperty(FIELD_NAME_FAILURE_LABELS) @Nullable
196+
Map<String, String> failureLabels,
193197
@JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
194198
@JsonProperty(FIELD_NAME_ENDPOINT) @Nullable String endpoint,
195199
@JsonProperty(FIELD_NAME_TASK_MANAGER_ID) @Nullable String taskManagerId) {
196200
this.exceptionName = checkNotNull(exceptionName);
197201
this.stacktrace = checkNotNull(stacktrace);
198202
this.timestamp = timestamp;
199-
this.failureLabels = checkNotNull(failureLabels);
203+
this.failureLabels = failureLabels;
200204
this.taskName = taskName;
201205
this.endpoint = endpoint;
202206
this.taskManagerId = taskManagerId;

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
6060
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
6161
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
62+
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
6263
import org.apache.flink.runtime.rest.messages.MessageHeaders;
6364
import org.apache.flink.runtime.rest.messages.MessageParameters;
6465
import org.apache.flink.runtime.rest.messages.RequestBody;
@@ -1315,6 +1316,36 @@ public void testBlockingDeletionDeleteCallErrorHandling() {
13151316
assertTrue(remaining.toMillis() < 1000);
13161317
}
13171318

1319+
@Test
1320+
public void listingJobExceptionsIsCompatibleWihFlinkV1_17Test() throws Exception {
1321+
JobExceptionsInfoWithHistory exceptionInfoWithNullFailureLabels =
1322+
new JobExceptionsInfoWithHistory(
1323+
new JobExceptionsInfoWithHistory.JobExceptionHistory(
1324+
java.util.Collections.singletonList(
1325+
new JobExceptionsInfoWithHistory.RootExceptionInfo(
1326+
"org.apache.flink.util.FlinkExpectedException",
1327+
"The TaskExecutor is shutting down.",
1328+
1755998006623L,
1329+
null,
1330+
"Source: Events Generator Source (2/2) - execution #0",
1331+
"10.244.0.105:44401",
1332+
"basic-example-taskmanager-1-1",
1333+
java.util.Collections.emptyList())),
1334+
false));
1335+
1336+
var flinkService =
1337+
getTestingService(
1338+
(messageHeaders, messageParameters, requestBody) ->
1339+
CompletableFuture.completedFuture(
1340+
exceptionInfoWithNullFailureLabels));
1341+
assertDoesNotThrow(
1342+
() ->
1343+
flinkService.getJobExceptions(
1344+
TestUtils.buildApplicationCluster(),
1345+
new JobID(),
1346+
new Configuration()));
1347+
}
1348+
13181349
class TestingService extends AbstractFlinkService {
13191350

13201351
RestClusterClient<String> clusterClient;

0 commit comments

Comments
 (0)