Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,11 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
Object[] chunkKey =
RecordUtils.getSplitKey(
splitKeyType, statefulTaskContext.getSchemaNameAdjuster(), target);
for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
if (RecordUtils.splitKeyRangeContains(
chunkKey, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
&& position.isAfter(splitInfo.getHighWatermark())) {
return true;
}
}

FinishedSnapshotSplitInfo matchedSplit =
RecordUtils.findSplitByKeyBinary(finishedSplitsInfo.get(tableId), chunkKey);

return matchedSplit != null && position.isAfter(matchedSplit.getHighWatermark());
}
// not in the monitored splits scope, do not emit
return false;
Expand Down Expand Up @@ -349,6 +347,9 @@ private void configureFilter() {
tableIdBinlogPositionMap.put(tableId, highWatermark);
}
}
// Sort splits by splitStart for binary search optimization
// Binary search requires sorted data to work correctly
splitsInfoMap.values().forEach(RecordUtils::sortFinishedSplitInfos);
}
this.finishedSplitsInfo = splitsInfoMap;
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -454,39 +453,153 @@ public static BinlogOffset getBinlogPosition(Map<String, ?> offset) {
/** Returns the specific key contains in the split key range or not. */
public static boolean splitKeyRangeContains(
Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
// for all range
if (splitKeyStart == null && splitKeyEnd == null) {
return true;
return compareKeyWithRange(key, splitKeyStart, splitKeyEnd) == 0;
}

/**
* Sorts the list of FinishedSnapshotSplitInfo by splitStart in ascending order. This is
* required for binary search to work correctly.
*
* <p>Handles special cases: - Splits with null splitStart are considered as MIN value (sorted
* to front) - Splits with null splitEnd are considered as MAX value (sorted to back)
*
* <p>NOTE: Current implementation assumes single-field split keys (as indicated by
* getSplitKey()). If multi-field split keys are supported in the future, the comparison logic
* should be reviewed to ensure consistency with {@link
* #splitKeyRangeContains(Object[],Object[],Object[])}.
*
* @param splits List of splits to be sorted (sorted in-place)
*/
public static void sortFinishedSplitInfos(List<FinishedSnapshotSplitInfo> splits) {
if (splits == null || splits.size() <= 1) {
return;
}
// first split
if (splitKeyStart == null) {
int[] upperBoundRes = new int[key.length];
for (int i = 0; i < key.length; i++) {
upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]);

splits.sort(
(leftSplit, rightSplit) -> {
Object[] leftSplitStart = leftSplit.getSplitStart();
Object[] rightSplitStart = rightSplit.getSplitStart();

// Splits with null splitStart should come first (they are the first split)
if (leftSplitStart == null && rightSplitStart == null) {
return 0;
}
if (leftSplitStart == null) {
return -1;
}
if (rightSplitStart == null) {
return 1;
}

// Compare split starts
return compareSplit(leftSplitStart, rightSplitStart);
});
}

/**
* Uses binary search to find the split containing the specified key in a sorted split list.
*
* <p>IMPORTANT: The splits list MUST be sorted by splitStart before calling this method. Use
* sortFinishedSplitInfos() to sort the list if needed.
*
* @param sortedSplits List of splits sorted by splitStart (MUST be sorted!)
* @param key The chunk key to search for
* @return The split containing the key, or null if not found
*/
public static FinishedSnapshotSplitInfo findSplitByKeyBinary(
List<FinishedSnapshotSplitInfo> sortedSplits, Object[] key) {

if (sortedSplits == null || sortedSplits.isEmpty()) {
return null;
}

int left = 0;
int right = sortedSplits.size() - 1;

while (left <= right) {
int mid = left + (right - left) / 2;
FinishedSnapshotSplitInfo split = sortedSplits.get(mid);

int position = compareKeyWithSplit(key, split);

if (position == 0) {
return split;
} else if (position < 0) {
// key < splitStart, search left
right = mid - 1;
} else {
// key >= splitEnd, search right
left = mid + 1;
}
return Arrays.stream(upperBoundRes).anyMatch(value -> value < 0)
&& Arrays.stream(upperBoundRes).allMatch(value -> value <= 0);
}
// last split
else if (splitKeyEnd == null) {
int[] lowerBoundRes = new int[key.length];
for (int i = 0; i < key.length; i++) {
lowerBoundRes[i] = compareObjects(key[i], splitKeyStart[i]);

return null;
}

/**
* Compares the position of key relative to split.
*
* @return -1: key < splitStart, 0: splitStart <= key < splitEnd, 1: key >= splitEnd
*/
private static int compareKeyWithSplit(Object[] key, FinishedSnapshotSplitInfo split) {
return compareKeyWithRange(key, split.getSplitStart(), split.getSplitEnd());
}

/**
* Compares the position of key relative to a split range [splitStart, splitEnd).
*
* @param key The key to compare
* @param splitStart Start boundary (inclusive), null means MIN
* @param splitEnd End boundary (exclusive), null means MAX
* @return -1: key < splitStart, 0: splitStart <= key < splitEnd, 1: key >= splitEnd
*/
private static int compareKeyWithRange(Object[] key, Object[] splitStart, Object[] splitEnd) {
if (splitStart == null) {
if (splitEnd == null) {
return 0; // Full range split
}
return Arrays.stream(lowerBoundRes).allMatch(value -> value >= 0);
// key < splitEnd ?
int cmp = compareSplit(key, splitEnd);
return cmp < 0 ? 0 : 1;
}

if (splitEnd == null) {
// key >= splitStart ?
int cmp = compareSplit(key, splitStart);
return cmp >= 0 ? 0 : -1;
}

// Normal case: [splitStart, splitEnd)
int cmpStart = compareSplit(key, splitStart);
if (cmpStart < 0) {
return -1; // key < splitStart
}

int cmpEnd = compareSplit(key, splitEnd);
if (cmpEnd >= 0) {
return 1; // key >= splitEnd
}

return 0; // splitStart <= key < splitEnd
}

private static int compareSplit(Object[] leftSplit, Object[] rightSplit) {
// Ensure both splits have the same length
if (leftSplit.length != rightSplit.length) {
throw new IllegalArgumentException(
String.format(
"Split key arrays must have the same length. Left: %d, Right: %d",
leftSplit.length, rightSplit.length));
}
// other split
else {
int[] lowerBoundRes = new int[key.length];
int[] upperBoundRes = new int[key.length];
for (int i = 0; i < key.length; i++) {
lowerBoundRes[i] = compareObjects(key[i], splitKeyStart[i]);
upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]);

int compareResult = 0;
for (int i = 0; i < leftSplit.length; i++) {
compareResult = compareObjects(leftSplit[i], rightSplit[i]);
if (compareResult != 0) {
break;
}
return Arrays.stream(lowerBoundRes).anyMatch(value -> value >= 0)
&& (Arrays.stream(upperBoundRes).anyMatch(value -> value < 0)
&& Arrays.stream(upperBoundRes).allMatch(value -> value <= 0));
}
return compareResult;
}

@SuppressWarnings("unchecked")
Expand Down
Loading
Loading