Skip to content

Commit fdab1fa

Browse files
[FLINK-38148][FlinkService] Json based test
1 parent 94ae35a commit fdab1fa

File tree

1 file changed

+39
-22
lines changed

1 file changed

+39
-22
lines changed

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@
112112
import io.fabric8.kubernetes.client.KubernetesClientException;
113113
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
114114
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
115+
import lombok.SneakyThrows;
115116
import org.junit.jupiter.api.Assertions;
116117
import org.junit.jupiter.api.BeforeEach;
117118
import org.junit.jupiter.api.Test;
@@ -148,6 +149,7 @@
148149
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
149150
import static org.junit.jupiter.api.Assertions.assertEquals;
150151
import static org.junit.jupiter.api.Assertions.assertFalse;
152+
import static org.junit.jupiter.api.Assertions.assertNotNull;
151153
import static org.junit.jupiter.api.Assertions.assertNull;
152154
import static org.junit.jupiter.api.Assertions.assertThrows;
153155
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -1318,32 +1320,47 @@ public void testBlockingDeletionDeleteCallErrorHandling() {
13181320

13191321
@Test
13201322
public void listingJobExceptionsIsCompatibleWihFlinkV1_17Test() throws Exception {
1321-
JobExceptionsInfoWithHistory exceptionInfoWithNullFailureLabels =
1322-
new JobExceptionsInfoWithHistory(
1323-
new JobExceptionsInfoWithHistory.JobExceptionHistory(
1324-
java.util.Collections.singletonList(
1325-
new JobExceptionsInfoWithHistory.RootExceptionInfo(
1326-
"org.apache.flink.util.FlinkExpectedException",
1327-
"The TaskExecutor is shutting down.",
1328-
1755998006623L,
1329-
null,
1330-
"Source: Events Generator Source (2/2) - execution #0",
1331-
"10.244.0.105:44401",
1332-
"basic-example-taskmanager-1-1",
1333-
java.util.Collections.emptyList())),
1334-
false));
1335-
1323+
var flinkV117JsonResponse =
1324+
"{\n"
1325+
+ " \"root-exception\": \"org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.\\n\\tat org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:476)\\n\\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239)\\n\\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.lambda$terminate$0(AkkaRpcActor.java:578)\\n\\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)\\n\\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:577)\\n\\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:196)\\n\\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)\\n\\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)\\n\\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:127)\\n\\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)\\n\\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)\\n\\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)\\n\\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)\\n\\tat akka.actor.Actor.aroundReceive(Actor.scala:537)\\n\\tat akka.actor.Actor.aroundReceive$(Actor.scala:535)\\n\\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)\\n\\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)\\n\\tat akka.actor.ActorCell.invoke(ActorCell.scala:547)\\n\\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)\\n\\tat akka.dispatch.Mailbox.run(Mailbox.scala:231)\\n\\tat akka.dispatch.Mailbox.exec(Mailbox.scala:243)\\n\\tat java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)\\n\",\n"
1326+
+ " \"timestamp\": 1755995361447,\n"
1327+
+ " \"all-exceptions\": [],\n"
1328+
+ " \"truncated\": false,\n"
1329+
+ " \"exceptionHistory\": {\n"
1330+
+ " \"entries\": [\n"
1331+
+ " {\n"
1332+
+ " \"exceptionName\": \"org.apache.flink.util.FlinkExpectedException\",\n"
1333+
+ " \"stacktrace\": \"org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.\\n\\tat org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:476)\\n\\tat org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239)\\n\\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.lambda$terminate$0(AkkaRpcActor.java:578)\\n\\tat org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)\\n\\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:577)\\n\\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:196)\\n\\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)\\n\\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)\\n\\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:127)\\n\\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)\\n\\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)\\n\\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)\\n\\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)\\n\\tat akka.actor.Actor.aroundReceive(Actor.scala:537)\\n\\tat akka.actor.Actor.aroundReceive$(Actor.scala:535)\\n\\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)\\n\\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)\\n\\tat akka.actor.ActorCell.invoke(ActorCell.scala:547)\\n\\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)\\n\\tat akka.dispatch.Mailbox.run(Mailbox.scala:231)\\n\\tat akka.dispatch.Mailbox.exec(Mailbox.scala:243)\\n\\tat java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)\\n\\tat java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)\\n\",\n"
1334+
+ " \"timestamp\": 1755995361447,\n"
1335+
+ " \"taskName\": \"Source: Custom Source (2/2) - execution #0\",\n"
1336+
+ " \"location\": \"10.244.0.93:37079\",\n"
1337+
+ " \"taskManagerId\": \"basic-example-taskmanager-1-1\",\n"
1338+
+ " \"concurrentExceptions\": []\n"
1339+
+ " }\n"
1340+
+ " ],\n"
1341+
+ " \"truncated\": false\n"
1342+
+ " }\n"
1343+
+ "}";
13361344
var flinkService =
13371345
getTestingService(
13381346
(messageHeaders, messageParameters, requestBody) ->
13391347
CompletableFuture.completedFuture(
1340-
exceptionInfoWithNullFailureLabels));
1341-
assertDoesNotThrow(
1342-
() ->
1343-
flinkService.getJobExceptions(
1344-
TestUtils.buildApplicationCluster(),
1345-
new JobID(),
1346-
new Configuration()));
1348+
parseExceptionsJsonResponse(flinkV117JsonResponse)));
1349+
1350+
var jobExceptions =
1351+
flinkService.getJobExceptions(
1352+
TestUtils.buildApplicationCluster(), new JobID(), new Configuration());
1353+
assertNotNull(jobExceptions);
1354+
assertEquals(1, jobExceptions.getExceptionHistory().getEntries().size());
1355+
}
1356+
1357+
@SneakyThrows
1358+
private static JobExceptionsInfoWithHistory parseExceptionsJsonResponse(
1359+
String flinkV117JsonResponse) {
1360+
var jsonNode = RestMapperUtils.getStrictObjectMapper().readTree(flinkV117JsonResponse);
1361+
var jsonParser = RestMapperUtils.getStrictObjectMapper().treeAsTokens(jsonNode);
1362+
return RestMapperUtils.getFlexibleObjectMapper()
1363+
.readValue(jsonParser, JobExceptionsInfoWithHistory.class);
13471364
}
13481365

13491366
class TestingService extends AbstractFlinkService {

0 commit comments

Comments
 (0)