From 9fc75ca03751df1499f211d9c76900bc933f6deb Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Fri, 21 Feb 2025 12:30:27 -0500 Subject: [PATCH 1/3] [FLINK-37370] [Observer] Finished batch jobs throw ReconciliationException and never reach FINISHED in the CR --- .../rest/messages/job/JobDetailsInfo.java | 21 ++++++++++++++++--- .../ScalingMetricCollectorTest.java | 6 +++++- .../autoscaler/TestingAutoscalerUtils.java | 2 ++ .../operator/observer/SnapshotObserver.java | 18 ++++++++++++++++ .../service/AbstractFlinkService.java | 10 +++++++++ .../operator/service/FlinkService.java | 3 +++ 6 files changed, 56 insertions(+), 4 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java index 8df9142464..4453da751c 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.ResponseBody; @@ -45,8 +46,8 @@ import java.util.Map; import java.util.Objects; -/** Copied from Flink. Should be removed once the client dependency is upgraded to 1.20. */ -/** The difference compared to 1.18 is that slot sharing group is optional here. */ +/** Copied from Flink. */ +/** The difference compared to 1.20 is that slot sharing group and job type are optional here. 
*/ public class JobDetailsInfo implements ResponseBody { public static final String FIELD_NAME_JOB_ID = "jid"; @@ -57,6 +58,8 @@ public class JobDetailsInfo implements ResponseBody { public static final String FIELD_NAME_JOB_STATUS = "state"; + public static final String FIELD_NAME_JOB_TYPE = "job-type"; + public static final String FIELD_NAME_START_TIME = "start-time"; public static final String FIELD_NAME_END_TIME = "end-time"; @@ -89,6 +92,9 @@ public class JobDetailsInfo implements ResponseBody { @JsonProperty(FIELD_NAME_JOB_STATUS) private final JobStatus jobStatus; + @JsonProperty(FIELD_NAME_JOB_TYPE) + private final JobType jobType; + @JsonProperty(FIELD_NAME_START_TIME) private final long startTime; @@ -123,6 +129,7 @@ public JobDetailsInfo( @JsonProperty(FIELD_NAME_JOB_NAME) String name, @JsonProperty(FIELD_NAME_IS_STOPPABLE) boolean isStoppable, @JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus, + @JsonProperty(FIELD_NAME_JOB_TYPE) JobType jobType, @JsonProperty(FIELD_NAME_START_TIME) long startTime, @JsonProperty(FIELD_NAME_END_TIME) long endTime, @JsonProperty(FIELD_NAME_DURATION) long duration, @@ -138,6 +145,7 @@ public JobDetailsInfo( this.name = Preconditions.checkNotNull(name); this.isStoppable = isStoppable; this.jobStatus = Preconditions.checkNotNull(jobStatus); + this.jobType = jobType; this.startTime = startTime; this.endTime = endTime; this.duration = duration; @@ -167,6 +175,7 @@ public boolean equals(Object o) { && Objects.equals(jobId, that.jobId) && Objects.equals(name, that.name) && jobStatus == that.jobStatus + && jobType == that.jobType && Objects.equals(timestamps, that.timestamps) && Objects.equals(jobVertexInfos, that.jobVertexInfos) && Objects.equals(jobVerticesPerState, that.jobVerticesPerState) @@ -180,6 +189,7 @@ public int hashCode() { name, isStoppable, jobStatus, + jobType, startTime, endTime, duration, @@ -211,6 +221,11 @@ public JobStatus getJobStatus() { return jobStatus; } + @JsonIgnore + public JobType getJobType() { + return jobType; + } + @JsonIgnore public long getStartTime() { return startTime; @@ -261,7 +276,7 @@ public String getJsonPlan() { // --------------------------------------------------- /** Detailed information about a job vertex. 
*/ - // @Schema(name = "JobDetailsVertexInfo") + // @Schema(name = "JobDetailsVertexInfo") public static final class JobVertexDetailsInfo { public static final String FIELD_NAME_JOB_VERTEX_ID = "id"; diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java index 70e3f539f6..5ae75358b8 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.MessageHeaders; @@ -79,6 +80,7 @@ public void testJobTopologyParsingFromJobDetails() throws Exception { + " \"name\": \"State machine job\",\n" + " \"isStoppable\": false,\n" + " \"state\": \"RUNNING\",\n" + + " \"job-type\": \"STREAMING\",\n" + " \"start-time\": 1707893512027,\n" + " \"end-time\": -1,\n" + " \"duration\": 214716,\n" @@ -237,6 +239,7 @@ public void testJobTopologyParsingThrowsNotReadyException() throws Exception { + " \"name\": \"State machine job\",\n" + " \"isStoppable\": false,\n" + " \"state\": \"RUNNING\",\n" + + " \"job-type\": \"STREAMING\",\n" + " \"start-time\": 1707893512027,\n" + " \"end-time\": -1,\n" + " \"duration\": 214716,\n" @@ -357,7 +360,7 @@ public void testJobTopologyParsingThrowsNotReadyException() throws Exception { @Test public void testJobTopologyParsingFromJobDetailsWithSlotSharingGroup() throws Exception { String s = - "{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1697114719143,\"end-time\":-1,\"duration\":60731,\"maxParallelism\":-1,\"now\":1697114779874,\"timestamps\":{\"CANCELLING\":0,\"INITIALIZING\":1697114719143,\"RUNNING\":1697114719743,\"CANCELED\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"FAILING\":0,\"CREATED\":1697114719343,\"SUSPENDED\":0,\"RECONCILING\":0},\"vertices\":[{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Source: Events Generator Source\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724603,\"end-time\":-1,\"duration\":55271,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":true,\"write-bytes\":1985978,\"write-bytes-complete\":true,\"read-records\":0,\"read-records-complete\":true,\"write-records\":92037,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":78319,\"accumulated-busy-time\":13347.0}},{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Flat Map -> Sink: Print to Std. 
Out\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724639,\"end-time\":-1,\"duration\":55235,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":2019044,\"read-bytes-complete\":true,\"write-bytes\":0,\"write-bytes-complete\":true,\"read-records\":91881,\"read-records-complete\":true,\"write-records\":0,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":91352,\"accumulated-busy-time\":273.0}}],\"status-counts\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"plan\":{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"type\":\"STREAMING\",\"nodes\":[{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Flat Map
+- Sink: Print to Std. Out
\",\"operator_metadata\":[{},{}],\"inputs\":[{\"num\":0,\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Events Generator Source
\",\"operator_metadata\":[{}],\"optimizer_properties\":{}}]}}\n"; + "{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"isStoppable\":false,\"state\":\"RUNNING\",\"job-type\":\"STREAMING\",\"start-time\":1697114719143,\"end-time\":-1,\"duration\":60731,\"maxParallelism\":-1,\"now\":1697114779874,\"timestamps\":{\"CANCELLING\":0,\"INITIALIZING\":1697114719143,\"RUNNING\":1697114719743,\"CANCELED\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"FAILING\":0,\"CREATED\":1697114719343,\"SUSPENDED\":0,\"RECONCILING\":0},\"vertices\":[{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Source: Events Generator Source\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724603,\"end-time\":-1,\"duration\":55271,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":true,\"write-bytes\":1985978,\"write-bytes-complete\":true,\"read-records\":0,\"read-records-complete\":true,\"write-records\":92037,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":78319,\"accumulated-busy-time\":13347.0}},{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Flat Map -> Sink: Print to Std. Out\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724639,\"end-time\":-1,\"duration\":55235,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":2019044,\"read-bytes-complete\":true,\"write-bytes\":0,\"write-bytes-complete\":true,\"read-records\":91881,\"read-records-complete\":true,\"write-records\":0,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":91352,\"accumulated-busy-time\":273.0}}],\"status-counts\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"plan\":{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"type\":\"STREAMING\",\"nodes\":[{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Flat Map
+- Sink: Print to Std. Out
\",\"operator_metadata\":[{},{}],\"inputs\":[{\"num\":0,\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Events Generator Source
\",\"operator_metadata\":[{}],\"optimizer_properties\":{}}]}}\n"; JobDetailsInfo jobDetailsInfo = new ObjectMapper().readValue(s, JobDetailsInfo.class); var metricsCollector = new RestApiMetricsCollector(); @@ -372,6 +375,7 @@ public void testJobUpdateTsLogic() { "", false, org.apache.flink.api.common.JobStatus.RUNNING, + JobType.STREAMING, 0, 0, 0, diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java index d1b18b412a..cbae86658f 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.GenericMetricGroup; @@ -96,6 +97,7 @@ public CompletableFuture getJobDetails(JobID jobId) { "", false, JobStatus.RUNNING, + JobType.STREAMING, 0, 0, 0, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java index 4255b09a1b..0e759dd255 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java @@ -37,6 +37,7 @@ import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils; import org.apache.flink.kubernetes.operator.utils.SnapshotUtils; +import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.util.CollectionUtil; import org.slf4j.Logger; @@ -83,6 +84,11 @@ public void observeSavepointStatus(FlinkResourceContext ctx) { var jobStatus = resource.getStatus().getJobStatus(); var jobId = jobStatus.getJobId(); + if (isBatchJob(ctx, jobId)) { + LOG.debug("Skipping checkpoint observation for BATCH job"); + return; + } + // If any manual or periodic savepoint is in progress, observe it if (SnapshotUtils.savepointInProgress(jobStatus)) { observeTriggeredSavepoint(ctx, jobId); @@ -458,4 +464,16 @@ private void observeLatestCheckpoint(FlinkResourceContext ctx, String jobId) } }); } + + private boolean isBatchJob(FlinkResourceContext ctx, String jobId) { + try { + var jobDetails = + ctx.getFlinkService() + .getJobDetails(JobID.fromHexString(jobId), ctx.getObserveConfig()); + return jobDetails.getJobType() == JobType.BATCH; + } catch (Exception e) { + LOG.debug("Could not determine job type, assuming streaming job", e); + return false; + } + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 4d7634ea4c..02c54b8344 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -75,6 +75,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders; @@ -837,6 +838,15 @@ public RestClusterClient getClusterClient(Configuration conf) throws Exc (c, e) -> new StandaloneClientHAServices(restServerAddress)); } + @Override + public JobDetailsInfo getJobDetails(JobID jobId, Configuration conf) throws Exception { + try (var clusterClient = getClusterClient(conf)) { + return clusterClient + .getJobDetails(jobId) + .get(operatorConfig.getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS); + } + } + @VisibleForTesting protected void runJar( JobSpec job, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index 168ea4c1f4..e1cc293104 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -33,6 +33,7 @@ import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.PodList; @@ -127,6 +128,8 @@ Map getMetrics(Configuration conf, String jobId, List me RestClusterClient getClusterClient(Configuration conf) throws Exception; + JobDetailsInfo getJobDetails(JobID jobId, Configuration conf) throws Exception; + /** Result of a cancel operation. 
*/ @AllArgsConstructor class CancelResult { From 71f53d88ec062767de8e952297a304b2f963aa6c Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Mon, 24 Feb 2025 12:17:54 -0500 Subject: [PATCH 2/3] Handle exception instead and add observer test --- .../rest/messages/job/JobDetailsInfo.java | 21 ++------ .../ScalingMetricCollectorTest.java | 6 +-- .../autoscaler/TestingAutoscalerUtils.java | 2 - .../operator/observer/SnapshotObserver.java | 52 +++++++++---------- .../service/AbstractFlinkService.java | 10 ---- .../operator/service/FlinkService.java | 3 -- .../operator/TestingFlinkService.java | 9 ++++ .../deployment/ApplicationObserverTest.java | 31 +++++++++++ 8 files changed, 68 insertions(+), 66 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java index 4453da751c..8df9142464 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.SlotSharingGroupId; -import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.ResponseBody; @@ -46,8 +45,8 @@ import java.util.Map; import java.util.Objects; -/** Copied from Flink. */ -/** The difference compared to 1.20 is that slot sharing group and job type are optional here. */ +/** Copied from Flink. Should be removed once the client dependency is upgraded to 1.20. */ +/** The difference compared to 1.18 is that slot sharing group is optional here. 
*/ public class JobDetailsInfo implements ResponseBody { public static final String FIELD_NAME_JOB_ID = "jid"; @@ -58,8 +57,6 @@ public class JobDetailsInfo implements ResponseBody { public static final String FIELD_NAME_JOB_STATUS = "state"; - public static final String FIELD_NAME_JOB_TYPE = "job-type"; - public static final String FIELD_NAME_START_TIME = "start-time"; public static final String FIELD_NAME_END_TIME = "end-time"; @@ -92,9 +89,6 @@ public class JobDetailsInfo implements ResponseBody { @JsonProperty(FIELD_NAME_JOB_STATUS) private final JobStatus jobStatus; - @JsonProperty(FIELD_NAME_JOB_TYPE) - private final JobType jobType; - @JsonProperty(FIELD_NAME_START_TIME) private final long startTime; @@ -129,7 +123,6 @@ public JobDetailsInfo( @JsonProperty(FIELD_NAME_JOB_NAME) String name, @JsonProperty(FIELD_NAME_IS_STOPPABLE) boolean isStoppable, @JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus, - @JsonProperty(FIELD_NAME_JOB_TYPE) JobType jobType, @JsonProperty(FIELD_NAME_START_TIME) long startTime, @JsonProperty(FIELD_NAME_END_TIME) long endTime, @JsonProperty(FIELD_NAME_DURATION) long duration, @@ -145,7 +138,6 @@ public JobDetailsInfo( this.name = Preconditions.checkNotNull(name); this.isStoppable = isStoppable; this.jobStatus = Preconditions.checkNotNull(jobStatus); - this.jobType = jobType; this.startTime = startTime; this.endTime = endTime; this.duration = duration; @@ -175,7 +167,6 @@ public boolean equals(Object o) { && Objects.equals(jobId, that.jobId) && Objects.equals(name, that.name) && jobStatus == that.jobStatus - && jobType == that.jobType && Objects.equals(timestamps, that.timestamps) && Objects.equals(jobVertexInfos, that.jobVertexInfos) && Objects.equals(jobVerticesPerState, that.jobVerticesPerState) @@ -189,7 +180,6 @@ public int hashCode() { name, isStoppable, jobStatus, - jobType, startTime, endTime, duration, @@ -221,11 +211,6 @@ public JobStatus getJobStatus() { return jobStatus; } - @JsonIgnore - public JobType getJobType() { - return jobType; - } - @JsonIgnore public long getStartTime() { return startTime; @@ -276,7 +261,7 @@ public String getJsonPlan() { // --------------------------------------------------- /** Detailed information about a job vertex. 
*/ - // @Schema(name = "JobDetailsVertexInfo") + // @Schema(name = "JobDetailsVertexInfo") public static final class JobVertexDetailsInfo { public static final String FIELD_NAME_JOB_VERTEX_ID = "id"; diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java index 5ae75358b8..70e3f539f6 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java @@ -28,7 +28,6 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; -import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.MessageHeaders; @@ -80,7 +79,6 @@ public void testJobTopologyParsingFromJobDetails() throws Exception { + " \"name\": \"State machine job\",\n" + " \"isStoppable\": false,\n" + " \"state\": \"RUNNING\",\n" - + " \"job-type\": \"STREAMING\",\n" + " \"start-time\": 1707893512027,\n" + " \"end-time\": -1,\n" + " \"duration\": 214716,\n" @@ -239,7 +237,6 @@ public void testJobTopologyParsingThrowsNotReadyException() throws Exception { + " \"name\": \"State machine job\",\n" + " \"isStoppable\": false,\n" + " \"state\": \"RUNNING\",\n" - + " \"job-type\": \"STREAMING\",\n" + " \"start-time\": 1707893512027,\n" + " \"end-time\": -1,\n" + " \"duration\": 214716,\n" @@ -360,7 +357,7 @@ public void testJobTopologyParsingThrowsNotReadyException() throws Exception { @Test public void testJobTopologyParsingFromJobDetailsWithSlotSharingGroup() throws Exception { String s = - "{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"isStoppable\":false,\"state\":\"RUNNING\",\"job-type\":\"STREAMING\",\"start-time\":1697114719143,\"end-time\":-1,\"duration\":60731,\"maxParallelism\":-1,\"now\":1697114779874,\"timestamps\":{\"CANCELLING\":0,\"INITIALIZING\":1697114719143,\"RUNNING\":1697114719743,\"CANCELED\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"FAILING\":0,\"CREATED\":1697114719343,\"SUSPENDED\":0,\"RECONCILING\":0},\"vertices\":[{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Source: Events Generator Source\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724603,\"end-time\":-1,\"duration\":55271,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":true,\"write-bytes\":1985978,\"write-bytes-complete\":true,\"read-records\":0,\"read-records-complete\":true,\"write-records\":92037,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":78319,\"accumulated-busy-time\":13347.0}},{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Flat Map -> Sink: Print to Std. 
Out\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724639,\"end-time\":-1,\"duration\":55235,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":2019044,\"read-bytes-complete\":true,\"write-bytes\":0,\"write-bytes-complete\":true,\"read-records\":91881,\"read-records-complete\":true,\"write-records\":0,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":91352,\"accumulated-busy-time\":273.0}}],\"status-counts\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"plan\":{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"type\":\"STREAMING\",\"nodes\":[{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Flat Map
+- Sink: Print to Std. Out
\",\"operator_metadata\":[{},{}],\"inputs\":[{\"num\":0,\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Events Generator Source
\",\"operator_metadata\":[{}],\"optimizer_properties\":{}}]}}\n"; + "{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1697114719143,\"end-time\":-1,\"duration\":60731,\"maxParallelism\":-1,\"now\":1697114779874,\"timestamps\":{\"CANCELLING\":0,\"INITIALIZING\":1697114719143,\"RUNNING\":1697114719743,\"CANCELED\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"FAILING\":0,\"CREATED\":1697114719343,\"SUSPENDED\":0,\"RECONCILING\":0},\"vertices\":[{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Source: Events Generator Source\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724603,\"end-time\":-1,\"duration\":55271,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":true,\"write-bytes\":1985978,\"write-bytes-complete\":true,\"read-records\":0,\"read-records-complete\":true,\"write-records\":92037,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":78319,\"accumulated-busy-time\":13347.0}},{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Flat Map -> Sink: Print to Std. Out\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724639,\"end-time\":-1,\"duration\":55235,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":2019044,\"read-bytes-complete\":true,\"write-bytes\":0,\"write-bytes-complete\":true,\"read-records\":91881,\"read-records-complete\":true,\"write-records\":0,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":91352,\"accumulated-busy-time\":273.0}}],\"status-counts\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"plan\":{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"type\":\"STREAMING\",\"nodes\":[{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Flat Map
+- Sink: Print to Std. Out
\",\"operator_metadata\":[{},{}],\"inputs\":[{\"num\":0,\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Events Generator Source
\",\"operator_metadata\":[{}],\"optimizer_properties\":{}}]}}\n"; JobDetailsInfo jobDetailsInfo = new ObjectMapper().readValue(s, JobDetailsInfo.class); var metricsCollector = new RestApiMetricsCollector(); @@ -375,7 +372,6 @@ public void testJobUpdateTsLogic() { "", false, org.apache.flink.api.common.JobStatus.RUNNING, - JobType.STREAMING, 0, 0, 0, diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java index cbae86658f..d1b18b412a 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java @@ -23,7 +23,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; -import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.GenericMetricGroup; @@ -97,7 +96,6 @@ public CompletableFuture getJobDetails(JobID jobId) { "", false, JobStatus.RUNNING, - JobType.STREAMING, 0, 0, 0, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java index 0e759dd255..703ada4b4f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java @@ -37,8 +37,9 @@ import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils; import org.apache.flink.kubernetes.operator.utils.SnapshotUtils; -import org.apache.flink.runtime.jobgraph.JobType; +import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,11 +85,6 @@ public void observeSavepointStatus(FlinkResourceContext ctx) { var jobStatus = resource.getStatus().getJobStatus(); var jobId = jobStatus.getJobId(); - if (isBatchJob(ctx, jobId)) { - LOG.debug("Skipping checkpoint observation for BATCH job"); - return; - } - // If any manual or periodic savepoint is in progress, observe it if (SnapshotUtils.savepointInProgress(jobStatus)) { observeTriggeredSavepoint(ctx, jobId); @@ -447,33 +443,33 @@ private long getMaxCountForSnapshotType( } private void observeLatestCheckpoint(FlinkResourceContext ctx, String jobId) { - var status = ctx.getResource().getStatus(); var jobStatus = status.getJobStatus(); - ctx.getFlinkService() - .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig()) - .ifPresentOrElse( - snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()), - () -> { - if (ReconciliationUtils.isJobCancelled(status)) { - // For cancelled jobs the observed savepoint is always definite, - // so if empty we know the job doesn't have any - // checkpoints/savepoints - jobStatus.setUpgradeSavepointPath(null); - } - }); - } - - private boolean isBatchJob(FlinkResourceContext ctx, String jobId) { try { - var jobDetails = - 
ctx.getFlinkService() - .getJobDetails(JobID.fromHexString(jobId), ctx.getObserveConfig()); - return jobDetails.getJobType() == JobType.BATCH; + ctx.getFlinkService() + .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig()) + .ifPresentOrElse( + snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()), + () -> { + if (ReconciliationUtils.isJobCancelled(status)) { + // For cancelled jobs the observed savepoint is always definite, + // so if empty we know the job doesn't have any + // checkpoints/savepoints + jobStatus.setUpgradeSavepointPath(null); + } + }); } catch (Exception e) { - LOG.debug("Could not determine job type, assuming streaming job", e); - return false; + if (ExceptionUtils.findThrowable(e, RestClientException.class) + .map(ex -> ex.getMessage().contains("Checkpointing has not been enabled")) + .orElse(false)) { + LOG.warn( + "Checkpointing not enabled for job {}, skipping checkpoint observation", + jobId, + e); + return; + } + throw e; } } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 02c54b8344..4d7634ea4c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -75,7 +75,6 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders; -import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest; import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders; @@ -838,15 +837,6 @@ public RestClusterClient getClusterClient(Configuration conf) throws Exc (c, e) -> new StandaloneClientHAServices(restServerAddress)); } - @Override - public JobDetailsInfo getJobDetails(JobID jobId, Configuration conf) throws Exception { - try (var clusterClient = getClusterClient(conf)) { - return clusterClient - .getJobDetails(jobId) - .get(operatorConfig.getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS); - } - } - @VisibleForTesting protected void runJar( JobSpec job, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index e1cc293104..168ea4c1f4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -33,7 +33,6 @@ import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobmaster.JobResult; -import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.PodList; @@ -128,8 +127,6 @@ Map getMetrics(Configuration conf, String jobId, List me RestClusterClient 
getClusterClient(Configuration conf) throws Exception; - JobDetailsInfo getJobDetails(JobID jobId, Configuration conf) throws Exception; - /** Result of a cancel operation. */ @AllArgsConstructor class CancelResult { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 42e3ca243a..c26a5ee724 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -43,6 +43,7 @@ import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; +import org.apache.flink.kubernetes.operator.exception.ReconciliationException; import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException; import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult; import org.apache.flink.kubernetes.operator.observer.CheckpointStatsResult; @@ -138,6 +139,7 @@ public class TestingFlinkService extends AbstractFlinkService { @Getter private final Map savepointTriggers = new HashMap<>(); @Getter private final Map checkpointTriggers = new HashMap<>(); private final Map checkpointStats = new HashMap<>(); + @Setter private boolean throwCheckpointingDisabledError = false; @Getter private int desiredReplicas = 0; @Getter private int cancelJobCallCount = 0; @@ -574,6 +576,13 @@ public void disposeSavepoint(String savepointPath, Configuration conf) throws Ex @Override public Optional getLastCheckpoint(JobID jobId, Configuration conf) { + if (throwCheckpointingDisabledError) { + throw new ReconciliationException( + "Could not observe latest savepoint information", + new RestClientException( + "Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST)); + } + jobs.stream() .filter(js -> js.f1.getJobId().equals(jobId)) .findAny() diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java index 83e7fad5fe..4f84337809 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java @@ -898,4 +898,35 @@ public void jobStatusNotOverwrittenWhenTerminal() throws Exception { org.apache.flink.api.common.JobStatus.FINISHED, deployment.getStatus().getJobStatus().getState()); } + + @Test + public void observeLatestCheckpointShouldSkipWhenCheckpointingDisabled() throws Exception { + Configuration conf = + configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec()); + flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false); + bringToReadyStatus(deployment); + + deployment + .getStatus() + .getJobStatus() + .setState(org.apache.flink.api.common.JobStatus.FINISHED); + var jobs = flinkService.listJobs(); + var oldStatus = jobs.get(0).f1; + jobs.get(0).f1 = + new JobStatusMessage( + oldStatus.getJobId(), + oldStatus.getJobName(), + 
org.apache.flink.api.common.JobStatus.FINISHED, + oldStatus.getStartTime()); + + flinkService.setThrowCheckpointingDisabledError(true); + observer.observe(deployment, readyContext); + + assertEquals( + 0, + countErrorEvents( + EventRecorder.Reason.CheckpointError.name(), + deployment.getMetadata().getNamespace(), + "Checkpointing has not been enabled")); + } } From 0abaa1fd39602bdbbb96c6c4ba84ea2f8fe90de6 Mon Sep 17 00:00:00 2001 From: Luca Castelli Date: Mon, 24 Feb 2025 14:51:01 -0500 Subject: [PATCH 3/3] Move exception handling to getLastCheckpoint in the service --- .../operator/observer/SnapshotObserver.java | 39 ++++++------------- .../service/AbstractFlinkService.java | 6 +++ .../operator/TestingFlinkService.java | 12 ++---- .../deployment/ApplicationObserverTest.java | 2 +- 4 files changed, 23 insertions(+), 36 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java index 703ada4b4f..8f079415a2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java @@ -37,9 +37,7 @@ import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils; import org.apache.flink.kubernetes.operator.utils.SnapshotUtils; -import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.util.CollectionUtil; -import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -446,30 +444,17 @@ private void observeLatestCheckpoint(FlinkResourceContext ctx, String jobId) var status = ctx.getResource().getStatus(); var jobStatus = status.getJobStatus(); - try { - ctx.getFlinkService() - .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig()) - .ifPresentOrElse( - snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()), - () -> { - if (ReconciliationUtils.isJobCancelled(status)) { - // For cancelled jobs the observed savepoint is always definite, - // so if empty we know the job doesn't have any - // checkpoints/savepoints - jobStatus.setUpgradeSavepointPath(null); - } - }); - } catch (Exception e) { - if (ExceptionUtils.findThrowable(e, RestClientException.class) - .map(ex -> ex.getMessage().contains("Checkpointing has not been enabled")) - .orElse(false)) { - LOG.warn( - "Checkpointing not enabled for job {}, skipping checkpoint observation", - jobId, - e); - return; - } - throw e; - } + ctx.getFlinkService() + .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig()) + .ifPresentOrElse( + snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()), + () -> { + if (ReconciliationUtils.isJobCancelled(status)) { + // For cancelled jobs the observed savepoint is always definite, + // so if empty we know the job doesn't have any + // checkpoints/savepoints + jobStatus.setUpgradeSavepointPath(null); + } + }); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 4d7634ea4c..42e85a1ac2 100644 --- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -546,6 +546,12 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { try { latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0; } catch (Exception e) { + if (e instanceof RestClientException + && e.getMessage() != null + && e.getMessage().contains("Checkpointing has not been enabled")) { + LOG.warn("Checkpointing not enabled for job {}", jobId, e); + return Optional.empty(); + } throw new ReconciliationException("Could not observe latest savepoint information", e); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index c26a5ee724..fec1dfa64e 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -43,7 +43,6 @@ import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; -import org.apache.flink.kubernetes.operator.exception.ReconciliationException; import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException; import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult; import org.apache.flink.kubernetes.operator.observer.CheckpointStatsResult; @@ -576,13 +575,6 @@ public void disposeSavepoint(String savepointPath, Configuration conf) throws Ex @Override public Optional getLastCheckpoint(JobID jobId, Configuration conf) { - if (throwCheckpointingDisabledError) { - throw new ReconciliationException( - "Could not observe latest savepoint information", - new RestClientException( - "Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST)); - } - jobs.stream() .filter(js -> js.f1.getJobId().equals(jobId)) .findAny() @@ -602,6 +594,10 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { Optional, Optional> getCheckpointInfo(JobID jobId, Configuration conf) throws Exception { + if (throwCheckpointingDisabledError) { + throw new RestClientException( + "Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST); + } if (checkpointInfo != null) { return checkpointInfo; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java index 4f84337809..aa4898d827 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java @@ -900,7 +900,7 @@ public void jobStatusNotOverwrittenWhenTerminal() throws Exception { } @Test - public void observeLatestCheckpointShouldSkipWhenCheckpointingDisabled() throws Exception { + public void getLastCheckpointShouldHandleCheckpointingNotEnabled() throws Exception { Configuration conf = 
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec()); flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);