Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class BinlogDeserializationSchema implements FlussDeserializationSchema<R
*/
private transient BinlogRowConverter converter;

/** The current split ID used for per-split UPDATE_BEFORE/UPDATE_AFTER pairing. */
private transient String currentSplitId;

/**
 * Creates a new BinlogDeserializationSchema. The underlying converter is not created here; it
 * is initialized lazily in {@code open(InitializationContext)}, and {@code deserialize} throws
 * {@link IllegalStateException} if invoked before that.
 */
public BinlogDeserializationSchema() {}

Expand All @@ -57,6 +60,15 @@ public void open(InitializationContext context) throws Exception {
}
}

/**
 * Sets the split ID used to key the per-split UPDATE_BEFORE/UPDATE_AFTER pairing buffer in the
 * converter. Must be called before {@link #deserialize(LogRecord)} whenever records from
 * multiple splits are interleaved through the same deserializer instance, so that update halves
 * from different splits are never merged with each other.
 *
 * <p>NOTE(review): if this is never called, {@code currentSplitId} remains {@code null} and all
 * records share one pairing bucket — confirm single-split callers intend that fallback. The
 * field is {@code transient}, so it is not carried across serialization of this schema.
 *
 * @param splitId identifier of the split whose records will be deserialized next
 */
public void setCurrentSplitId(String splitId) {
    this.currentSplitId = splitId;
}

/**
* Deserializes a {@link LogRecord} into a Flink {@link RowData} object with nested before/after
* structure.
Expand All @@ -68,7 +80,7 @@ public RowData deserialize(LogRecord record) throws Exception {
throw new IllegalStateException(
"Converter not initialized. The open() method must be called before deserializing records.");
}
return converter.toBinlogRowData(record);
return converter.toBinlogRowData(record, currentSplitId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.flink.lake.LakeRecordRecordEmitter;
import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema;
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
import org.apache.fluss.flink.source.reader.FlinkSourceReader;
import org.apache.fluss.flink.source.reader.RecordAndPos;
Expand Down Expand Up @@ -73,6 +74,13 @@ public void emitRecord(
}
processAndEmitRecord(scanRecord, sourceOutput);
} else if (splitState.isLogSplitState()) {
// Set split context for BinlogDeserializationSchema to ensure per-split
// UPDATE_BEFORE/UPDATE_AFTER pairing when multiple bucket splits are
// processed by the same source reader.
if (deserializationSchema instanceof BinlogDeserializationSchema) {
((BinlogDeserializationSchema) deserializationSchema)
.setCurrentSplitId(splitState.splitId());
}
// Attempt to process and emit the record.
// For $binlog, this returns true only when a complete row (or the final part of
// a split) is emitted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public final LogSplitState asLogSplitState() {
return (LogSplitState) this;
}

/**
 * Returns the ID of the wrapped split directly, without materializing a new
 * {@link SourceSplitBase} instance via {@link #toSourceSplit()}.
 *
 * @return the split ID of the underlying split
 */
public String splitId() {
    return split.splitId();
}

public abstract SourceSplitBase toSourceSplit();

public boolean isLakeSplit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* A converter that transforms Fluss's {@link LogRecord} to Flink's {@link RowData} with nested
Expand All @@ -46,10 +48,13 @@ public class BinlogRowConverter implements RecordToFlinkRowConverter {
private final org.apache.flink.table.types.logical.RowType producedType;

/**
* Buffer for the UPDATE_BEFORE (-U) record pending merge with the next UPDATE_AFTER (+U)
* record. Null when no update is in progress.
* Per-split buffer for UPDATE_BEFORE (-U) records pending merge with the next UPDATE_AFTER (+U)
* record. Keyed by split ID to prevent cross-split state corruption when multiple bucket splits
* are processed by the same source reader.
*/
@Nullable private LogRecord pendingUpdateBefore;
private final Map<String, LogRecord> pendingUpdateBeforeMap = new HashMap<>();

private static final String DEFAULT_SPLIT_ID = "__default__";

/** Creates a new BinlogRowConverter. */
public BinlogRowConverter(RowType rowType) {
Expand All @@ -60,6 +65,15 @@ public BinlogRowConverter(RowType rowType) {
/**
 * Converts a {@link LogRecord} to a binlog {@link RowData} with nested before/after structure,
 * delegating to the two-argument overload with the shared {@code DEFAULT_SPLIT_ID} pairing
 * bucket. Intended for callers whose records all come from a single split; interleaved splits
 * must use {@code toBinlogRowData(LogRecord, String)} with a real split ID instead.
 *
 * @param record the log record to convert
 * @return the converted row, or {@code null} when the record is an UPDATE_BEFORE that was
 *     buffered pending merge with the following UPDATE_AFTER
 */
@Nullable
public RowData toBinlogRowData(LogRecord record) {
    return toBinlogRowData(record, DEFAULT_SPLIT_ID);
}

/**
* Converts a LogRecord to a binlog RowData with nested before/after structure, using a
* split-specific buffer for UPDATE_BEFORE/UPDATE_AFTER pairing.
*/
@Nullable
public RowData toBinlogRowData(LogRecord record, String splitId) {
ChangeType changeType = record.getChangeType();

switch (changeType) {
Expand All @@ -72,13 +86,14 @@ public RowData toBinlogRowData(LogRecord record) {
baseConverter.toFlinkRowData(record.getRow()));

case UPDATE_BEFORE:
// Buffer the -U record and return null.
// Buffer the -U record per split and return null.
// FlinkRecordEmitter.processAndEmitRecord() skips null results.
this.pendingUpdateBefore = record;
pendingUpdateBeforeMap.put(splitId, record);
return null;

case UPDATE_AFTER:
// Merge with the buffered -U record
// Merge with the buffered -U record for this split
LogRecord pendingUpdateBefore = pendingUpdateBeforeMap.remove(splitId);
if (pendingUpdateBefore == null) {
throw new IllegalStateException(
"Received UPDATE_AFTER (+U) without a preceding UPDATE_BEFORE (-U) record. "
Expand All @@ -89,7 +104,6 @@ public RowData toBinlogRowData(LogRecord record) {
// Use offset and timestamp from the -U record (first entry of update pair)
long offset = pendingUpdateBefore.logOffset();
long timestamp = pendingUpdateBefore.timestamp();
pendingUpdateBefore = null;
return buildBinlogRow("update", offset, timestamp, beforeRow, afterRow);

case DELETE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,52 @@ void testMultipleUpdatesInSequence() throws Exception {
assertThat(result2.getLong(1)).isEqualTo(20L); // offset from second -U
}

@Test
void testCrossSplitInterleavingDoesNotCorrupt() throws Exception {
    String firstSplit = "log-1-0";
    String secondSplit = "log-1-1";

    // Buffer an UPDATE_BEFORE for the first split; nothing may be emitted yet.
    RowData bufferedFirst =
            converter.toBinlogRowData(
                    createLogRecord(ChangeType.UPDATE_BEFORE, 100L, 1000L, 1, "A-old", 100L),
                    firstSplit);
    assertThat(bufferedFirst).isNull();

    // While the first split's update is still pending, run a full -U/+U pair
    // through the second split.
    RowData bufferedSecond =
            converter.toBinlogRowData(
                    createLogRecord(ChangeType.UPDATE_BEFORE, 200L, 2000L, 2, "B-old", 200L),
                    secondSplit);
    assertThat(bufferedSecond).isNull();

    RowData mergedSecond =
            converter.toBinlogRowData(
                    createLogRecord(ChangeType.UPDATE_AFTER, 201L, 2000L, 2, "B-new", 300L),
                    secondSplit);
    assertThat(mergedSecond).isNotNull();
    assertThat(mergedSecond.getString(0)).isEqualTo(StringData.fromString("update"));
    // The merged row must carry the second split's own before/after images.
    RowData secondBefore = mergedSecond.getRow(3, 3);
    assertThat(secondBefore.getString(1).toString()).isEqualTo("B-old");
    RowData secondAfter = mergedSecond.getRow(4, 3);
    assertThat(secondAfter.getString(1).toString()).isEqualTo("B-new");

    // Completing the first split's update must still pair with that split's buffered -U,
    // untouched by the traffic on the second split.
    RowData mergedFirst =
            converter.toBinlogRowData(
                    createLogRecord(ChangeType.UPDATE_AFTER, 101L, 1000L, 1, "A-new", 150L),
                    firstSplit);
    assertThat(mergedFirst).isNotNull();
    assertThat(mergedFirst.getString(0)).isEqualTo(StringData.fromString("update"));
    assertThat(mergedFirst.getLong(1)).isEqualTo(100L); // offset taken from the first split's -U
    RowData firstBefore = mergedFirst.getRow(3, 3);
    assertThat(firstBefore.getString(1).toString()).isEqualTo("A-old");
    RowData firstAfter = mergedFirst.getRow(4, 3);
    assertThat(firstAfter.getString(1).toString()).isEqualTo("A-new");
}

private LogRecord createLogRecord(
ChangeType changeType, long offset, long timestamp, int id, String name, long amount)
throws Exception {
Expand Down
Loading