Skip to content

Commit 9fc75ca

Browse files
[FLINK-37370] [Observer] Finished batch jobs throw ReconciliationException and never reach FINISHED in the CR
1 parent e636a00 commit 9fc75ca

File tree

6 files changed

+56
-4
lines changed

6 files changed

+56
-4
lines changed

flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.api.common.JobStatus;
2424
import org.apache.flink.runtime.execution.ExecutionState;
2525
import org.apache.flink.runtime.instance.SlotSharingGroupId;
26+
import org.apache.flink.runtime.jobgraph.JobType;
2627
import org.apache.flink.runtime.jobgraph.JobVertexID;
2728
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
2829
import org.apache.flink.runtime.rest.messages.ResponseBody;
@@ -45,8 +46,8 @@
4546
import java.util.Map;
4647
import java.util.Objects;
4748

48-
/** Copied from Flink. Should be removed once the client dependency is upgraded to 1.20. */
49-
/** The difference compared to 1.18 is that slot sharing group is optional here. */
49+
/** Copied from Flink. */
50+
/** The difference compared to 1.20 is that slot sharing group and job type are optional here. */
5051
public class JobDetailsInfo implements ResponseBody {
5152

5253
public static final String FIELD_NAME_JOB_ID = "jid";
@@ -57,6 +58,8 @@ public class JobDetailsInfo implements ResponseBody {
5758

5859
public static final String FIELD_NAME_JOB_STATUS = "state";
5960

61+
public static final String FIELD_NAME_JOB_TYPE = "job-type";
62+
6063
public static final String FIELD_NAME_START_TIME = "start-time";
6164

6265
public static final String FIELD_NAME_END_TIME = "end-time";
@@ -89,6 +92,9 @@ public class JobDetailsInfo implements ResponseBody {
8992
@JsonProperty(FIELD_NAME_JOB_STATUS)
9093
private final JobStatus jobStatus;
9194

95+
@JsonProperty(FIELD_NAME_JOB_TYPE)
96+
private final JobType jobType;
97+
9298
@JsonProperty(FIELD_NAME_START_TIME)
9399
private final long startTime;
94100

@@ -123,6 +129,7 @@ public JobDetailsInfo(
123129
@JsonProperty(FIELD_NAME_JOB_NAME) String name,
124130
@JsonProperty(FIELD_NAME_IS_STOPPABLE) boolean isStoppable,
125131
@JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus,
132+
@JsonProperty(FIELD_NAME_JOB_TYPE) JobType jobType,
126133
@JsonProperty(FIELD_NAME_START_TIME) long startTime,
127134
@JsonProperty(FIELD_NAME_END_TIME) long endTime,
128135
@JsonProperty(FIELD_NAME_DURATION) long duration,
@@ -138,6 +145,7 @@ public JobDetailsInfo(
138145
this.name = Preconditions.checkNotNull(name);
139146
this.isStoppable = isStoppable;
140147
this.jobStatus = Preconditions.checkNotNull(jobStatus);
148+
this.jobType = jobType;
141149
this.startTime = startTime;
142150
this.endTime = endTime;
143151
this.duration = duration;
@@ -167,6 +175,7 @@ public boolean equals(Object o) {
167175
&& Objects.equals(jobId, that.jobId)
168176
&& Objects.equals(name, that.name)
169177
&& jobStatus == that.jobStatus
178+
&& jobType == that.jobType
170179
&& Objects.equals(timestamps, that.timestamps)
171180
&& Objects.equals(jobVertexInfos, that.jobVertexInfos)
172181
&& Objects.equals(jobVerticesPerState, that.jobVerticesPerState)
@@ -180,6 +189,7 @@ public int hashCode() {
180189
name,
181190
isStoppable,
182191
jobStatus,
192+
jobType,
183193
startTime,
184194
endTime,
185195
duration,
@@ -211,6 +221,11 @@ public JobStatus getJobStatus() {
211221
return jobStatus;
212222
}
213223

224+
@JsonIgnore
225+
public JobType getJobType() {
226+
return jobType;
227+
}
228+
214229
@JsonIgnore
215230
public long getStartTime() {
216231
return startTime;
@@ -261,7 +276,7 @@ public String getJsonPlan() {
261276
// ---------------------------------------------------
262277

263278
/** Detailed information about a job vertex. */
264-
// @Schema(name = "JobDetailsVertexInfo")
279+
// @Schema(name = "JobDetailsVertexInfo")
265280
public static final class JobVertexDetailsInfo {
266281

267282
public static final String FIELD_NAME_JOB_VERTEX_ID = "id";

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.client.program.rest.RestClusterClient;
2929
import org.apache.flink.configuration.Configuration;
3030
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
31+
import org.apache.flink.runtime.jobgraph.JobType;
3132
import org.apache.flink.runtime.jobgraph.JobVertexID;
3233
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
3334
import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -79,6 +80,7 @@ public void testJobTopologyParsingFromJobDetails() throws Exception {
7980
+ " \"name\": \"State machine job\",\n"
8081
+ " \"isStoppable\": false,\n"
8182
+ " \"state\": \"RUNNING\",\n"
83+
+ " \"job-type\": \"STREAMING\",\n"
8284
+ " \"start-time\": 1707893512027,\n"
8385
+ " \"end-time\": -1,\n"
8486
+ " \"duration\": 214716,\n"
@@ -237,6 +239,7 @@ public void testJobTopologyParsingThrowsNotReadyException() throws Exception {
237239
+ " \"name\": \"State machine job\",\n"
238240
+ " \"isStoppable\": false,\n"
239241
+ " \"state\": \"RUNNING\",\n"
242+
+ " \"job-type\": \"STREAMING\",\n"
240243
+ " \"start-time\": 1707893512027,\n"
241244
+ " \"end-time\": -1,\n"
242245
+ " \"duration\": 214716,\n"
@@ -357,7 +360,7 @@ public void testJobTopologyParsingThrowsNotReadyException() throws Exception {
357360
@Test
358361
public void testJobTopologyParsingFromJobDetailsWithSlotSharingGroup() throws Exception {
359362
String s =
360-
"{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1697114719143,\"end-time\":-1,\"duration\":60731,\"maxParallelism\":-1,\"now\":1697114779874,\"timestamps\":{\"CANCELLING\":0,\"INITIALIZING\":1697114719143,\"RUNNING\":1697114719743,\"CANCELED\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"FAILING\":0,\"CREATED\":1697114719343,\"SUSPENDED\":0,\"RECONCILING\":0},\"vertices\":[{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Source: Events Generator Source\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724603,\"end-time\":-1,\"duration\":55271,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":true,\"write-bytes\":1985978,\"write-bytes-complete\":true,\"read-records\":0,\"read-records-complete\":true,\"write-records\":92037,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":78319,\"accumulated-busy-time\":13347.0}},{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Flat Map -> Sink: Print to Std. Out\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724639,\"end-time\":-1,\"duration\":55235,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":2019044,\"read-bytes-complete\":true,\"write-bytes\":0,\"write-bytes-complete\":true,\"read-records\":91881,\"read-records-complete\":true,\"write-records\":0,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":91352,\"accumulated-busy-time\":273.0}}],\"status-counts\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"plan\":{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"type\":\"STREAMING\",\"nodes\":[{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Flat Map<br/>+- Sink: Print to Std. Out<br/>\",\"operator_metadata\":[{},{}],\"inputs\":[{\"num\":0,\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Events Generator Source<br/>\",\"operator_metadata\":[{}],\"optimizer_properties\":{}}]}}\n";
363+
"{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"isStoppable\":false,\"state\":\"RUNNING\",\"job-type\":\"STREAMING\",\"start-time\":1697114719143,\"end-time\":-1,\"duration\":60731,\"maxParallelism\":-1,\"now\":1697114779874,\"timestamps\":{\"CANCELLING\":0,\"INITIALIZING\":1697114719143,\"RUNNING\":1697114719743,\"CANCELED\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"FAILING\":0,\"CREATED\":1697114719343,\"SUSPENDED\":0,\"RECONCILING\":0},\"vertices\":[{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Source: Events Generator Source\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724603,\"end-time\":-1,\"duration\":55271,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":true,\"write-bytes\":1985978,\"write-bytes-complete\":true,\"read-records\":0,\"read-records-complete\":true,\"write-records\":92037,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":78319,\"accumulated-busy-time\":13347.0}},{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Flat Map -> Sink: Print to Std. Out\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724639,\"end-time\":-1,\"duration\":55235,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":2019044,\"read-bytes-complete\":true,\"write-bytes\":0,\"write-bytes-complete\":true,\"read-records\":91881,\"read-records-complete\":true,\"write-records\":0,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":91352,\"accumulated-busy-time\":273.0}}],\"status-counts\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"plan\":{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"type\":\"STREAMING\",\"nodes\":[{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Flat Map<br/>+- Sink: Print to Std. Out<br/>\",\"operator_metadata\":[{},{}],\"inputs\":[{\"num\":0,\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Events Generator Source<br/>\",\"operator_metadata\":[{}],\"optimizer_properties\":{}}]}}\n";
361364
JobDetailsInfo jobDetailsInfo = new ObjectMapper().readValue(s, JobDetailsInfo.class);
362365

363366
var metricsCollector = new RestApiMetricsCollector();
@@ -372,6 +375,7 @@ public void testJobUpdateTsLogic() {
372375
"",
373376
false,
374377
org.apache.flink.api.common.JobStatus.RUNNING,
378+
JobType.STREAMING,
375379
0,
376380
0,
377381
0,

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.configuration.Configuration;
2424
import org.apache.flink.configuration.MemorySize;
2525
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
26+
import org.apache.flink.runtime.jobgraph.JobType;
2627
import org.apache.flink.runtime.metrics.MetricRegistry;
2728
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
2829
import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
@@ -96,6 +97,7 @@ public CompletableFuture<JobDetailsInfo> getJobDetails(JobID jobId) {
9697
"",
9798
false,
9899
JobStatus.RUNNING,
100+
JobType.STREAMING,
99101
0,
100102
0,
101103
0,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3838
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
3939
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
40+
import org.apache.flink.runtime.jobgraph.JobType;
4041
import org.apache.flink.util.CollectionUtil;
4142

4243
import org.slf4j.Logger;
@@ -83,6 +84,11 @@ public void observeSavepointStatus(FlinkResourceContext<CR> ctx) {
8384
var jobStatus = resource.getStatus().getJobStatus();
8485
var jobId = jobStatus.getJobId();
8586

87+
if (isBatchJob(ctx, jobId)) {
88+
LOG.debug("Skipping checkpoint observation for BATCH job");
89+
return;
90+
}
91+
8692
// If any manual or periodic savepoint is in progress, observe it
8793
if (SnapshotUtils.savepointInProgress(jobStatus)) {
8894
observeTriggeredSavepoint(ctx, jobId);
@@ -458,4 +464,16 @@ private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId)
458464
}
459465
});
460466
}
467+
468+
private boolean isBatchJob(FlinkResourceContext<CR> ctx, String jobId) {
469+
try {
470+
var jobDetails =
471+
ctx.getFlinkService()
472+
.getJobDetails(JobID.fromHexString(jobId), ctx.getObserveConfig());
473+
return jobDetails.getJobType() == JobType.BATCH;
474+
} catch (Exception e) {
475+
LOG.debug("Could not determine job type, assuming streaming job", e);
476+
return false;
477+
}
478+
}
461479
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
7676
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
7777
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
78+
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
7879
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
7980
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
8081
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
@@ -837,6 +838,15 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exc
837838
(c, e) -> new StandaloneClientHAServices(restServerAddress));
838839
}
839840

841+
@Override
842+
public JobDetailsInfo getJobDetails(JobID jobId, Configuration conf) throws Exception {
843+
try (var clusterClient = getClusterClient(conf)) {
844+
return clusterClient
845+
.getJobDetails(jobId)
846+
.get(operatorConfig.getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS);
847+
}
848+
}
849+
840850
@VisibleForTesting
841851
protected void runJar(
842852
JobSpec job,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
3434
import org.apache.flink.runtime.client.JobStatusMessage;
3535
import org.apache.flink.runtime.jobmaster.JobResult;
36+
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
3637

3738
import io.fabric8.kubernetes.api.model.ObjectMeta;
3839
import io.fabric8.kubernetes.api.model.PodList;
@@ -127,6 +128,8 @@ Map<String, String> getMetrics(Configuration conf, String jobId, List<String> me
127128

128129
RestClusterClient<String> getClusterClient(Configuration conf) throws Exception;
129130

131+
JobDetailsInfo getJobDetails(JobID jobId, Configuration conf) throws Exception;
132+
130133
/** Result of a cancel operation. */
131134
@AllArgsConstructor
132135
class CancelResult {

0 commit comments

Comments
 (0)