Merged
@@ -1385,8 +1385,8 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
lastSubsumed = null;
}

pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
Contributor

I have concerns that a change like this could have side effects such as:

  1. A deadlock or race condition if reportCompletedCheckpoint triggers any handler that also waits on the checkpoint future before its completion (an unlikely situation in general, and one that should be caught by existing tests).
  2. Checkpoint completion will be slightly delayed, but reporting is a quick operation, so this does not seem critical.
  3. If reporting throws an exception, the checkpoint will be completed exceptionally. Could we confirm that this behaviour matches the previous one?

Member Author

@1996fanrui 1996fanrui Oct 12, 2025

> A deadlock or race condition if reportCompletedCheckpoint triggers any handler that also waits on the checkpoint future before its completion (an unlikely situation in general, and one that should be caught by existing tests).

Yes, it is an unlikely situation. reportCompletedCheckpoint takes only one parameter, completedCheckpoint, so it cannot access pendingCheckpoint.getCompletionFuture().

> Checkpoint completion will be slightly delayed, but reporting is a quick operation, so this does not seem critical.

Generally, both operations are quick, and completing a CompletableFuture is of course very quick.

> If reporting throws an exception, the checkpoint will be completed exceptionally. Could we confirm that this behaviour matches the previous one?

Judging from the code, the behavior will definitely change in this case, but I think the new behavior makes more sense.

Before this PR, the CompletableFuture is completed even if the checkpoint has not been reported yet or the report failed. This gives the wrong semantics: the client is told that checkpoint 10 (or X) is completed, but then gets nothing when it fetches more metadata for checkpoint 10. (That is why MapStateNullValueCheckpointingITCase fails occasionally.)

After this PR, the client can fetch the correct result as soon as it receives the completion signal.

The semantics will be clearer.

reportCompletedCheckpoint(completedCheckpoint);
pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
} catch (Exception exception) {
// For robustness reasons, we need catch exception and try marking the checkpoint
// completed.
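
The ordering argument in the thread above can be illustrated with a minimal, self-contained sketch. It does not use any Flink classes: `StatsStore`, `observe`, and the checkpoint id 10 are hypothetical stand-ins, and the callback plays the role of a client that looks up statistics as soon as the completion future fires. The callback is registered before completion with a non-async method, so it runs synchronously inside `complete()` and the outcome does not depend on timing.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class CompletionOrderSketch {

    /** Hypothetical stand-in for a stats store keyed by checkpoint id. */
    static final class StatsStore {
        private final Map<Long, String> stats = new ConcurrentHashMap<>();

        void report(long checkpointId) {
            stats.put(checkpointId, "COMPLETED");
        }

        String lookup(long checkpointId) {
            return stats.get(checkpointId);
        }
    }

    /** Returns what a client sees when it looks up stats at completion time. */
    static String observe(boolean reportFirst) {
        StatsStore store = new StatsStore();
        CompletableFuture<Long> completion = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>("<callback not run>");

        // The "client": fetch stats as soon as the completion signal arrives.
        completion.thenAccept(id -> seen.set(String.valueOf(store.lookup(id))));

        long checkpointId = 10L;
        if (reportFirst) {
            store.report(checkpointId);        // ordering after the change
            completion.complete(checkpointId);
        } else {
            completion.complete(checkpointId); // ordering before the change
            store.report(checkpointId);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("complete, then report: " + observe(false)); // null
        System.out.println("report, then complete: " + observe(true));  // COMPLETED
    }
}
```

With completion first, the client's lookup finds nothing. With reporting first, the lookup finds the stats entry, which is the behaviour the author describes as the clearer semantics.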
@@ -119,6 +119,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -154,6 +155,8 @@
/** Tests for the checkpoint coordinator. */
class CheckpointCoordinatorTest {

private static final long TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);

@RegisterExtension
static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
TestingUtils.defaultExecutorExtension();
@@ -4409,4 +4412,106 @@ public boolean isDiscarded() {
}
}
}

/**
* Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint
* finishes. This ensures that when external components are notified via the CompletableFuture
* that a checkpoint is complete, all statistics have already been updated.
*/
@Test
void testCompletionFutureCompletesAfterReporting() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(jobVertexID)
.build(EXECUTOR_RESOURCE.getExecutor());

ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker();

CheckpointCoordinator coordinator =
new CheckpointCoordinatorBuilder()
.setCheckpointStatsTracker(tracker)
.setTimer(manuallyTriggeredScheduledExecutor)
.build(graph);

CompletableFuture<CompletedCheckpoint> checkpointFuture =
coordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();

CompletableFuture<Void> ackTask =
CompletableFuture.runAsync(
() -> {
try {
ackCheckpoint(
1L,
coordinator,
jobVertexID,
graph,
handle(),
handle(),
handle());
} catch (Exception e) {
throw new RuntimeException(e);
}
});

assertThat(tracker.getReportStartedFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("reportCompletedCheckpoint should be started soon when checkpoint is acked.")
.isNull();

for (int i = 0; i < 30; i++) {
Contributor

Similarly to above, I am not sure you can tell whether the expected change failed to occur because the call was blocked or because the corresponding thread was inactive. It would be better to wait indefinitely here.

Member Author

The sleep here covers the opposite case, where we expect the CompletableFuture not to complete. The sleep is therefore used intentionally to verify isNotDone.

If the sleep were set to 10 minutes, the test would take 10 minutes. Here the total is 3 seconds (30 iterations of 100 ms), which is generally enough to check that the CompletableFuture is not done.

Contributor

In this case, how would your test distinguish between the future not being complete because of the "happens before" condition and the future not being complete because the VM froze and the responsible thread made no progress for more than 3 seconds?

I am less concerned about this one as it shouldn't introduce flakiness, but testing it this way gives weaker guarantees that the "happens before" condition is actually being tested.

Member Author

The purpose of this test is that if the future is done at this point, there must be a bug. The test makes developers aware of such bugs.

> how would your test distinguish between the future not being complete because of the "happens before" condition and the future not being complete because the VM froze and the responsible thread made no progress for more than 3 seconds?

The current test cannot distinguish them. Here we are testing that an unexpected case did not occur. If the VM freezes, neither the expected case nor the unexpected case is executed, so I really do not know how to distinguish them.

> but testing it this way gives weaker guarantees that the "happens before" condition is actually being tested.

I also hope to avoid sleep in tests as much as possible. However, I haven't figured out how to use CompletableFuture or CountDownLatch to replace it.

Do you have any suggestions on this? I'd really appreciate a better alternative.

Contributor

If it is non-trivial to rewrite this test without a busy wait, I am happy to accept this test implementation as is. Most of the time this test will not be subject to a VM freeze. Even if a bad change is lucky enough to get green CI tests and be merged, we will see red tests in other runs shortly after. Assuming that we will still be able to pinpoint the issue promptly, I have no objection to keeping it as is.

assertThat(checkpointFuture)
.as(
"Checkpoint future should not complete while reportCompletedCheckpoint is blocked")
.isNotDone();
Thread.sleep(100);
}

tracker.getReportBlockingFuture().complete(null);

CompletedCheckpoint result = checkpointFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(result)
.as("Checkpoint future should complete after reportCompletedCheckpoint finishes")
.isNotNull();

ackTask.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}

/**
* A controllable checkpoint stats tracker for testing purposes. Allows precise control over
* when reportCompletedCheckpoint() completes, enabling verification of execution order and
* timing in tests.
*/
private static class ControllableCheckpointStatsTracker extends DefaultCheckpointStatsTracker {
private final CompletableFuture<Void> reportStartedFuture;
private final CompletableFuture<Void> reportBlockingFuture;

public ControllableCheckpointStatsTracker() {
super(
Integer.MAX_VALUE,
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
this.reportStartedFuture = new CompletableFuture<>();
this.reportBlockingFuture = new CompletableFuture<>();
}

public CompletableFuture<Void> getReportStartedFuture() {
return reportStartedFuture;
}

public CompletableFuture<Void> getReportBlockingFuture() {
return reportBlockingFuture;
}

@Override
public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
reportStartedFuture.complete(null);

try {
reportBlockingFuture.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
super.reportCompletedCheckpoint(completed);
}
}
}
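
On the question raised in the review thread about replacing the busy wait: one possible direction, offered only as a sketch, is to record the future's state from inside the reporting hook instead of polling it from the test thread. The example below is not Flink code. `doneDuringReport` and `reportHook` are hypothetical names, and the sketch assumes that reporting and completion run sequentially on the same thread, as they do in the hunk for `completePendingCheckpoint`. Whether this fits `CheckpointCoordinatorTest` (for example, by handing the checkpoint future to the tracker after `triggerCheckpoint`) has not been verified.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class InHookOrderingSketch {

    /**
     * Runs a hypothetical "report" hook and a future completion in the given
     * order, and returns whether the future was already done when the hook ran.
     */
    static Boolean doneDuringReport(boolean reportFirst) {
        CompletableFuture<String> completion = new CompletableFuture<>();
        AtomicReference<Boolean> doneSeenInHook = new AtomicReference<>();

        // Stand-in for an overridden reportCompletedCheckpoint(): it records the
        // state of the future at the moment reporting happens, without sleeping.
        Runnable reportHook = () -> doneSeenInHook.set(completion.isDone());

        if (reportFirst) {
            reportHook.run();
            completion.complete("checkpoint-1");
        } else {
            completion.complete("checkpoint-1");
            reportHook.run();
        }
        return doneSeenInHook.get();
    }

    public static void main(String[] args) {
        System.out.println("report first -> done in hook: " + doneDuringReport(true));   // false
        System.out.println("complete first -> done in hook: " + doneDuringReport(false)); // true
    }
}
```

A check of this kind fails deterministically when completion precedes reporting, and a frozen VM cannot make it pass by accident. It only observes the state at the moment the hook runs, so it complements the blocking-future test in the diff and does not replace it.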