Skip to content

Commit 556dd6f

Browse files
eemariozhuzhurk
authored andcommitted
[FLINK-38758][runtime] Use JobStatus instead of ApplicationStatus in JobResult
1 parent 883e33b commit 556dd6f

File tree

22 files changed

+107
-120
lines changed

22 files changed

+107
-120
lines changed

flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,8 @@ private CompletableFuture<Acknowledge> finish(
201201
private Optional<ApplicationStatus> extractApplicationStatus(Throwable t) {
202202
final Optional<UnsuccessfulExecutionException> maybeException =
203203
ExceptionUtils.findThrowable(t, UnsuccessfulExecutionException.class);
204-
return maybeException.map(UnsuccessfulExecutionException::getStatus);
204+
return maybeException.map(
205+
exception -> ApplicationStatus.fromJobStatus(exception.getStatus().orElse(null)));
205206
}
206207

207208
private CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
@@ -377,7 +378,7 @@ private CompletableFuture<JobResult> getJobResult(
377378
exception ->
378379
new JobResult.Builder()
379380
.jobId(jobId)
380-
.applicationStatus(ApplicationStatus.UNKNOWN)
381+
.jobStatus(null)
381382
.netRuntime(Long.MAX_VALUE)
382383
.build());
383384
}

flink-clients/src/main/java/org/apache/flink/client/deployment/application/UnsuccessfulExecutionException.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,34 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.api.common.JobID;
23+
import org.apache.flink.api.common.JobStatus;
2324
import org.apache.flink.runtime.client.JobExecutionException;
24-
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
2525
import org.apache.flink.runtime.jobmaster.JobResult;
2626

27+
import javax.annotation.Nullable;
28+
29+
import java.util.Optional;
30+
2731
import static org.apache.flink.util.Preconditions.checkNotNull;
2832
import static org.apache.flink.util.Preconditions.checkState;
2933

30-
/** Exception that signals the failure of an application with a given {@link ApplicationStatus}. */
34+
/** Exception that signals the failure of a job with a given {@link JobStatus}. */
3135
@Internal
3236
public class UnsuccessfulExecutionException extends JobExecutionException {
3337

34-
private final ApplicationStatus status;
38+
@Nullable private final JobStatus status;
3539

3640
public UnsuccessfulExecutionException(
3741
final JobID jobID,
38-
final ApplicationStatus status,
42+
@Nullable final JobStatus status,
3943
final String message,
4044
final Throwable cause) {
4145
super(jobID, message, cause);
42-
this.status = checkNotNull(status);
46+
this.status = status;
4347
}
4448

45-
public ApplicationStatus getStatus() {
46-
return status;
49+
public Optional<JobStatus> getStatus() {
50+
return Optional.ofNullable(status);
4751
}
4852

4953
public static UnsuccessfulExecutionException fromJobResult(
@@ -64,13 +68,13 @@ public static UnsuccessfulExecutionException fromJobResult(
6468
} catch (Throwable t) {
6569

6670
final JobID jobID = result.getJobId();
67-
final ApplicationStatus status = result.getApplicationStatus();
71+
final JobStatus status = result.getJobStatus().orElse(null);
6872

69-
return status == ApplicationStatus.CANCELED || status == ApplicationStatus.FAILED
73+
return status == JobStatus.CANCELED || status == JobStatus.FAILED
7074
? new UnsuccessfulExecutionException(
71-
jobID, status, "Application Status: " + status.name(), t)
75+
jobID, status, "Job Status: " + status.name(), t)
7276
: new UnsuccessfulExecutionException(
73-
jobID, ApplicationStatus.UNKNOWN, "Job failed for unknown reason.", t);
77+
jobID, null, "Job failed for unknown reason.", t);
7478
}
7579
}
7680
}

flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.apache.flink.core.fs.Path;
3939
import org.apache.flink.runtime.client.JobStatusMessage;
4040
import org.apache.flink.runtime.client.JobSubmissionException;
41-
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
4241
import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices;
4342
import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServicesFactory;
4443
import org.apache.flink.runtime.highavailability.DefaultClientHighAvailabilityServicesFactory;
@@ -1027,7 +1026,7 @@ private CompletableFuture<JobResult> requestJobResultInternal(@Nonnull JobID job
10271026
})
10281027
.thenApply(
10291028
jobResult -> {
1030-
if (jobResult.getApplicationStatus() == ApplicationStatus.UNKNOWN) {
1029+
if (jobResult.getJobStatus().isEmpty()) {
10311030
throw new JobStateUnknownException(
10321031
String.format("Result for Job %s is UNKNOWN", jobId));
10331032
}

flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.flink.configuration.PipelineOptionsInternal;
3434
import org.apache.flink.core.testutils.FlinkAssertions;
3535
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
36-
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
3736
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
3837
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
3938
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
@@ -131,9 +130,7 @@ void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exception
131130
final CompletableFuture<JobResult> firstJobResult = cluster.requestJobResult(jobId);
132131
haServices.revokeDispatcherLeadership();
133132
// make sure the leadership is revoked to avoid race conditions
134-
assertThat(firstJobResult.get())
135-
.extracting(JobResult::getApplicationStatus)
136-
.isEqualTo(ApplicationStatus.UNKNOWN);
133+
assertThat(firstJobResult.get().getJobStatus().isPresent()).isFalse();
137134
haServices.grantDispatcherLeadership();
138135

139136
// job is suspended, wait until it's running
@@ -145,8 +142,7 @@ void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exception
145142
// and wait for it to actually finish
146143
final JobResult secondJobResult = cluster.requestJobResult(jobId).get();
147144
assertThat(secondJobResult.isSuccess()).isTrue();
148-
assertThat(secondJobResult.getApplicationStatus())
149-
.isEqualTo(ApplicationStatus.SUCCEEDED);
145+
assertThat(secondJobResult.getJobStatus().orElse(null)).isEqualTo(JobStatus.FINISHED);
150146

151147
// the cluster should shut down automatically once the application completes
152148
awaitClusterStopped(cluster);

flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@
5555
import org.junit.jupiter.params.ParameterizedTest;
5656
import org.junit.jupiter.params.provider.EnumSource;
5757

58+
import javax.annotation.Nullable;
59+
5860
import java.util.Collections;
5961
import java.util.Optional;
6062
import java.util.concurrent.CompletableFuture;
@@ -260,7 +262,7 @@ void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
260262
final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 2);
261263
final UnsuccessfulExecutionException exception =
262264
assertException(applicationFuture, UnsuccessfulExecutionException.class);
263-
assertThat(exception.getStatus()).isEqualTo(ApplicationStatus.FAILED);
265+
assertThat(exception.getStatus().orElse(null)).isEqualTo(JobStatus.FAILED);
264266
}
265267

266268
@Test
@@ -435,8 +437,7 @@ private void testErrorHandlerIsCalled(Supplier<CompletableFuture<Acknowledge>> s
435437
.setRequestJobResultFunction(
436438
jobId ->
437439
CompletableFuture.completedFuture(
438-
createJobResult(
439-
jobId, ApplicationStatus.SUCCEEDED)))
440+
createJobResult(jobId, JobStatus.FINISHED)))
440441
.setClusterShutdownFunction(status -> shutdownFunction.get());
441442

442443
// we're "listening" on this to be completed to verify that the error handler is called.
@@ -820,8 +821,7 @@ private void testSubmitFailedJobOnApplicationError(
820821
jobId ->
821822
submitted.thenApply(
822823
ignored ->
823-
createJobResult(
824-
jobId, ApplicationStatus.FAILED)))
824+
createJobResult(jobId, JobStatus.FAILED)))
825825
.build();
826826

827827
final ApplicationDispatcherBootstrap bootstrap =
@@ -886,10 +886,7 @@ private TestingDispatcherGateway.Builder dispatcherGatewayBuilder(JobStatus jobS
886886
jobId -> CompletableFuture.completedFuture(jobStatus));
887887
if (jobStatus != JobStatus.RUNNING) {
888888
builder.setRequestJobResultFunction(
889-
jobID ->
890-
CompletableFuture.completedFuture(
891-
createJobResult(
892-
jobID, ApplicationStatus.fromJobStatus(jobStatus))));
889+
jobID -> CompletableFuture.completedFuture(createJobResult(jobID, jobStatus)));
893890
}
894891
return builder;
895892
}
@@ -979,25 +976,21 @@ private PackagedProgram getProgram(int noOfJobs) throws FlinkException {
979976
}
980977

981978
private static JobResult createFailedJobResult(final JobID jobId) {
982-
return createJobResult(jobId, ApplicationStatus.FAILED);
979+
return createJobResult(jobId, JobStatus.FAILED);
983980
}
984981

985982
private static JobResult createUnknownJobResult(final JobID jobId) {
986-
return createJobResult(jobId, ApplicationStatus.UNKNOWN);
983+
return createJobResult(jobId, null);
987984
}
988985

989986
private static JobResult createJobResult(
990-
final JobID jobID, final ApplicationStatus applicationStatus) {
987+
final JobID jobID, @Nullable final JobStatus jobStatus) {
991988
JobResult.Builder builder =
992-
new JobResult.Builder()
993-
.jobId(jobID)
994-
.netRuntime(2L)
995-
.applicationStatus(applicationStatus);
996-
if (applicationStatus == ApplicationStatus.CANCELED) {
989+
new JobResult.Builder().jobId(jobID).netRuntime(2L).jobStatus(jobStatus);
990+
if (jobStatus == JobStatus.CANCELED) {
997991
builder.serializedThrowable(
998992
new SerializedThrowable(new JobCancellationException(jobID, "Hello", null)));
999-
} else if (applicationStatus == ApplicationStatus.FAILED
1000-
|| applicationStatus == ApplicationStatus.UNKNOWN) {
993+
} else if (jobStatus == JobStatus.FAILED || jobStatus == null) {
1001994
builder.serializedThrowable(
1002995
new SerializedThrowable(new JobExecutionException(jobID, "bla bla bla")));
1003996
}

flink-clients/src/test/java/org/apache/flink/client/deployment/application/JobStatusPollingUtilsTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.flink.api.common.JobID;
2222
import org.apache.flink.api.common.JobStatus;
23-
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
2423
import org.apache.flink.runtime.jobmaster.JobResult;
2524
import org.apache.flink.util.ExecutorUtils;
2625
import org.apache.flink.util.SerializedThrowable;
@@ -164,7 +163,7 @@ private static JobResult createFailedJobResult(final JobID jobId) {
164163
return new JobResult.Builder()
165164
.jobId(jobId)
166165
.netRuntime(2L)
167-
.applicationStatus(ApplicationStatus.FAILED)
166+
.jobStatus(JobStatus.FAILED)
168167
.serializedThrowable(new SerializedThrowable(new Exception("bla bla bla")))
169168
.build();
170169
}
@@ -173,7 +172,7 @@ private static JobResult createSuccessfulJobResult(final JobID jobId) {
173172
return new JobResult.Builder()
174173
.jobId(jobId)
175174
.netRuntime(2L)
176-
.applicationStatus(ApplicationStatus.SUCCEEDED)
175+
.jobStatus(JobStatus.FINISHED)
177176
.build();
178177
}
179178
}

flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.apache.flink.configuration.RestOptions;
3636
import org.apache.flink.runtime.client.JobStatusMessage;
3737
import org.apache.flink.runtime.client.JobSubmissionException;
38-
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
3938
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
4039
import org.apache.flink.runtime.execution.ExecutionState;
4140
import org.apache.flink.runtime.instance.SlotSharingGroupId;
@@ -880,7 +879,7 @@ void testSubmitJobAndWaitForExecutionResult() throws Exception {
880879
// On an UNKNOWN JobResult it should be retried
881880
JobExecutionResultResponseBody.created(
882881
new JobResult.Builder()
883-
.applicationStatus(ApplicationStatus.UNKNOWN)
882+
.jobStatus(null)
884883
.jobId(jobId)
885884
.netRuntime(Long.MAX_VALUE)
886885
.accumulatorResults(
@@ -891,7 +890,7 @@ void testSubmitJobAndWaitForExecutionResult() throws Exception {
891890
.build()),
892891
JobExecutionResultResponseBody.created(
893892
new JobResult.Builder()
894-
.applicationStatus(ApplicationStatus.SUCCEEDED)
893+
.jobStatus(JobStatus.FINISHED)
895894
.jobId(jobId)
896895
.netRuntime(Long.MAX_VALUE)
897896
.accumulatorResults(
@@ -902,7 +901,7 @@ void testSubmitJobAndWaitForExecutionResult() throws Exception {
902901
.build()),
903902
JobExecutionResultResponseBody.created(
904903
new JobResult.Builder()
905-
.applicationStatus(ApplicationStatus.FAILED)
904+
.jobStatus(JobStatus.FAILED)
906905
.jobId(jobId)
907906
.netRuntime(Long.MAX_VALUE)
908907
.serializedThrowable(

flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ private JobStatus getJobStatus() {
216216
}
217217

218218
private static JobStatus getJobStatus(JobResult jobResult) {
219-
return jobResult.getApplicationStatus().deriveJobStatus();
219+
return jobResult.getJobStatus().orElseThrow();
220220
}
221221

222222
private static ExecutionGraphInfo generateExecutionGraphInfo(

0 commit comments

Comments
 (0)