diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index e4d455a3f959b..85622387daa64 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1385,8 +1385,8 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) lastSubsumed = null; } - pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint); reportCompletedCheckpoint(completedCheckpoint); + pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint); } catch (Exception exception) { // For robustness reasons, we need catch exception and try marking the checkpoint // completed. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 1e5c4d82516f0..40229381558f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -119,6 +119,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -154,6 +155,8 @@ /** Tests for the checkpoint coordinator. */ class CheckpointCoordinatorTest { + private static final long TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15); + @RegisterExtension static final TestExecutorExtension EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension(); @@ -4409,4 +4412,106 @@ public boolean isDiscarded() { } } } + + /** + * Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint + * finishes. This ensures that when external components are notified via the CompletableFuture + * that a checkpoint is complete, all statistics have already been updated. + */ + @Test + void testCompletionFutureCompletesAfterReporting() throws Exception { + JobVertexID jobVertexID = new JobVertexID(); + ExecutionGraph graph = + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(jobVertexID) + .build(EXECUTOR_RESOURCE.getExecutor()); + + ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker(); + + CheckpointCoordinator coordinator = + new CheckpointCoordinatorBuilder() + .setCheckpointStatsTracker(tracker) + .setTimer(manuallyTriggeredScheduledExecutor) + .build(graph); + + CompletableFuture checkpointFuture = + coordinator.triggerCheckpoint(false); + manuallyTriggeredScheduledExecutor.triggerAll(); + + CompletableFuture ackTask = + CompletableFuture.runAsync( + () -> { + try { + ackCheckpoint( + 1L, + coordinator, + jobVertexID, + graph, + handle(), + handle(), + handle()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + assertThat(tracker.getReportStartedFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("reportCompletedCheckpoint should be started soon when checkpoint is acked.") + .isNull(); + + for (int i = 0; i < 30; i++) { + assertThat(checkpointFuture) + .as( + "Checkpoint future should not complete while reportCompletedCheckpoint is blocked") + .isNotDone(); + Thread.sleep(100); + } + + tracker.getReportBlockingFuture().complete(null); + + CompletedCheckpoint result = checkpointFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(result) + .as("Checkpoint future should complete after reportCompletedCheckpoint finishes") + .isNotNull(); + + ackTask.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + /** + * A controllable checkpoint stats tracker for testing purposes. Allows precise control over + * when reportCompletedCheckpoint() completes, enabling verification of execution order and + * timing in tests. + */ + private static class ControllableCheckpointStatsTracker extends DefaultCheckpointStatsTracker { + private final CompletableFuture reportStartedFuture; + private final CompletableFuture reportBlockingFuture; + + public ControllableCheckpointStatsTracker() { + super( + Integer.MAX_VALUE, + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup()); + this.reportStartedFuture = new CompletableFuture<>(); + this.reportBlockingFuture = new CompletableFuture<>(); + } + + public CompletableFuture getReportStartedFuture() { + return reportStartedFuture; + } + + public CompletableFuture getReportBlockingFuture() { + return reportBlockingFuture; + } + + @Override + public void reportCompletedCheckpoint(CompletedCheckpointStats completed) { + reportStartedFuture.complete(null); + + try { + reportBlockingFuture.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + super.reportCompletedCheckpoint(completed); + } + } }