kafka-consumer(ticdc): tolerate replayed resolved and DDL events#12596
wlwilliamx wants to merge 1 commit into pingcap:master from
Conversation
Skipping CI for Draft Pull Request.
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: (no approvers yet). The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment.
Code Review
This pull request improves the Kafka consumer's resilience to replayed events by relaxing watermark fallback checks and introducing logical DDL deduplication. A critical feedback point highlights that the current deduplication logic for split DDLs (e.g., from RENAME TABLES) is insufficient when replaying sequences, as it only compares against the single most recent event. The reviewer suggests using CommitTs and Seq ordering to correctly identify and ignore replayed DDLs.
```diff
 // So to tell if a DDL is redundant or not, we must check the equivalence of
 // the current DDL and the DDL with max CommitTs.
-if ddl == w.ddlWithMaxCommitTs {
+if isEquivalentDDLEvent(ddl, w.ddlWithMaxCommitTs) {
```
The deduplication logic here only checks whether the incoming DDL is equivalent to the last appended DDL. This is insufficient for replayed sequences of split DDLs (e.g., from a `RENAME TABLES` job).
If a sequence like `[DDL_A(Seq:1), DDL_B(Seq:2)]` is replayed, `DDL_A` will be compared against `DDL_B`. Since their `Seq` numbers differ, `isEquivalentDDLEvent` returns false, and `DDL_A` is appended again. This leads to redundant DDL execution and a consumer panic.
Since DDLs are strictly ordered by `(CommitTs, Seq)` on partition 0, any DDL with a `Seq` less than or equal to the current maximum for the same `CommitTs` should be ignored. It is also recommended to add a regression test for replayed sequences of split DDLs.
```diff
-if isEquivalentDDLEvent(ddl, w.ddlWithMaxCommitTs) {
+if w.ddlWithMaxCommitTs != nil && ddl.CommitTs == w.ddlWithMaxCommitTs.CommitTs && ddl.Seq <= w.ddlWithMaxCommitTs.Seq {
```
/retest |
What problem does this PR solve?
Issue Number: close #12595
What is changed and how it works?
`cmd/kafka-consumer`: treat replayed resolved and DDL events as duplicate delivery instead of a fatal error
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No. This only makes the standalone Kafka consumer tolerate duplicate MQ delivery in line with TiCDC's at-least-once behavior.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note