Skip to content

Commit dfd66c2

Browse files
sharath1709 authored and gyfora committed
[FLINK-36645] [flink-autoscaler] Gracefully handle null execution plan from autoscaler
1 parent e7ce6df commit dfd66c2

File tree

3 files changed: 135 additions (+135) and 0 deletions (-0).

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,4 +23,8 @@ public class NotReadyException extends RuntimeException {
2323
public NotReadyException(Exception cause) {
2424
super(cause);
2525
}
26+
27+
public NotReadyException(String message) {
28+
super(message);
29+
}
2630
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.autoscaler.topology;
1919

20+
import org.apache.flink.autoscaler.exceptions.NotReadyException;
2021
import org.apache.flink.runtime.instance.SlotSharingGroupId;
2122
import org.apache.flink.runtime.jobgraph.JobVertexID;
2223

@@ -150,6 +151,10 @@ public static JobTopology fromJsonPlan(
150151
ObjectNode plan = objectMapper.readValue(jsonPlan, ObjectNode.class);
151152
ArrayNode nodes = (ArrayNode) plan.get("nodes");
152153

154+
if (nodes == null || nodes.isEmpty()) {
155+
throw new NotReadyException("No nodes found in the plan, job is not ready yet");
156+
}
157+
153158
var vertexInfo = new HashSet<VertexInfo>();
154159

155160
for (JsonNode node : nodes) {

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java

Lines changed: 126 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.api.common.JobID;
2121
import org.apache.flink.api.common.JobStatus;
22+
import org.apache.flink.autoscaler.exceptions.NotReadyException;
2223
import org.apache.flink.autoscaler.metrics.FlinkMetric;
2324
import org.apache.flink.autoscaler.metrics.MetricNotFoundException;
2425
import org.apache.flink.autoscaler.topology.IOMetrics;
@@ -228,6 +229,131 @@ public void testJobTopologyParsingFromJobDetails() throws Exception {
228229
new JobTopology(source, sink), metricsCollector.getJobTopology(jobDetailsInfo));
229230
}
230231

232+
@Test
233+
public void testJobTopologyParsingThrowsNotReadyException() throws Exception {
234+
String s =
235+
"{\n"
236+
+ " \"jid\": \"bb8f15efbb37f2ce519f55cdc0e049bf\",\n"
237+
+ " \"name\": \"State machine job\",\n"
238+
+ " \"isStoppable\": false,\n"
239+
+ " \"state\": \"RUNNING\",\n"
240+
+ " \"start-time\": 1707893512027,\n"
241+
+ " \"end-time\": -1,\n"
242+
+ " \"duration\": 214716,\n"
243+
+ " \"maxParallelism\": -1,\n"
244+
+ " \"now\": 1707893726743,\n"
245+
+ " \"timestamps\": {\n"
246+
+ " \"SUSPENDED\": 0,\n"
247+
+ " \"CREATED\": 1707893512139,\n"
248+
+ " \"FAILING\": 0,\n"
249+
+ " \"FAILED\": 0,\n"
250+
+ " \"INITIALIZING\": 1707893512027,\n"
251+
+ " \"RECONCILING\": 0,\n"
252+
+ " \"RUNNING\": 1707893512217,\n"
253+
+ " \"RESTARTING\": 0,\n"
254+
+ " \"CANCELLING\": 0,\n"
255+
+ " \"FINISHED\": 0,\n"
256+
+ " \"CANCELED\": 0\n"
257+
+ " },\n"
258+
+ " \"vertices\": [\n"
259+
+ " {\n"
260+
+ " \"id\": \"bc764cd8ddf7a0cff126f51c16239658\",\n"
261+
+ " \"name\": \"Source: Custom Source\",\n"
262+
+ " \"maxParallelism\": 128,\n"
263+
+ " \"parallelism\": 2,\n"
264+
+ " \"status\": \"FINISHED\",\n"
265+
+ " \"start-time\": 1707893517277,\n"
266+
+ " \"end-time\": -1,\n"
267+
+ " \"duration\": 209466,\n"
268+
+ " \"tasks\": {\n"
269+
+ " \"DEPLOYING\": 0,\n"
270+
+ " \"INITIALIZING\": 0,\n"
271+
+ " \"SCHEDULED\": 0,\n"
272+
+ " \"CANCELING\": 0,\n"
273+
+ " \"CANCELED\": 0,\n"
274+
+ " \"RECONCILING\": 0,\n"
275+
+ " \"RUNNING\": 2,\n"
276+
+ " \"FAILED\": 0,\n"
277+
+ " \"CREATED\": 0,\n"
278+
+ " \"FINISHED\": 0\n"
279+
+ " },\n"
280+
+ " \"metrics\": {\n"
281+
+ " \"read-bytes\": 0,\n"
282+
+ " \"read-bytes-complete\": true,\n"
283+
+ " \"write-bytes\": 4036982,\n"
284+
+ " \"write-bytes-complete\": true,\n"
285+
+ " \"read-records\": 0,\n"
286+
+ " \"read-records-complete\": true,\n"
287+
+ " \"write-records\": 291629,\n"
288+
+ " \"write-records-complete\": true,\n"
289+
+ " \"accumulated-backpressured-time\": 0,\n"
290+
+ " \"accumulated-idle-time\": 0,\n"
291+
+ " \"accumulated-busy-time\": \"NaN\"\n"
292+
+ " }\n"
293+
+ " },\n"
294+
+ " {\n"
295+
+ " \"id\": \"20ba6b65f97481d5570070de90e4e791\",\n"
296+
+ " \"name\": \"Flat Map -> Sink: Print to Std. Out\",\n"
297+
+ " \"maxParallelism\": 128,\n"
298+
+ " \"parallelism\": 2,\n"
299+
+ " \"status\": \"RUNNING\",\n"
300+
+ " \"start-time\": 1707893517280,\n"
301+
+ " \"end-time\": -1,\n"
302+
+ " \"duration\": 209463,\n"
303+
+ " \"tasks\": {\n"
304+
+ " \"DEPLOYING\": 0,\n"
305+
+ " \"INITIALIZING\": 0,\n"
306+
+ " \"SCHEDULED\": 0,\n"
307+
+ " \"CANCELING\": 0,\n"
308+
+ " \"CANCELED\": 0,\n"
309+
+ " \"RECONCILING\": 0,\n"
310+
+ " \"RUNNING\": 2,\n"
311+
+ " \"FAILED\": 0,\n"
312+
+ " \"CREATED\": 0,\n"
313+
+ " \"FINISHED\": 0\n"
314+
+ " },\n"
315+
+ " \"metrics\": {\n"
316+
+ " \"read-bytes\": 4078629,\n"
317+
+ " \"read-bytes-complete\": true,\n"
318+
+ " \"write-bytes\": 0,\n"
319+
+ " \"write-bytes-complete\": true,\n"
320+
+ " \"read-records\": 291532,\n"
321+
+ " \"read-records-complete\": true,\n"
322+
+ " \"write-records\": 1,\n"
323+
+ " \"write-records-complete\": true,\n"
324+
+ " \"accumulated-backpressured-time\": 0,\n"
325+
+ " \"accumulated-idle-time\": 407702,\n"
326+
+ " \"accumulated-busy-time\": 2\n"
327+
+ " }\n"
328+
+ " }\n"
329+
+ " ],\n"
330+
+ " \"status-counts\": {\n"
331+
+ " \"DEPLOYING\": 0,\n"
332+
+ " \"INITIALIZING\": 0,\n"
333+
+ " \"SCHEDULED\": 0,\n"
334+
+ " \"CANCELING\": 0,\n"
335+
+ " \"CANCELED\": 0,\n"
336+
+ " \"RECONCILING\": 0,\n"
337+
+ " \"RUNNING\": 2,\n"
338+
+ " \"FAILED\": 0,\n"
339+
+ " \"CREATED\": 0,\n"
340+
+ " \"FINISHED\": 0\n"
341+
+ " },\n"
342+
+ " \"plan\": {\n"
343+
+ " \"jid\": \"bb8f15efbb37f2ce519f55cdc0e049bf\",\n"
344+
+ " \"name\": \"State machine job\",\n"
345+
+ " \"type\": \"STREAMING\",\n"
346+
+ " \"nodes\": [\n"
347+
+ " ]\n"
348+
+ " }\n"
349+
+ "}";
350+
JobDetailsInfo jobDetailsInfo = new ObjectMapper().readValue(s, JobDetailsInfo.class);
351+
352+
var metricsCollector = new RestApiMetricsCollector<>();
353+
assertThrows(
354+
NotReadyException.class, () -> metricsCollector.getJobTopology(jobDetailsInfo));
355+
}
356+
231357
@Test
232358
public void testJobTopologyParsingFromJobDetailsWithSlotSharingGroup() throws Exception {
233359
String s =

0 commit comments

Comments
 (0)