Skip to content

Commit c0f2273

Browse files
eemariozhuzhurk
authored andcommitted
[FLINK-39133][runtime] Fix DispatcherTest to properly handle async operations for applications
1 parent dc10fa5 commit c0f2273

File tree

2 files changed

+32
-27
lines changed

2 files changed

+32
-27
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -709,29 +709,31 @@ public void notifyApplicationStatusChange(
709709
})
710710
.collect(Collectors.toList());
711711

712-
// wait for all jobs to be stored
713-
FutureUtils.combineAll(jobFutures)
714-
.thenAcceptAsync(
715-
combinedJobs -> {
716-
Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>();
717-
for (ExecutionGraphInfo executionGraphInfo : combinedJobs) {
718-
jobs.put(executionGraphInfo.getJobId(), executionGraphInfo);
719-
partialExecutionGraphInfoStore.remove(
720-
executionGraphInfo.getJobId());
721-
}
722-
723-
ArchivedApplication archivedApplication =
724-
new ArchivedApplication(
725-
application.getApplicationId(),
726-
application.getName(),
727-
application.getApplicationStatus(),
728-
stateTimestamps,
729-
jobs);
730-
731-
applications.remove(applicationId);
732-
writeToArchivedApplicationStore(archivedApplication);
733-
CompletableFuture<?> applicationArchivingFuture =
734-
historyServerArchivist
712+
// wait for all jobs to be stored, then archive the application
713+
CompletableFuture<?> applicationArchivingFuture =
714+
FutureUtils.combineAll(jobFutures)
715+
.thenComposeAsync(
716+
combinedJobs -> {
717+
Map<JobID, ExecutionGraphInfo> jobs = new HashMap<>();
718+
for (ExecutionGraphInfo executionGraphInfo : combinedJobs) {
719+
jobs.put(
720+
executionGraphInfo.getJobId(),
721+
executionGraphInfo);
722+
partialExecutionGraphInfoStore.remove(
723+
executionGraphInfo.getJobId());
724+
}
725+
726+
ArchivedApplication archivedApplication =
727+
new ArchivedApplication(
728+
application.getApplicationId(),
729+
application.getName(),
730+
application.getApplicationStatus(),
731+
stateTimestamps,
732+
jobs);
733+
734+
applications.remove(applicationId);
735+
writeToArchivedApplicationStore(archivedApplication);
736+
return historyServerArchivist
735737
.archiveApplication(archivedApplication)
736738
.exceptionally(
737739
throwable -> {
@@ -741,10 +743,9 @@ public void notifyApplicationStatusChange(
741743
throwable);
742744
return null;
743745
});
744-
applicationArchivingFutures.put(
745-
applicationId, applicationArchivingFuture);
746-
},
747-
getMainThreadExecutor());
746+
},
747+
getMainThreadExecutor());
748+
applicationArchivingFutures.put(applicationId, applicationArchivingFuture);
748749
}
749750
}
750751

flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,10 @@ public void testApplicationStatusChange_ThrowsIfDuplicateTerminalStatus() throws
638638
dispatcher = createTestingDispatcherBuilder().build(rpcService);
639639
dispatcher.start();
640640
final ApplicationID applicationId = mockApplicationStatusChange(ApplicationState.FINISHED);
641+
// wait for archive to complete
642+
dispatcher
643+
.getApplicationArchivingFuture(applicationId)
644+
.get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
641645
assertThrows(
642646
IllegalStateException.class,
643647
() ->

0 commit comments

Comments
 (0)