Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.rest.messages.ResponseBody;
Expand All @@ -45,8 +46,8 @@
import java.util.Map;
import java.util.Objects;

/** Copied from Flink. Should be removed once the client dependency is upgraded to 1.20. */
/** The difference compared to 1.18 is that slot sharing group is optional here. */
/** Copied from Flink. */
/** The difference compared to 1.20 is that slot sharing group and job type are optional here. */
public class JobDetailsInfo implements ResponseBody {

public static final String FIELD_NAME_JOB_ID = "jid";
Expand All @@ -57,6 +58,8 @@ public class JobDetailsInfo implements ResponseBody {

public static final String FIELD_NAME_JOB_STATUS = "state";

public static final String FIELD_NAME_JOB_TYPE = "job-type";

public static final String FIELD_NAME_START_TIME = "start-time";

public static final String FIELD_NAME_END_TIME = "end-time";
Expand Down Expand Up @@ -89,6 +92,9 @@ public class JobDetailsInfo implements ResponseBody {
@JsonProperty(FIELD_NAME_JOB_STATUS)
private final JobStatus jobStatus;

@JsonProperty(FIELD_NAME_JOB_TYPE)
private final JobType jobType;

@JsonProperty(FIELD_NAME_START_TIME)
private final long startTime;

Expand Down Expand Up @@ -123,6 +129,7 @@ public JobDetailsInfo(
@JsonProperty(FIELD_NAME_JOB_NAME) String name,
@JsonProperty(FIELD_NAME_IS_STOPPABLE) boolean isStoppable,
@JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus,
@JsonProperty(FIELD_NAME_JOB_TYPE) JobType jobType,
@JsonProperty(FIELD_NAME_START_TIME) long startTime,
@JsonProperty(FIELD_NAME_END_TIME) long endTime,
@JsonProperty(FIELD_NAME_DURATION) long duration,
Expand All @@ -138,6 +145,7 @@ public JobDetailsInfo(
this.name = Preconditions.checkNotNull(name);
this.isStoppable = isStoppable;
this.jobStatus = Preconditions.checkNotNull(jobStatus);
this.jobType = jobType;
this.startTime = startTime;
this.endTime = endTime;
this.duration = duration;
Expand Down Expand Up @@ -167,6 +175,7 @@ public boolean equals(Object o) {
&& Objects.equals(jobId, that.jobId)
&& Objects.equals(name, that.name)
&& jobStatus == that.jobStatus
&& jobType == that.jobType
&& Objects.equals(timestamps, that.timestamps)
&& Objects.equals(jobVertexInfos, that.jobVertexInfos)
&& Objects.equals(jobVerticesPerState, that.jobVerticesPerState)
Expand All @@ -180,6 +189,7 @@ public int hashCode() {
name,
isStoppable,
jobStatus,
jobType,
startTime,
endTime,
duration,
Expand Down Expand Up @@ -211,6 +221,11 @@ public JobStatus getJobStatus() {
return jobStatus;
}

@JsonIgnore
public JobType getJobType() {
return jobType;
}

@JsonIgnore
public long getStartTime() {
return startTime;
Expand Down Expand Up @@ -261,7 +276,7 @@ public String getJsonPlan() {
// ---------------------------------------------------

/** Detailed information about a job vertex. */
// @Schema(name = "JobDetailsVertexInfo")
// @Schema(name = "JobDetailsVertexInfo")
public static final class JobVertexDetailsInfo {

public static final String FIELD_NAME_JOB_VERTEX_ID = "id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
Expand Down Expand Up @@ -79,6 +80,7 @@ public void testJobTopologyParsingFromJobDetails() throws Exception {
+ " \"name\": \"State machine job\",\n"
+ " \"isStoppable\": false,\n"
+ " \"state\": \"RUNNING\",\n"
+ " \"job-type\": \"STREAMING\",\n"
+ " \"start-time\": 1707893512027,\n"
+ " \"end-time\": -1,\n"
+ " \"duration\": 214716,\n"
Expand Down Expand Up @@ -237,6 +239,7 @@ public void testJobTopologyParsingThrowsNotReadyException() throws Exception {
+ " \"name\": \"State machine job\",\n"
+ " \"isStoppable\": false,\n"
+ " \"state\": \"RUNNING\",\n"
+ " \"job-type\": \"STREAMING\",\n"
+ " \"start-time\": 1707893512027,\n"
+ " \"end-time\": -1,\n"
+ " \"duration\": 214716,\n"
Expand Down Expand Up @@ -357,7 +360,7 @@ public void testJobTopologyParsingThrowsNotReadyException() throws Exception {
@Test
public void testJobTopologyParsingFromJobDetailsWithSlotSharingGroup() throws Exception {
String s =
"{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1697114719143,\"end-time\":-1,\"duration\":60731,\"maxParallelism\":-1,\"now\":1697114779874,\"timestamps\":{\"CANCELLING\":0,\"INITIALIZING\":1697114719143,\"RUNNING\":1697114719743,\"CANCELED\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"FAILING\":0,\"CREATED\":1697114719343,\"SUSPENDED\":0,\"RECONCILING\":0},\"vertices\":[{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Source: Events Generator Source\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724603,\"end-time\":-1,\"duration\":55271,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":true,\"write-bytes\":1985978,\"write-bytes-complete\":true,\"read-records\":0,\"read-records-complete\":true,\"write-records\":92037,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":78319,\"accumulated-busy-time\":13347.0}},{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Flat Map -> Sink: Print to Std. Out\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724639,\"end-time\":-1,\"duration\":55235,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":2019044,\"read-bytes-complete\":true,\"write-bytes\":0,\"write-bytes-complete\":true,\"read-records\":91881,\"read-records-complete\":true,\"write-records\":0,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":91352,\"accumulated-busy-time\":273.0}}],\"status-counts\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"plan\":{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"type\":\"STREAMING\",\"nodes\":[{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Flat Map<br/>+- Sink: Print to Std. Out<br/>\",\"operator_metadata\":[{},{}],\"inputs\":[{\"num\":0,\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Events Generator Source<br/>\",\"operator_metadata\":[{}],\"optimizer_properties\":{}}]}}\n";
"{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"isStoppable\":false,\"state\":\"RUNNING\",\"job-type\":\"STREAMING\",\"start-time\":1697114719143,\"end-time\":-1,\"duration\":60731,\"maxParallelism\":-1,\"now\":1697114779874,\"timestamps\":{\"CANCELLING\":0,\"INITIALIZING\":1697114719143,\"RUNNING\":1697114719743,\"CANCELED\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"FAILING\":0,\"CREATED\":1697114719343,\"SUSPENDED\":0,\"RECONCILING\":0},\"vertices\":[{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Source: Events Generator Source\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724603,\"end-time\":-1,\"duration\":55271,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":true,\"write-bytes\":1985978,\"write-bytes-complete\":true,\"read-records\":0,\"read-records-complete\":true,\"write-records\":92037,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":78319,\"accumulated-busy-time\":13347.0}},{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Flat Map -> Sink: Print to Std. Out\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724639,\"end-time\":-1,\"duration\":55235,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":2019044,\"read-bytes-complete\":true,\"write-bytes\":0,\"write-bytes-complete\":true,\"read-records\":91881,\"read-records-complete\":true,\"write-records\":0,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":91352,\"accumulated-busy-time\":273.0}}],\"status-counts\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"plan\":{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"type\":\"STREAMING\",\"nodes\":[{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Flat Map<br/>+- Sink: Print to Std. Out<br/>\",\"operator_metadata\":[{},{}],\"inputs\":[{\"num\":0,\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Events Generator Source<br/>\",\"operator_metadata\":[{}],\"optimizer_properties\":{}}]}}\n";
JobDetailsInfo jobDetailsInfo = new ObjectMapper().readValue(s, JobDetailsInfo.class);

var metricsCollector = new RestApiMetricsCollector();
Expand All @@ -372,6 +375,7 @@ public void testJobUpdateTsLogic() {
"",
false,
org.apache.flink.api.common.JobStatus.RUNNING,
JobType.STREAMING,
0,
0,
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
Expand Down Expand Up @@ -96,6 +97,7 @@ public CompletableFuture<JobDetailsInfo> getJobDetails(JobID jobId) {
"",
false,
JobStatus.RUNNING,
JobType.STREAMING,
0,
0,
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.util.CollectionUtil;

import org.slf4j.Logger;
Expand Down Expand Up @@ -83,6 +84,11 @@ public void observeSavepointStatus(FlinkResourceContext<CR> ctx) {
var jobStatus = resource.getStatus().getJobStatus();
var jobId = jobStatus.getJobId();

if (isBatchJob(ctx, jobId)) {
LOG.debug("Skipping checkpoint observation for BATCH job");
return;
}

// If any manual or periodic savepoint is in progress, observe it
if (SnapshotUtils.savepointInProgress(jobStatus)) {
observeTriggeredSavepoint(ctx, jobId);
Expand Down Expand Up @@ -458,4 +464,16 @@ private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId)
}
});
}

private boolean isBatchJob(FlinkResourceContext<CR> ctx, String jobId) {
try {
var jobDetails =
ctx.getFlinkService()
.getJobDetails(JobID.fromHexString(jobId), ctx.getObserveConfig());
return jobDetails.getJobType() == JobType.BATCH;
} catch (Exception e) {
LOG.debug("Could not determine job type, assuming streaming job", e);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
Expand Down Expand Up @@ -837,6 +838,15 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exc
(c, e) -> new StandaloneClientHAServices(restServerAddress));
}

@Override
public JobDetailsInfo getJobDetails(JobID jobId, Configuration conf) throws Exception {
try (var clusterClient = getClusterClient(conf)) {
return clusterClient
.getJobDetails(jobId)
.get(operatorConfig.getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS);
}
}

@VisibleForTesting
protected void runJar(
JobSpec job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
Expand Down Expand Up @@ -127,6 +128,8 @@ Map<String, String> getMetrics(Configuration conf, String jobId, List<String> me

RestClusterClient<String> getClusterClient(Configuration conf) throws Exception;

JobDetailsInfo getJobDetails(JobID jobId, Configuration conf) throws Exception;

/** Result of a cancel operation. */
@AllArgsConstructor
class CancelResult {
Expand Down
Loading