- 
                Notifications
    You must be signed in to change notification settings 
- Fork 13.7k
[FLINK-38408][checkpoint] Complete the checkpoint CompletableFuture after updating statistics to ensures semantic correctness and prevent test failure #27050
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
dc2613d    to
    9afe37c      
    Compare
  
    736fe94    to
    b0e8240      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the change @1996fanrui. Overall, LGTM, my main concern is about potential test flakiness, PTAL
| lastSubsumed = null; | ||
| } | ||
|  | ||
| pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have concerns that change like this can have potential impacts like:
- Deadlock / race condition if reportCompletedCheckpointwould trigger any handler that also waits on the checkpoint future before its completion (in general, unlikely situation, and should be caught by existing test)
- Checkpoint completion will be slightly delayed, but reporting is a quick operation, so doesn't seem to be critical
- If reporting throws exception it will result in checkpoint being completed exceptionally. Could we confirm that this behaviour matches the previous one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deadlock / race condition if reportCompletedCheckpoint would trigger any handler that also waits on the checkpoint future before its completion (in general, unlikely situation, and should be caught by existing test)
Yes, it is a unlike situation. reportCompletedCheckpoint only has one parameter, which is completedCheckpoint, so reportCompletedCheckpoint is unable to access pendingCheckpoint.getCompletionFuture().
Checkpoint completion will be slightly delayed, but reporting is a quick operation, so doesn't seem to be critical
Generally, both of them are quick. Of course, complete a CompletableFuture is super quick.
If reporting throws exception it will result in checkpoint being completed exceptionally. Could we confirm that this behaviour matches the previous one?
Judging from the code, the behavior will definitely change for this case. But I think the new behavior makes more sense.
Before this PR, the CompletableFuture is completed even if it is not reported or report is failed. It causes wrong semantic, client receives the checkpoint 10(or X) is completed, then get nothing when fetch more metadata for checkpoint 10. (That is why MapStateNullValueCheckpointingITCase fails occasionally)
After this PR, client could fetch the correct result once the client received the complete signal.
The semantic will be clearer.
| } | ||
| }); | ||
|  | ||
| assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS)) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is likely to end up being flaky test. Test in CI could freeze for 15min and more, so 20 seconds timeout may not be sufficient in general.
I suggest to use indefinite timeout of at least a few hours
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to clarify two types of CI timeouts: one is the total timeout of CI, and the other is the timeout of a single unit test or some logical timeout within the unit test.
- For the former, I think 15 minutes or more is reasonable.
- For the latter, if the timeout for each single test is 15 minutes, the total CI duration will be terrible. Flink may have more than 10k tests.
The test sometimes is unstable, but not that bad. The default policy of Flink CI is to fail if there is no progress for 15 consecutive minutes. However, these are generally caused by bugs, for example, deadlock or something like this. It's rare to see a CI process stuck for 15 minutes due to a lack of resources.
For this case, tracker.getReportStartedFuture() is so quick on my local, it always be less than 100 ms, that is why I think 20 seconds is safe here.
For other examples, I checked some callers[1][2] from flink code, some of them are 10 seconds, and some of them are 60 seconds. I could update it from 20 seconds to 60 seconds if you think 20 seconds is not safe enough. TBH, 15 minutes is a little long, it will delay the exception or CI if there are some bugs.
[1]
Line 79 in cc55c56
| private static final int TIMEOUT_SECONDS = 10; | 
[2].
Line 245 in cc55c56
| clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason I brought it up is that I got similar feedback from @dmvk in the past, where he suggested that CI VM can freeze for 15 min even if the test is quick, because there are multiple tests that are running and you don't have guarantee that your particular test will be always executed quickly.
My overall view on this is the following, If otherwise quick test for some reason takes longer than 15 minutes then either it faced something like a deadlock or overall CI run was impacted by "bad change"/"external factors". Unless you have a deadlock in your own test, the whole CI run is more likely to timeout than not, so it doesn't make things worse. For the cases when you do have a deadlock, per test timeout could allow you to verify more tests in a failed run, which is beneficial, but the benefit is limited to the non-parallel suit that fails.
To sum up, I don't see big difference between 15 min and 1 hour, but 20 seconds is very likely not enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand your concern about CI stability. While I still think 15 minutes is quite conservative for this specific case, I'm willing to use it for now to unblock the fix. We can always revisit this timeout if we observe issues in practice. I'll update it to 15 minutes. Thanks for the discussion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you!
| .as("reportCompletedCheckpoint should be started soon when checkpoint is acked.") | ||
| .isNull(); | ||
|  | ||
| for (int i = 0; i < 30; i++) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly to above, I am not sure you can confirm whether expected change did not occur because of being blocked vs corresponding thread being inactive. Will be better to wait indefinitely here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sleep here represents the opposite case, where we expect the CompletableFuture to not complete. Therefore, the sleep is intentionally used to verify isNotDone.
If the sleep is set to 10 minutes, then the test will take 10 minutes. Here is 3 seconds, generally, it is enough to check CompletableFuture.isNotDone.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, how would your test distinguish between: Future wasn't complete because of "happens before" condition vs Future wasn't complete because VM froze and responsible thread was not making progress for more than 3 seconds.
I am less concerned about this one as it shouldn't introduce flakiness, but testing it this way you have weaker guarantees of "happens before" condition being actually tested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The purpose of adding this test is that if isDone occurs here, then there must be a bug. It will let developer is aware of bugs.
how would your test distinguish between: Future wasn't complete because of "happens before" condition vs Future wasn't complete because VM froze and responsible thread was not making progress for more than 3 seconds.
From current testing, it cannot to distinguish them. Here we are testing that an unexpected case did not occur. If VM froze happens, both expected case or expected case do not be executed. So I really do not know how to distinguish them.
but testing it this way you have weaker guarantees of "happens before" condition being actually tested.
I also hope to avoid sleep in tests as much as possible. However, I haven't figured out how to use CompletableFuture or CountDownLatch to replace it.
May I know do you have any suggestions on this? I'd really appreciate a better alternative.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is non-trivial to rewrite this test without busy wait to avoid this issue, I am happy to accept this tests implementation as is. Most of the time this test will not be a subject to VM freeze. Even if some bad change will be lucky enough to get green CI tests and be merged, we will see tests being red in other runs shortly after. Assuming that we will still be able to pinpoint the issue promptly. I don't have objections to keep it as is.
|  | ||
| tracker.getReportBlockingFuture().complete(null); | ||
|  | ||
| CompletedCheckpoint result = checkpointFuture.get(5, TimeUnit.SECONDS); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| .as("Checkpoint future should complete after reportCompletedCheckpoint finishes") | ||
| .isNotNull(); | ||
|  | ||
| ackTask.get(5, TimeUnit.SECONDS); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
b0e8240    to
    551ed1f      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thank you @1996fanrui for the change!
9443a22    to
    e41d3df      
    Compare
  
    …fter updating statistics to ensures semantic correctness and prevent test failure
e41d3df    to
    2036713      
    Compare
  
    | Thanks @Izeren for the review, merging | 
What is the purpose of the change
MapStateNullValueCheckpointingITCase failed with No checkpoint was created yet
Root Cause Analysis
Problem Location
Log analysis revealed that the checkpoint had actually completed successfully:
However, the test code could not find the completed checkpoint when calling CommonTestUtils.getLatestCompletedCheckpointPath().
Root Cause
The problem occurs in the execution order of the CheckpointCoordinator.completePendingCheckpoint() method:
flink/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
Line 1389 in 39a4628
Checkpoint Coordinator mechanism:
Test code timeline:
Usually, the execution sequence is A -> B -> C -> D, it works well.
The bug happens if execution sequence is A > C -> D -> B.
Reproduction Method
In the completePendingCheckpoint() method, inserting Thread.sleep(100) between complete() and reportCompletedCheckpoint() can reproduce this issue 100%.
Brief change log: Adjust the execution order in CheckpointCoordinator
[FLINK-38408][checkpoint] Complete the checkpoint CompletableFuture after updating statistics to ensures semantic correctness and prevent test failure
Changes:
Benefits:
Verifying this change
testCompletionFutureCompletesAfterReportingDoes this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation