diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index c05efb10b484f..8750f1fbf97ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1384,8 +1384,8 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) lastSubsumed = null; } - pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint); reportCompletedCheckpoint(completedCheckpoint); + pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint); } catch (Exception exception) { // For robustness reasons, we need catch exception and try marking the checkpoint // completed. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index de7419e18e47b..c2d2144a1fde6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.rpc.exceptions.RpcException; import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; import org.apache.flink.runtime.state.CheckpointStorage; @@ -120,6 +121,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -155,6 +157,8 @@ /** Tests for the checkpoint coordinator. */ class CheckpointCoordinatorTest { + private static final long TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15); + @RegisterExtension static final TestExecutorExtension EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension(); @@ -4403,4 +4407,107 @@ public boolean isDiscarded() { } } } + + /** + * Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint + * finishes. This ensures that when external components are notified via the CompletableFuture + * that a checkpoint is complete, all statistics have already been updated. + */ + @Test + void testCompletionFutureCompletesAfterReporting() throws Exception { + JobVertexID jobVertexID = new JobVertexID(); + ExecutionGraph graph = + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(jobVertexID) + .build(EXECUTOR_RESOURCE.getExecutor()); + + ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker(); + + CheckpointCoordinator coordinator = + new CheckpointCoordinatorBuilder() + .setCheckpointStatsTracker(tracker) + .setTimer(manuallyTriggeredScheduledExecutor) + .build(graph); + + CompletableFuture checkpointFuture = + coordinator.triggerCheckpoint(false); + manuallyTriggeredScheduledExecutor.triggerAll(); + + CompletableFuture ackTask = + CompletableFuture.runAsync( + () -> { + try { + ackCheckpoint( + 1L, + coordinator, + jobVertexID, + graph, + handle(), + handle(), + handle()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + assertThat(tracker.getReportStartedFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("reportCompletedCheckpoint should be started soon when checkpoint is acked.") + .isNull(); + + for (int i = 0; i < 30; i++) { + assertThat(checkpointFuture) + .as( + "Checkpoint future should not complete while reportCompletedCheckpoint is blocked") + .isNotDone(); + Thread.sleep(100); + } + + tracker.getReportBlockingFuture().complete(null); + + CompletedCheckpoint result = checkpointFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(result) + .as("Checkpoint future should complete after reportCompletedCheckpoint finishes") + .isNotNull(); + + ackTask.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + /** + * A controllable checkpoint stats tracker for testing purposes. Allows precise control over + * when reportCompletedCheckpoint() completes, enabling verification of execution order and + * timing in tests. + */ + private static class ControllableCheckpointStatsTracker extends CheckpointStatsTracker { + private final CompletableFuture reportStartedFuture; + private final CompletableFuture reportBlockingFuture; + + public ControllableCheckpointStatsTracker() { + super( + Integer.MAX_VALUE, + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(), + JobID.generate()); + this.reportStartedFuture = new CompletableFuture<>(); + this.reportBlockingFuture = new CompletableFuture<>(); + } + + public CompletableFuture getReportStartedFuture() { + return reportStartedFuture; + } + + public CompletableFuture getReportBlockingFuture() { + return reportBlockingFuture; + } + + @Override + public void reportCompletedCheckpoint(CompletedCheckpointStats completed) { + reportStartedFuture.complete(null); + + try { + reportBlockingFuture.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + super.reportCompletedCheckpoint(completed); + } + } }