From f99c8a4936ca894a6677300b38b84b55a957c05e Mon Sep 17 00:00:00 2001 From: huyuanfeng Date: Mon, 27 Oct 2025 11:13:11 +0800 Subject: [PATCH 1/4] [FLINK-38568][mysql-cdc] Optimize binlog split lookup using binary search --- .../debezium/reader/BinlogSplitReader.java | 15 +- .../mysql/source/utils/RecordUtils.java | 158 +++++++++++++--- .../mysql/source/utils/RecordUtilsTest.java | 175 ++++++++++++++++++ 3 files changed, 314 insertions(+), 34 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index dfc639e84c9..ddc01bf5338 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -282,13 +282,11 @@ private boolean shouldEmit(SourceRecord sourceRecord) { Object[] chunkKey = RecordUtils.getSplitKey( splitKeyType, statefulTaskContext.getSchemaNameAdjuster(), target); - for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) { - if (RecordUtils.splitKeyRangeContains( - chunkKey, splitInfo.getSplitStart(), splitInfo.getSplitEnd()) - && position.isAfter(splitInfo.getHighWatermark())) { - return true; - } - } + + FinishedSnapshotSplitInfo matchedSplit = + RecordUtils.findSplitByKeyBinary(finishedSplitsInfo.get(tableId), chunkKey); + + return matchedSplit != null && position.isAfter(matchedSplit.getHighWatermark()); } // not in the monitored splits scope, do not emit return false; @@ -349,6 +347,9 @@ private void configureFilter() { tableIdBinlogPositionMap.put(tableId, highWatermark); } } + // Sort splits by splitStart for binary search optimization + // Binary search requires sorted data to work correctly + splitsInfoMap.values().forEach(RecordUtils::sortFinishedSplitInfos); } this.finishedSplitsInfo = splitsInfoMap; this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index e6848f1c4c8..9914def7841 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -46,7 +46,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -454,39 +453,144 @@ public static BinlogOffset getBinlogPosition(Map offset) { /** Returns the specific key contains in the split key range or not. */ public static boolean splitKeyRangeContains( Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) { - // for all range - if (splitKeyStart == null && splitKeyEnd == null) { - return true; + return compareKeyWithRange(key, splitKeyStart, splitKeyEnd) == 0; + } + + /** + * Sorts the list of FinishedSnapshotSplitInfo by splitStart in ascending order. This is + * required for binary search to work correctly. + * + *

Handles special cases: - Splits with null splitStart are considered as MIN value (sorted + * to front) - Splits with null splitEnd are considered as MAX value (sorted to back) + * + *

NOTE: Current implementation assumes single-field split keys (as indicated by + * getSplitKey()). If multi-field split keys are supported in the future, the comparison logic + * should be reviewed to ensure consistency with {@link #splitKeyRangeContains(Object[],Object[],Object[])}. + * + * @param splits List of splits to be sorted (sorted in-place) + */ + public static void sortFinishedSplitInfos(List splits) { + if (splits == null || splits.size() <= 1) { + return; + } + + splits.sort( + (leftSplit, rightSplit) -> { + Object[] leftSplitStart = leftSplit.getSplitStart(); + Object[] rightSplitStart = rightSplit.getSplitStart(); + + // Splits with null splitStart should come first (they are the first split) + if (leftSplitStart == null && rightSplitStart == null) { + return 0; + } + if (leftSplitStart == null) { + return -1; + } + if (rightSplitStart == null) { + return 1; + } + + // Compare split starts + return compareSplit(leftSplitStart, rightSplitStart); + }); + } + + /** + * Uses binary search to find the split containing the specified key in a sorted split list. + * + *

IMPORTANT: The splits list MUST be sorted by splitStart before calling this method. Use + * sortFinishedSplitInfos() to sort the list if needed. + * + * @param sortedSplits List of splits sorted by splitStart (MUST be sorted!) + * @param key The chunk key to search for + * @return The split containing the key, or null if not found + */ + public static FinishedSnapshotSplitInfo findSplitByKeyBinary( + List sortedSplits, Object[] key) { + + if (sortedSplits == null || sortedSplits.isEmpty()) { + return null; } - // first split - if (splitKeyStart == null) { - int[] upperBoundRes = new int[key.length]; - for (int i = 0; i < key.length; i++) { - upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]); + + int left = 0; + int right = sortedSplits.size() - 1; + + while (left <= right) { + int mid = left + (right - left) / 2; + FinishedSnapshotSplitInfo split = sortedSplits.get(mid); + + int position = compareKeyWithSplit(key, split); + + if (position == 0) { + return split; + } else if (position < 0) { + // key < splitStart, search left + right = mid - 1; + } else { + // key >= splitEnd, search right + left = mid + 1; } - return Arrays.stream(upperBoundRes).anyMatch(value -> value < 0) - && Arrays.stream(upperBoundRes).allMatch(value -> value <= 0); } - // last split - else if (splitKeyEnd == null) { - int[] lowerBoundRes = new int[key.length]; - for (int i = 0; i < key.length; i++) { - lowerBoundRes[i] = compareObjects(key[i], splitKeyStart[i]); + + return null; + } + + /** + * Compares the position of key relative to split. + * + * @return -1: key < splitStart, 0: splitStart <= key < splitEnd, 1: key >= splitEnd + */ + private static int compareKeyWithSplit(Object[] key, FinishedSnapshotSplitInfo split) { + return compareKeyWithRange(key, split.getSplitStart(), split.getSplitEnd()); + } + + /** + * Compares the position of key relative to a split range [splitStart, splitEnd). + * + * @param key The key to compare + * @param splitStart Start boundary (inclusive), null means MIN + * @param splitEnd End boundary (exclusive), null means MAX + * @return -1: key < splitStart, 0: splitStart <= key < splitEnd, 1: key >= splitEnd + */ + private static int compareKeyWithRange(Object[] key, Object[] splitStart, Object[] splitEnd) { + if (splitStart == null) { + if (splitEnd == null) { + return 0; // Full range split } - return Arrays.stream(lowerBoundRes).allMatch(value -> value >= 0); + // key < splitEnd ? + int cmp = compareSplit(key, splitEnd); + return cmp < 0 ? 0 : 1; + } + + if (splitEnd == null) { + // key >= splitStart ? + int cmp = compareSplit(key, splitStart); + return cmp >= 0 ? 0 : -1; + } + + // Normal case: [splitStart, splitEnd) + int cmpStart = compareSplit(key, splitStart); + if (cmpStart < 0) { + return -1; // key < splitStart + } + + int cmpEnd = compareSplit(key, splitEnd); + if (cmpEnd >= 0) { + return 1; // key >= splitEnd } - // other split - else { - int[] lowerBoundRes = new int[key.length]; - int[] upperBoundRes = new int[key.length]; - for (int i = 0; i < key.length; i++) { - lowerBoundRes[i] = compareObjects(key[i], splitKeyStart[i]); - upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]); + + return 0; // splitStart <= key < splitEnd + } + + private static int compareSplit(Object[] leftSplit, Object[] rightSplit) { + int compareResult = 0; + for (int i = 0; i < leftSplit.length; i++) { + compareResult = compareObjects(leftSplit[i], rightSplit[i]); + if (compareResult != 0) { + break; } - return Arrays.stream(lowerBoundRes).anyMatch(value -> value >= 0) - && (Arrays.stream(upperBoundRes).anyMatch(value -> value < 0) - && Arrays.stream(upperBoundRes).allMatch(value -> value <= 0)); } + return compareResult; } @SuppressWarnings("unchecked") diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtilsTest.java index 51039948964..02b2608cd9e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtilsTest.java @@ -17,12 +17,19 @@ package org.apache.flink.cdc.connectors.mysql.source.utils; +import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; +import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; + +import io.debezium.relational.TableId; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import java.math.BigDecimal; import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.findSplitByKeyBinary; import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.splitKeyRangeContains; /** Tests for {@link org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils}. */ @@ -101,6 +108,174 @@ void testDifferentKeyTypes() { assertKeyRangeContains(new Object[] {7}, new Object[] {BigDecimal.valueOf(6)}, null); } + @Test + void testSortFinishedSplitInfos() { + TableId tableId = new TableId("test_db", null, "test_table"); + + List emptyList = new ArrayList<>(); + RecordUtils.sortFinishedSplitInfos(emptyList); + Assertions.assertThat(emptyList).isEmpty(); + + List singleList = new ArrayList<>(); + singleList.add(createSplit(tableId, "split-1", new Object[] {100L}, new Object[] {200L})); + RecordUtils.sortFinishedSplitInfos(singleList); + Assertions.assertThat(singleList).hasSize(1); + + List splits = new ArrayList<>(); + splits.add(createSplit(tableId, "split-3", new Object[] {200L}, new Object[] {300L})); + splits.add(createSplit(tableId, "split-1", null, new Object[] {100L})); + splits.add(createSplit(tableId, "split-4", new Object[] {300L}, null)); + splits.add(createSplit(tableId, "split-2", new Object[] {100L}, new Object[] {200L})); + + RecordUtils.sortFinishedSplitInfos(splits); + + Assertions.assertThat(splits.get(0).getSplitStart()).isNull(); + Assertions.assertThat(splits.get(1).getSplitStart()).isEqualTo(new Object[] {100L}); + Assertions.assertThat(splits.get(2).getSplitStart()).isEqualTo(new Object[] {200L}); + Assertions.assertThat(splits.get(3).getSplitStart()).isEqualTo(new Object[] {300L}); + } + + @Test + void testFindSplitByKeyBinary() { + TableId tableId = new TableId("test_db", null, "test_table"); + + List sortedSplits = new ArrayList<>(); + sortedSplits.add(createSplit(tableId, "split-0", null, new Object[] {100L})); + sortedSplits.add(createSplit(tableId, "split-1", new Object[] {100L}, new Object[] {200L})); + sortedSplits.add(createSplit(tableId, "split-2", new Object[] {200L}, new Object[] {300L})); + sortedSplits.add(createSplit(tableId, "split-3", new Object[] {300L}, null)); + + FinishedSnapshotSplitInfo result = findSplitByKeyBinary(sortedSplits, new Object[] {-1L}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-0"); + + result = findSplitByKeyBinary(sortedSplits, new Object[] {100L}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-1"); + + result = findSplitByKeyBinary(sortedSplits, new Object[] {150L}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-1"); + + result = findSplitByKeyBinary(sortedSplits, new Object[] {200L}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-2"); + + result = findSplitByKeyBinary(sortedSplits, new Object[] {250L}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-2"); + + result = findSplitByKeyBinary(sortedSplits, new Object[] {300L}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-3"); + + result = findSplitByKeyBinary(sortedSplits, new Object[] {1000L}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-3"); + + result = findSplitByKeyBinary(sortedSplits, new Object[] {99L}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-0"); + } + + @Test + void testFindSplitByKeyBinaryWithOnlyOneSplit() { + TableId tableId = new TableId("test_db", null, "test_table"); + + List sortedSplits = new ArrayList<>(); + sortedSplits.add(createSplit(tableId, "split-0", null, null)); + + FinishedSnapshotSplitInfo result = findSplitByKeyBinary(sortedSplits, new Object[] {100L}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-0"); + + result = findSplitByKeyBinary(sortedSplits, new Object[] {Long.MAX_VALUE}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-0"); + } + + @Test + void testFindSplitByKeyBinaryWithLargeNumberOfSplits() { + TableId tableId = new TableId("test_db", null, "test_table"); + + List sortedSplits = new ArrayList<>(); + + for (int i = 0; i < 1000; i++) { + Object[] start = i == 0 ? null : new Object[] {(long) i * 10}; + Object[] end = i == 999 ? null : new Object[] {(long) (i + 1) * 10}; + sortedSplits.add(createSplit(tableId, "split-" + i, start, end)); + } + + FinishedSnapshotSplitInfo result = findSplitByKeyBinary(sortedSplits, new Object[] {5L}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-0"); + + result = findSplitByKeyBinary(sortedSplits, new Object[] {505L}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-50"); + + result = findSplitByKeyBinary(sortedSplits, new Object[] {9995L}); + Assertions.assertThat(result).isNotNull(); + Assertions.assertThat(result.getSplitId()).isEqualTo("split-999"); + } + + @Test + void testFindSplitByKeyBinaryEdgeCases() { + TableId tableId = new TableId("test_db", null, "test_table"); + + List emptyList = new ArrayList<>(); + FinishedSnapshotSplitInfo result = findSplitByKeyBinary(emptyList, new Object[] {100L}); + Assertions.assertThat(result).isNull(); + + result = findSplitByKeyBinary(null, new Object[] {100L}); + Assertions.assertThat(result).isNull(); + } + + @Test + void testBinarySearchConsistencyWithLinearSearch() { + TableId tableId = new TableId("test_db", null, "test_table"); + + List sortedSplits = new ArrayList<>(); + sortedSplits.add(createSplit(tableId, "split-0", null, new Object[] {100L})); + sortedSplits.add(createSplit(tableId, "split-1", new Object[] {100L}, new Object[] {200L})); + sortedSplits.add(createSplit(tableId, "split-2", new Object[] {200L}, new Object[] {300L})); + sortedSplits.add(createSplit(tableId, "split-3", new Object[] {300L}, new Object[] {400L})); + sortedSplits.add(createSplit(tableId, "split-4", new Object[] {400L}, null)); + + for (long key = 0; key < 500; key += 10) { + Object[] keyArray = new Object[] {key}; + + FinishedSnapshotSplitInfo binaryResult = findSplitByKeyBinary(sortedSplits, keyArray); + + FinishedSnapshotSplitInfo linearResult = null; + for (FinishedSnapshotSplitInfo split : sortedSplits) { + if (splitKeyRangeContains(keyArray, split.getSplitStart(), split.getSplitEnd())) { + linearResult = split; + break; + } + } + + if (binaryResult == null) { + Assertions.assertThat(linearResult) + .as("Key %d should not be in any split", key) + .isNull(); + } else { + Assertions.assertThat(linearResult) + .as("Key %d should be found by both methods", key) + .isNotNull(); + Assertions.assertThat(binaryResult.getSplitId()) + .as("Both methods should find the same split for key %d", key) + .isEqualTo(linearResult.getSplitId()); + } + } + } + + private FinishedSnapshotSplitInfo createSplit( + TableId tableId, String splitId, Object[] splitStart, Object[] splitEnd) { + return new FinishedSnapshotSplitInfo( + tableId, splitId, splitStart, splitEnd, BinlogOffset.ofEarliest()); + } + private void assertKeyRangeContains( Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) { Assertions.assertThat(splitKeyRangeContains(key, splitKeyStart, splitKeyEnd)).isTrue(); From a0726d15411ebd1b1435e2874cd14f53be6ebbfd Mon Sep 17 00:00:00 2001 From: huyuanfeng Date: Mon, 27 Oct 2025 14:58:41 +0800 Subject: [PATCH 2/4] spotless apply --- .../flink/cdc/connectors/mysql/source/utils/RecordUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index 9914def7841..9b7bcce085e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -465,7 +465,8 @@ public static boolean splitKeyRangeContains( * *

NOTE: Current implementation assumes single-field split keys (as indicated by * getSplitKey()). If multi-field split keys are supported in the future, the comparison logic - * should be reviewed to ensure consistency with {@link #splitKeyRangeContains(Object[],Object[],Object[])}. + * should be reviewed to ensure consistency with {@link + * #splitKeyRangeContains(Object[],Object[],Object[])}. * * @param splits List of splits to be sorted (sorted in-place) */ From 16443fc9517b07073e9f8b4f256429b2a26d83c4 Mon Sep 17 00:00:00 2001 From: huyuanfeng Date: Mon, 27 Oct 2025 17:05:41 +0800 Subject: [PATCH 3/4] trigger actions From 1790d5de5a224a9d78d58681d85639200d46f14d Mon Sep 17 00:00:00 2001 From: huyuanfeng Date: Mon, 27 Oct 2025 17:05:55 +0800 Subject: [PATCH 4/4] verification --- .../cdc/connectors/mysql/source/utils/RecordUtils.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index 9b7bcce085e..6b346749906 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -584,6 +584,14 @@ private static int compareKeyWithRange(Object[] key, Object[] splitStart, Object } private static int compareSplit(Object[] leftSplit, Object[] rightSplit) { + // Ensure both splits have the same length + if (leftSplit.length != rightSplit.length) { + throw new IllegalArgumentException( + String.format( + "Split key arrays must have the same length. Left: %d, Right: %d", + leftSplit.length, rightSplit.length)); + } + int compareResult = 0; for (int i = 0; i < leftSplit.length; i++) { compareResult = compareObjects(leftSplit[i], rightSplit[i]);