Skip to content

Commit 2b7a1d7

Browse files
[FLINK-38148] Support Flink v1.17 when listing job exceptions
1 parent 7880a11 commit 2b7a1d7

File tree

2 files changed

+55
-3
lines changed

2 files changed

+55
-3
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@
3535
import static org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
3636
import static org.apache.flink.util.Preconditions.checkNotNull;
3737

38-
/** Copied from Flink 2.0 to handle removed changes. */
38+
/**
39+
* Copied from Flink 2.0 to handle removed changes, with small modifications like: FLINK-38148 that
40+
* allows nullable `failureLabels` to support Flink v1.17.
41+
*/
3942
public class JobExceptionsInfoWithHistory implements ResponseBody {
4043

4144
public static final String FIELD_NAME_EXCEPTION_HISTORY = "exceptionHistory";
@@ -189,14 +192,15 @@ public ExceptionInfo(
189192
@JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName,
190193
@JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String stacktrace,
191194
@JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp,
192-
@JsonProperty(FIELD_NAME_FAILURE_LABELS) Map<String, String> failureLabels,
195+
@JsonProperty(FIELD_NAME_FAILURE_LABELS) @Nullable
196+
Map<String, String> failureLabels,
193197
@JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName,
194198
@JsonProperty(FIELD_NAME_ENDPOINT) @Nullable String endpoint,
195199
@JsonProperty(FIELD_NAME_TASK_MANAGER_ID) @Nullable String taskManagerId) {
196200
this.exceptionName = checkNotNull(exceptionName);
197201
this.stacktrace = checkNotNull(stacktrace);
198202
this.timestamp = timestamp;
199-
this.failureLabels = checkNotNull(failureLabels);
203+
this.failureLabels = failureLabels;
200204
this.taskName = taskName;
201205
this.endpoint = endpoint;
202206
this.taskManagerId = taskManagerId;

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
6060
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
6161
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
62+
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
6263
import org.apache.flink.runtime.rest.messages.MessageHeaders;
6364
import org.apache.flink.runtime.rest.messages.MessageParameters;
6465
import org.apache.flink.runtime.rest.messages.RequestBody;
@@ -111,6 +112,7 @@
111112
import io.fabric8.kubernetes.client.KubernetesClientException;
112113
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
113114
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
115+
import lombok.SneakyThrows;
114116
import org.junit.jupiter.api.Assertions;
115117
import org.junit.jupiter.api.BeforeEach;
116118
import org.junit.jupiter.api.Test;
@@ -147,6 +149,7 @@
147149
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
148150
import static org.junit.jupiter.api.Assertions.assertEquals;
149151
import static org.junit.jupiter.api.Assertions.assertFalse;
152+
import static org.junit.jupiter.api.Assertions.assertNotNull;
150153
import static org.junit.jupiter.api.Assertions.assertNull;
151154
import static org.junit.jupiter.api.Assertions.assertThrows;
152155
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -1315,6 +1318,51 @@ public void testBlockingDeletionDeleteCallErrorHandling() {
13151318
assertTrue(remaining.toMillis() < 1000);
13161319
}
13171320

1321+
@Test
1322+
public void listingJobExceptionsIsCompatibleWihFlinkV1_17Test() throws Exception {
1323+
var flinkV117JsonResponse =
1324+
"{\n"
1325+
+ " \"root-exception\": \"org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.\\n\\tat org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:476)\\n\\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239)\\n\\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.lambda$terminate$0(AkkaRpcActor.java:578)\\n\\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)\\n\\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:577)\\n\\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:196)\\n\\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)\\n\\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)\\n\\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:127)\\n\\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)\\n\\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)\\n\\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)\\n\\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)\\n\\tat akka.actor.Actor.aroundReceive(Actor.scala:537)\\n\\tat akka.actor.Actor.aroundReceive$(Actor.scala:535)\\n\\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)\\n\\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)\\n\\tat akka.actor.ActorCell.invoke(ActorCell.scala:547)\\n\\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)\\n\\tat akka.dispatch.Mailbox.run(Mailbox.scala:231)\\n\\tat akka.dispatch.Mailbox.exec(Mailbox.scala:243)\\n\\tat java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)\\n\",\n"
1326+
+ " \"timestamp\": 1755995361447,\n"
1327+
+ " \"all-exceptions\": [],\n"
1328+
+ " \"truncated\": false,\n"
1329+
+ " \"exceptionHistory\": {\n"
1330+
+ " \"entries\": [\n"
1331+
+ " {\n"
1332+
+ " \"exceptionName\": \"org.apache.flink.util.FlinkExpectedException\",\n"
1333+
+ " \"stacktrace\": \"org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.\\n\\tat org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:476)\\n\\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239)\\n\\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.lambda$terminate$0(AkkaRpcActor.java:578)\\n\\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)\\n\\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:577)\\n\\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:196)\\n\\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)\\n\\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)\\n\\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:127)\\n\\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)\\n\\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)\\n\\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)\\n\\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)\\n\\tat akka.actor.Actor.aroundReceive(Actor.scala:537)\\n\\tat akka.actor.Actor.aroundReceive$(Actor.scala:535)\\n\\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)\\n\\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)\\n\\tat akka.actor.ActorCell.invoke(ActorCell.scala:547)\\n\\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)\\n\\tat akka.dispatch.Mailbox.run(Mailbox.scala:231)\\n\\tat akka.dispatch.Mailbox.exec(Mailbox.scala:243)\\n\\tat java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)\\n\",\n"
1334+
+ " \"timestamp\": 1755995361447,\n"
1335+
+ " \"taskName\": \"Source: Custom Source (2/2) - execution #0\",\n"
1336+
+ " \"location\": \"10.244.0.93:37079\",\n"
1337+
+ " \"taskManagerId\": \"basic-example-taskmanager-1-1\",\n"
1338+
+ " \"concurrentExceptions\": []\n"
1339+
+ " }\n"
1340+
+ " ],\n"
1341+
+ " \"truncated\": false\n"
1342+
+ " }\n"
1343+
+ "}";
1344+
var flinkService =
1345+
getTestingService(
1346+
(messageHeaders, messageParameters, requestBody) ->
1347+
CompletableFuture.completedFuture(
1348+
parseExceptionsJsonResponse(flinkV117JsonResponse)));
1349+
1350+
var jobExceptions =
1351+
flinkService.getJobExceptions(
1352+
TestUtils.buildApplicationCluster(), new JobID(), new Configuration());
1353+
assertNotNull(jobExceptions);
1354+
assertEquals(1, jobExceptions.getExceptionHistory().getEntries().size());
1355+
}
1356+
1357+
@SneakyThrows
1358+
private static JobExceptionsInfoWithHistory parseExceptionsJsonResponse(
1359+
String flinkV117JsonResponse) {
1360+
var jsonNode = RestMapperUtils.getStrictObjectMapper().readTree(flinkV117JsonResponse);
1361+
var jsonParser = RestMapperUtils.getStrictObjectMapper().treeAsTokens(jsonNode);
1362+
return RestMapperUtils.getFlexibleObjectMapper()
1363+
.readValue(jsonParser, JobExceptionsInfoWithHistory.class);
1364+
}
1365+
13181366
class TestingService extends AbstractFlinkService {
13191367

13201368
RestClusterClient<String> clusterClient;

0 commit comments

Comments
 (0)