1996fanrui
Member

@1996fanrui 1996fanrui commented Sep 26, 2025

What is the purpose of the change

MapStateNullValueCheckpointingITCase failed with "No checkpoint was created yet".

Root Cause Analysis

Problem Location

Log analysis revealed that the checkpoint had actually completed successfully:

07:19:37,522 [jobmanager-io-thread-1] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job b809cf46d67c23697786fd514565c737 (4464 bytes, checkpointDuration=45 ms, finalizationTime=4 ms)

However, the test code could not find the completed checkpoint when calling CommonTestUtils.getLatestCompletedCheckpointPath().

Root Cause

The problem occurs in the execution order of the CheckpointCoordinator.completePendingCheckpoint() method:

pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
reportCompletedCheckpoint(completedCheckpoint);

Checkpoint Coordinator mechanism:

  1. A: pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint) completes the completion future first.
  2. B: reportCompletedCheckpoint(completedCheckpoint) then updates the checkpoint statistics.

Test code timeline:

  1. C: Detect future completion
  2. D: Call getLatestCompletedCheckpointPath() immediately

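Usually the execution sequence is A -> B -> C -> D, and the test passes.

The bug appears when the sequence is A -> C -> D -> B: the test sees the completed future and queries the latest checkpoint path before the statistics have been updated.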

Reproduction Method

Inserting Thread.sleep(100) between complete() and reportCompletedCheckpoint() in completePendingCheckpoint() reproduces the issue every time.
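
For readers who want to see the interleaving outside of Flink, here is a minimal sketch in plain Java. It is not the CheckpointCoordinator code: the class name, the AtomicReference standing in for the checkpoint statistics, and the "chk-1" value are all made up for illustration. Instead of Thread.sleep(100), it uses a CountDownLatch so that step B is held back until step D has really happened, which forces the A -> C -> D -> B order rather than merely making it likely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

final class ForcedInterleavingSketch {

    /** Forces the order A -> C -> D -> B and returns what step D observed. */
    static String reproduce() {
        // Hypothetical stand-in for the statistics that reportCompletedCheckpoint() updates.
        AtomicReference<String> latestCompletedPath = new AtomicReference<>();
        CompletableFuture<String> completionFuture = new CompletableFuture<>();
        CountDownLatch waiterHasRead = new CountDownLatch(1);

        Thread coordinator = new Thread(() -> {
            completionFuture.complete("chk-1"); // A: complete the future first (old order)
            try {
                // Plays the role of Thread.sleep(100), but waits until D has actually run.
                waiterHasRead.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            latestCompletedPath.set("chk-1"); // B: update the statistics afterwards
        });
        coordinator.setDaemon(true);
        coordinator.start();

        try {
            completionFuture.get(); // C: the waiter detects completion
            String observed = latestCompletedPath.get(); // D: the waiter reads the statistics
            waiterHasRead.countDown();
            coordinator.join();
            // The message mirrors the test failure quoted in the PR description.
            return observed == null ? "No checkpoint was created yet" : observed;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(reproduce());
    }
}
```

In this model the waiter always reads an empty value, because the only write to the statistics is ordered after the read.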

Brief change log: Adjust the execution order in CheckpointCoordinator

[FLINK-38408][checkpoint] Complete the checkpoint CompletableFuture after updating statistics to ensures semantic correctness and prevent test failure

Changes:

// Update statistics first 
reportCompletedCheckpoint(completedCheckpoint);
// Complete the future later
pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);

Benefits:

  • Fundamentally eliminates race conditions
  • Ensures semantic correctness: Waiting parties are notified only when the checkpoint is fully processed
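
The effect of the reordering can be sketched without any threads. The example below is a simplified model, not Flink code: the class and the AtomicReference used as "statistics" are assumptions for illustration. It relies on the fact that a callback registered with thenAccept on a not-yet-completed future runs when complete() is called, so whatever has not been published before complete() is invisible to the callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class CompletionOrderSketch {

    /** Returns what a waiter finds in the statistics at the moment it is notified. */
    static String observedByWaiter(boolean reportBeforeComplete) {
        // Hypothetical stand-in for the checkpoint statistics.
        AtomicReference<String> latestCompletedPath = new AtomicReference<>();
        AtomicReference<String> observed = new AtomicReference<>("callback-not-run");
        CompletableFuture<String> completionFuture = new CompletableFuture<>();

        // The waiter: reads the statistics as soon as it learns about completion.
        completionFuture.thenAccept(ignored -> {
            String path = latestCompletedPath.get();
            observed.set(path == null ? "missing" : path);
        });

        String checkpoint = "chk-1";
        if (reportBeforeComplete) {
            latestCompletedPath.set(checkpoint);   // update statistics first
            completionFuture.complete(checkpoint); // complete the future later
        } else {
            completionFuture.complete(checkpoint); // callback runs here, before the update
            latestCompletedPath.set(checkpoint);
        }
        return observed.get();
    }

    public static void main(String[] args) {
        System.out.println("complete, then report: " + observedByWaiter(false));
        System.out.println("report, then complete: " + observedByWaiter(true));
    }
}
```

With the old order the waiter sees "missing"; with the new order it sees "chk-1". In the real coordinator the waiter runs on another thread, so the old order only fails some of the time, but the ordering argument is the same.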

Verifying this change

  • Added testCompletionFutureCompletesAfterReporting

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no

@flinkbot
Collaborator

flinkbot commented Sep 26, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@1996fanrui 1996fanrui marked this pull request as ready for review September 29, 2025 18:54
@1996fanrui 1996fanrui marked this pull request as draft September 30, 2025 09:07
@1996fanrui 1996fanrui force-pushed the 38408/no-checkpoint branch 2 times, most recently from 736fe94 to b0e8240 Compare October 2, 2025 09:06
@1996fanrui 1996fanrui marked this pull request as ready for review October 2, 2025 09:13
Contributor

@Izeren Izeren left a comment

Thank you for the change, @1996fanrui. Overall LGTM; my main concern is potential test flakiness. PTAL.

lastSubsumed = null;
}

pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
Contributor

I am concerned that a change like this could have side effects such as:

  1. Deadlock / race condition if reportCompletedCheckpoint would trigger any handler that also waits on the checkpoint future before its completion (in general, unlikely situation, and should be caught by existing test)
  2. Checkpoint completion will be slightly delayed, but reporting is a quick operation, so doesn't seem to be critical
  3. If reporting throws exception it will result in checkpoint being completed exceptionally. Could we confirm that this behaviour matches the previous one?

Member Author

@1996fanrui 1996fanrui Oct 12, 2025

Deadlock / race condition if reportCompletedCheckpoint would trigger any handler that also waits on the checkpoint future before its completion (in general, unlikely situation, and should be caught by existing test)

Yes, it is an unlikely situation. reportCompletedCheckpoint takes a single parameter, completedCheckpoint, so it cannot access pendingCheckpoint.getCompletionFuture().

Checkpoint completion will be slightly delayed, but reporting is a quick operation, so doesn't seem to be critical

Both operations are generally quick, and completing a CompletableFuture is especially fast.

If reporting throws exception it will result in checkpoint being completed exceptionally. Could we confirm that this behaviour matches the previous one?

Judging from the code, the behavior does change in this case, but I think the new behavior makes more sense.

Before this PR, the CompletableFuture was completed even if the checkpoint had not been reported yet or the report had failed. That gave the wrong semantics: the client was told that checkpoint 10 (or X) had completed, but got nothing back when it fetched further metadata for checkpoint 10. (That is why MapStateNullValueCheckpointingITCase fails occasionally.)

After this PR, the client can fetch the correct result as soon as it receives the completion signal.

The semantics are clearer.
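
To make the behavior change concrete, here is a small sketch of the two orderings when the reporting step throws. It is a model under assumptions, not the Flink implementation: the report(...) method is hypothetical, and the try/catch that fails the future is assumed from the reviewer's description ("completed exceptionally"); how the surrounding coordinator code actually handles the exception is not shown in this thread.

```java
import java.util.concurrent.CompletableFuture;

final class ReportFailureSketch {

    /** Hypothetical reporting step that can fail. */
    static void report(boolean fail) {
        if (fail) {
            throw new IllegalStateException("reporting failed");
        }
    }

    /** Old order: the future is completed before reporting is attempted. */
    static CompletableFuture<String> completeThenReport(boolean reportFails) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            future.complete("chk-10");
            report(reportFails);
        } catch (RuntimeException e) {
            // No effect: the future already holds a normal result.
            future.completeExceptionally(e);
        }
        return future;
    }

    /** New order: the future is completed only after reporting succeeded. */
    static CompletableFuture<String> reportThenComplete(boolean reportFails) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            report(reportFails);
            future.complete("chk-10");
        } catch (RuntimeException e) {
            // Assumed handling: the failure is propagated to whoever waits on the future.
            future.completeExceptionally(e);
        }
        return future;
    }

    public static void main(String[] args) {
        System.out.println("old order, failed report, exceptional: "
                + completeThenReport(true).isCompletedExceptionally());
        System.out.println("new order, failed report, exceptional: "
                + reportThenComplete(true).isCompletedExceptionally());
    }
}
```

In the old order a waiter is told the checkpoint succeeded even though the report failed; in the new order the waiter sees the failure.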

}
});

assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))
Contributor

This is likely to end up as a flaky test. A test in CI can freeze for 15 minutes or more, so a 20-second timeout may not be sufficient in general.
I suggest using an effectively unbounded timeout of at least a few hours.

Member Author

I would like to clarify two types of CI timeouts: one is the total timeout of CI, and the other is the timeout of a single unit test or some logical timeout within the unit test.

  • For the former, I think 15 minutes or more is reasonable.
  • For the latter, if the timeout for each single test is 15 minutes, the total CI duration will be terrible. Flink may have more than 10k tests.

The tests are sometimes unstable, but not to that degree. The default policy of Flink CI is to fail if there is no progress for 15 consecutive minutes. However, such stalls are generally caused by bugs, for example a deadlock. It is rare to see a CI process stuck for 15 minutes due to a lack of resources.

In this case, tracker.getReportStartedFuture() completes very quickly on my local machine, always in less than 100 ms, which is why I think 20 seconds is safe here.

As for other examples, I checked some callers [1][2] in the Flink code: some use 10 seconds and some use 60 seconds. I could raise the timeout from 20 seconds to 60 seconds if you think 20 seconds is not safe enough. TBH, 15 minutes is a little long; it would delay the failure report or the CI run if there is a bug.

[1]

[2].

clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);

Contributor

The reason I brought it up is that I got similar feedback from @dmvk in the past. He pointed out that a CI VM can freeze for 15 minutes even if the test itself is quick, because multiple tests run at once and there is no guarantee that your particular test will always execute quickly.

My overall view is the following. If an otherwise quick test takes longer than 15 minutes, then either it hit something like a deadlock or the whole CI run was affected by a "bad change" or external factors. Unless the deadlock is in your own test, the whole CI run is more likely to time out than not, so a long timeout doesn't make things worse. When you do have a deadlock, a per-test timeout lets more tests run in the failed build, which is beneficial, but the benefit is limited to the non-parallel suite that fails.

To sum up, I don't see a big difference between 15 minutes and 1 hour, but 20 seconds is very likely not enough.

Member Author

I understand your concern about CI stability. While I still think 15 minutes is quite conservative for this specific case, I'm willing to use it for now to unblock the fix. We can always revisit this timeout if we observe issues in practice. I'll update it to 15 minutes. Thanks for the discussion.
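
One way to look at the trade-off discussed above: the bound passed to get(timeout, unit) only costs time when the future is genuinely stuck. The sketch below illustrates that with plain CompletableFuture calls; the helper name and the returned marker strings are invented for the example and do not come from the Flink test.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutSketch {

    /** Waits for the future and maps each outcome to a short marker string. */
    static String await(CompletableFuture<String> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed";
        }
    }

    public static void main(String[] args) {
        // Already complete: returns at once even though the bound is 15 minutes.
        System.out.println(await(CompletableFuture.completedFuture("done"), 15, TimeUnit.MINUTES));
        // Never completes: only in this stuck case does the size of the bound matter.
        System.out.println(await(new CompletableFuture<>(), 50, TimeUnit.MILLISECONDS));
    }
}
```

So a 15-minute bound does not slow down a healthy run; it only delays the failure signal when something is actually blocked, which is the cost the author was weighing.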

Contributor

Thank you!

.as("reportCompletedCheckpoint should be started soon when checkpoint is acked.")
.isNull();

for (int i = 0; i < 30; i++) {
Contributor

Similarly to the above, I am not sure you can tell whether the expected change failed to occur because the future was blocked or because the corresponding thread was inactive. It would be better to wait indefinitely here.

Member Author

The sleep here covers the opposite case, where we expect the CompletableFuture not to complete. The sleep is therefore used intentionally to verify that the future is not done.

If the sleep were set to 10 minutes, the test would take 10 minutes. Here it is 3 seconds, which is generally enough to check that the CompletableFuture is not done.

Contributor

In this case, how would your test distinguish between: Future wasn't complete because of "happens before" condition vs Future wasn't complete because VM froze and responsible thread was not making progress for more than 3 seconds.

I am less concerned about this one as it shouldn't introduce flakiness, but testing it this way you have weaker guarantees of "happens before" condition being actually tested.

Member Author

The purpose of this test is that if isDone returns true here, there must be a bug. It makes developers aware of the bug.

how would your test distinguish between: Future wasn't complete because of "happens before" condition vs Future wasn't complete because VM froze and responsible thread was not making progress for more than 3 seconds.

With the current test, it is not possible to distinguish them. Here we are testing that an unexpected case does not occur. If the VM freezes, neither the expected nor the unexpected case is executed, so I really do not know how to tell them apart.

but testing it this way you have weaker guarantees of "happens before" condition being actually tested.

I would also like to avoid sleeps in tests as much as possible, but I haven't figured out how to replace this one with a CompletableFuture or CountDownLatch.

Do you have any suggestions? I'd really appreciate a better alternative.

Contributor

If it is non-trivial to rewrite this test without a busy wait, I am happy to accept the test implementation as is. Most of the time this test will not be subject to a VM freeze. Even if a bad change is lucky enough to get a green CI run and be merged, we will see the test go red in other runs shortly after, and assuming we can still pinpoint the issue promptly, I have no objection to keeping it as is.
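
For reference, the shape of the test being discussed (a tracker that signals when reporting starts and then blocks until released) can be modelled in a few lines. This is a simplified sketch, not the test added in the PR: BlockingTracker and its field names are hypothetical and only loosely mirror tracker.getReportStartedFuture() and tracker.getReportBlockingFuture() from the diff. In this model both steps run sequentially on one thread, so a single isDone() check taken after the "report started" signal is already decisive and no sleep is needed. Whether the same holds for the actual CheckpointCoordinator test depends on threading details not shown in this thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class BlockingReportSketch {

    /** Hypothetical tracker: signals when reporting starts, then blocks until released. */
    static final class BlockingTracker {
        final CompletableFuture<Void> reportStarted = new CompletableFuture<>();
        final CompletableFuture<Void> reportBlocking = new CompletableFuture<>();

        void report() {
            reportStarted.complete(null);
            reportBlocking.join(); // block until the test releases the report
        }
    }

    /** True if the future stayed incomplete while the report was blocked and completed afterwards. */
    static boolean futureCompletesOnlyAfterReport(boolean reportBeforeComplete) {
        BlockingTracker tracker = new BlockingTracker();
        CompletableFuture<String> checkpointFuture = new CompletableFuture<>();

        Thread coordinator = new Thread(() -> {
            if (reportBeforeComplete) {
                tracker.report();                   // new order: report first
                checkpointFuture.complete("chk-1");
            } else {
                checkpointFuture.complete("chk-1"); // old order: complete first
                tracker.report();
            }
        });
        coordinator.setDaemon(true);
        coordinator.start();

        try {
            tracker.reportStarted.get(15, TimeUnit.MINUTES);
            // The report is now blocked; in the old order complete() has already been called.
            boolean doneWhileBlocked = checkpointFuture.isDone();
            tracker.reportBlocking.complete(null);
            String result = checkpointFuture.get(15, TimeUnit.MINUTES);
            return !doneWhileBlocked && "chk-1".equals(result);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("report, then complete: " + futureCompletesOnlyAfterReport(true));
        System.out.println("complete, then report: " + futureCompletesOnlyAfterReport(false));
    }
}
```

A VM freeze delays this sketch but cannot change its verdict, because the check is ordered by the signal rather than by elapsed time.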


tracker.getReportBlockingFuture().complete(null);

CompletedCheckpoint result = checkpointFuture.get(5, TimeUnit.SECONDS);
Contributor

ditto

.as("Checkpoint future should complete after reportCompletedCheckpoint finishes")
.isNotNull();

ackTask.get(5, TimeUnit.SECONDS);
Contributor

ditto

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Oct 7, 2025
Contributor

@Izeren Izeren left a comment

LGTM, thank you @1996fanrui for the change!

@1996fanrui 1996fanrui force-pushed the 38408/no-checkpoint branch 2 times, most recently from 9443a22 to e41d3df Compare October 15, 2025 16:28
@1996fanrui
Member Author

Thanks @Izeren for the review, merging

@1996fanrui 1996fanrui merged commit e44d638 into apache:master Oct 16, 2025

Labels

community-reviewed PR has been reviewed by the community.
