Skip to content

Commit 15f648c

Browse files
committed
[FLINK-35776] Simplify job status handling
1 parent 29076c8 commit 15f648c

File tree

15 files changed

+174
-345
lines changed

15 files changed

+174
-345
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/JobStatusUtils.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,18 @@ public class JobStatusUtils {
3838
public static List<JobStatusMessage> toJobStatusMessage(
3939
MultipleJobsDetails multipleJobsDetails) {
4040
return multipleJobsDetails.getJobs().stream()
41-
.map(
42-
details ->
43-
new JobStatusMessage(
44-
details.getJobId(),
45-
details.getJobName(),
46-
getEffectiveStatus(details),
47-
details.getStartTime()))
41+
.map(JobStatusUtils::toJobStatusMessage)
4842
.collect(Collectors.toList());
4943
}
5044

45+
public static JobStatusMessage toJobStatusMessage(JobDetails details) {
46+
return new JobStatusMessage(
47+
details.getJobId(),
48+
details.getJobName(),
49+
getEffectiveStatus(details),
50+
details.getStartTime());
51+
}
52+
5153
@VisibleForTesting
5254
static JobStatus getEffectiveStatus(JobDetails details) {
5355
int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/MissingSessionJobException.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/UnknownJobException.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java

Lines changed: 36 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,29 @@
1717

1818
package org.apache.flink.kubernetes.operator.observer;
1919

20+
import org.apache.flink.api.common.JobID;
2021
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
2122
import org.apache.flink.kubernetes.operator.api.spec.JobState;
2223
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
2324
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
2425
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
2526
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
2627
import org.apache.flink.runtime.client.JobStatusMessage;
28+
import org.apache.flink.runtime.rest.NotFoundException;
2729

2830
import org.slf4j.Logger;
2931
import org.slf4j.LoggerFactory;
3032

31-
import java.util.ArrayList;
32-
import java.util.List;
33-
import java.util.Optional;
3433
import java.util.concurrent.TimeoutException;
3534

3635
import static org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkResourceException;
3736

3837
/** An observer to observe the job status. */
39-
public abstract class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
38+
public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
4039

4140
private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
4241

43-
public static final String MISSING_SESSION_JOB_ERR = "Missing Session Job";
42+
public static final String JOB_NOT_FOUND_ERR = "Job Not Found";
4443

4544
protected final EventRecorder eventRecorder;
4645

@@ -68,58 +67,51 @@ public boolean observe(FlinkResourceContext<R> ctx) {
6867
LOG.debug("Observing job status");
6968
var previousJobStatus = jobStatus.getState();
7069

71-
List<JobStatusMessage> clusterJobStatuses;
7270
try {
73-
// Query job list from the cluster
74-
clusterJobStatuses =
75-
new ArrayList<>(ctx.getFlinkService().listJobs(ctx.getObserveConfig()));
71+
var newJobStatusOpt =
72+
ctx.getFlinkService()
73+
.getJobStatus(
74+
ctx.getObserveConfig(),
75+
JobID.fromHexString(jobStatus.getJobId()));
76+
77+
if (newJobStatusOpt.isPresent()) {
78+
updateJobStatus(ctx, newJobStatusOpt.get());
79+
ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
80+
return true;
81+
} else {
82+
onTargetJobNotFound(ctx);
83+
}
7684
} catch (Exception e) {
7785
// Error while accessing the rest api, will try again later...
78-
LOG.warn("Exception while listing jobs", e);
86+
LOG.warn("Exception while getting job status", e);
7987
ifRunningMoveToReconciling(jobStatus, previousJobStatus);
8088
if (e instanceof TimeoutException) {
8189
onTimeout(ctx);
8290
}
83-
return false;
84-
}
85-
86-
if (!clusterJobStatuses.isEmpty()) {
87-
// There are jobs on the cluster, we filter the ones for this resource
88-
Optional<JobStatusMessage> targetJobStatusMessage =
89-
filterTargetJob(jobStatus, clusterJobStatuses);
90-
91-
if (targetJobStatusMessage.isEmpty()) {
92-
LOG.warn("No matching jobs found on the cluster");
93-
ifRunningMoveToReconciling(jobStatus, previousJobStatus);
94-
onTargetJobNotFound(ctx);
95-
return false;
96-
} else {
97-
updateJobStatus(ctx, targetJobStatusMessage.get());
98-
}
99-
ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
100-
return true;
101-
} else {
102-
LOG.debug("No jobs found on the cluster");
103-
// No jobs found on the cluster, it is possible that the jobmanager is still starting up
104-
ifRunningMoveToReconciling(jobStatus, previousJobStatus);
105-
onNoJobsFound(ctx);
106-
return false;
10791
}
92+
return false;
10893
}
10994

11095
/**
11196
* Callback when the target job could not be found on the cluster.
11297
*
11398
* @param ctx The Flink resource context.
11499
*/
115-
protected abstract void onTargetJobNotFound(FlinkResourceContext<R> ctx);
116-
117-
/**
118-
* Callback when no jobs were found on the cluster.
119-
*
120-
* @param ctx The Flink resource context.
121-
*/
122-
protected void onNoJobsFound(FlinkResourceContext<R> ctx) {}
100+
protected void onTargetJobNotFound(FlinkResourceContext<R> ctx) {
101+
ctx.getResource()
102+
.getStatus()
103+
.getJobStatus()
104+
.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
105+
ReconciliationUtils.updateForReconciliationError(
106+
ctx, new NotFoundException(JOB_NOT_FOUND_ERR));
107+
eventRecorder.triggerEvent(
108+
ctx.getResource(),
109+
EventRecorder.Type.Warning,
110+
EventRecorder.Reason.Missing,
111+
EventRecorder.Component.Job,
112+
JOB_NOT_FOUND_ERR,
113+
ctx.getKubernetesClient());
114+
}
123115

124116
/**
125117
* If we observed the job previously in RUNNING state we move to RECONCILING instead as we are
@@ -139,18 +131,7 @@ private void ifRunningMoveToReconciling(JobStatus jobStatus, String previousJobS
139131
*
140132
* @param ctx Resource context.
141133
*/
142-
protected abstract void onTimeout(FlinkResourceContext<R> ctx);
143-
144-
/**
145-
* Filter the target job status message by the job list from the cluster.
146-
*
147-
* @param status the target job status.
148-
* @param clusterJobStatuses the candidate cluster jobs.
149-
* @return The target job status message. If no matched job found, {@code Optional.empty()} will
150-
* be returned.
151-
*/
152-
protected abstract Optional<JobStatusMessage> filterTargetJob(
153-
JobStatus status, List<JobStatusMessage> clusterJobStatuses);
134+
protected void onTimeout(FlinkResourceContext<R> ctx) {}
154135

155136
/**
156137
* Update the status in CR according to the cluster job status.
@@ -161,16 +142,13 @@ protected abstract Optional<JobStatusMessage> filterTargetJob(
161142
private void updateJobStatus(FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) {
162143
var resource = ctx.getResource();
163144
var jobStatus = resource.getStatus().getJobStatus();
164-
var previousJobId = jobStatus.getJobId();
165145
var previousJobStatus = jobStatus.getState();
166146

167147
jobStatus.setState(clusterJobStatus.getJobState().name());
168148
jobStatus.setJobName(clusterJobStatus.getJobName());
169-
jobStatus.setJobId(clusterJobStatus.getJobId().toHexString());
170149
jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
171150

172-
if (jobStatus.getJobId().equals(previousJobId)
173-
&& jobStatus.getState().equals(previousJobStatus)) {
151+
if (jobStatus.getState().equals(previousJobStatus)) {
174152
LOG.debug("Job status ({}) unchanged", previousJobStatus);
175153
} else {
176154
jobStatus.setUpdateTime(String.valueOf(System.currentTimeMillis()));

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,11 @@
1919

2020
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
2121
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
22-
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
2322
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
24-
import org.apache.flink.kubernetes.operator.exception.UnknownJobException;
2523
import org.apache.flink.kubernetes.operator.observer.ClusterHealthObserver;
2624
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
2725
import org.apache.flink.kubernetes.operator.observer.SnapshotObserver;
28-
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
2926
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
30-
import org.apache.flink.runtime.client.JobStatusMessage;
31-
32-
import java.util.Comparator;
33-
import java.util.List;
34-
import java.util.Optional;
3527

3628
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
3729

@@ -73,46 +65,5 @@ public ApplicationJobObserver(EventRecorder eventRecorder) {
7365
public void onTimeout(FlinkResourceContext<FlinkDeployment> ctx) {
7466
observeJmDeployment(ctx);
7567
}
76-
77-
@Override
78-
protected Optional<JobStatusMessage> filterTargetJob(
79-
JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
80-
if (!clusterJobStatuses.isEmpty()) {
81-
clusterJobStatuses.sort(
82-
Comparator.comparingLong(JobStatusMessage::getStartTime).reversed());
83-
return Optional.of(clusterJobStatuses.get(0));
84-
}
85-
return Optional.empty();
86-
}
87-
88-
@Override
89-
protected void onTargetJobNotFound(FlinkResourceContext<FlinkDeployment> ctx) {
90-
// This should never happen for application clusters, there is something
91-
// wrong
92-
setUnknownJobError(ctx);
93-
}
94-
95-
/**
96-
* We found a job on an application cluster that doesn't match the expected job. Trigger
97-
* error.
98-
*
99-
* @param ctx Application deployment context.
100-
*/
101-
private void setUnknownJobError(FlinkResourceContext<FlinkDeployment> ctx) {
102-
ctx.getResource()
103-
.getStatus()
104-
.getJobStatus()
105-
.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
106-
String err = "Unrecognized Job for Application deployment";
107-
logger.error(err);
108-
ReconciliationUtils.updateForReconciliationError(ctx, new UnknownJobException(err));
109-
eventRecorder.triggerEvent(
110-
ctx.getResource(),
111-
EventRecorder.Type.Warning,
112-
EventRecorder.Reason.Missing,
113-
EventRecorder.Component.Job,
114-
err,
115-
ctx.getKubernetesClient());
116-
}
11768
}
11869
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> ctx) {
3636
// Check if session cluster can serve rest calls following our practice in JobObserver
3737
try {
3838
logger.debug("Observing session cluster");
39-
ctx.getFlinkService().listJobs(ctx.getObserveConfig());
39+
ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig());
4040
var rs = ctx.getResource().getStatus().getReconciliationStatus();
4141
if (rs.getState() == ReconciliationState.DEPLOYED) {
4242
rs.markReconciledSpecAsStable();

0 commit comments

Comments
 (0)