Skip to content

Commit eb70b2d

Browse files
committed
fix invalid partial update for pk table gh issue apache#2843
fix invalid partial update for pk table gh issue apache#2843
1 parent 0e265ad commit eb70b2d

File tree

4 files changed

+102
-1
lines changed

4 files changed

+102
-1
lines changed

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,9 +562,13 @@ private long processUpsert(
562562

563563
byte[] oldValueBytes = getFromBufferOrKv(key);
564564
if (oldValueBytes == null) {
565+
// For partial updates on first insert, we need to merge to null out
566+
// non-target columns. Without this, the full row (including non-target
567+
// column values) would be stored as-is.
568+
BinaryValue valueToInsert = currentMerger.mergeInsert(currentValue);
565569
return applyInsert(
566570
key,
567-
currentValue,
571+
valueToInsert,
568572
walBuilder,
569573
latestSchemaRow,
570574
logOffset,

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/DefaultRowMerger.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ public BinaryValue merge(BinaryValue oldValue, BinaryValue newValue) {
102102
return partialUpdater.updateRow(oldValue, newValue);
103103
}
104104

105+
@Override
106+
public BinaryValue mergeInsert(BinaryValue newValue) { // first-insert path: routes the new value through the partial updater
107+
return partialUpdater.updateRow(null, newValue); // no existing row, so the old value is null. NOTE(review): updateRow is expected to null out non-target columns when the old value is null — confirm in the partial updater
108+
}
109+
105110
@Nullable
106111
@Override
107112
public BinaryValue delete(BinaryValue oldRow) {

fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/RowMerger.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,18 @@ public interface RowMerger {
4343
*/
4444
BinaryValue merge(BinaryValue oldValue, BinaryValue newValue);
4545

46+
/**
47+
* Processes a new value that is being inserted for the first time (no existing row). The default
48+
* implementation returns the new value unchanged. Partial update mergers override this to null
49+
* out non-target columns.
50+
*
51+
* @param newValue the new row being inserted
52+
* @return the value to insert; the default implementation returns {@code newValue} itself
53+
*/
54+
default BinaryValue mergeInsert(BinaryValue newValue) {
55+
return newValue;
56+
}
57+
4658
/**
4759
* Merge the old row with a delete row.
4860
*

fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,86 @@ void testPartialUpdateAndDelete() throws Exception {
463463
checkEqual(actualLogRecords, expectedLogs, rowType);
464464
}
465465

466+
@Test
467+
void testPartialUpdateFirstInsertNullsNonTargetColumns() throws Exception { // regression test: first insert through the partial-update path
468+
initLogTabletAndKvTablet(DATA2_SCHEMA, new HashMap<>());
469+
RowType rowType = DATA2_SCHEMA.getRowType();
470+
KvRecordTestUtils.KvRecordFactory data2kvRecordFactory =
471+
KvRecordTestUtils.KvRecordFactory.of(rowType);
472+
473+
// Bug reproduction: partial update with targetColumns={0,1} (a, b) but the row
474+
// contains non-null values for ALL columns including non-target column c.
475+
// On first insert (no existing row), non-target columns should be set to null.
476+
KvRecordBatch kvRecordBatch =
477+
kvRecordBatchFactory.ofRecords(
478+
data2kvRecordFactory.ofRecord(
479+
"k1".getBytes(), new Object[] {1, "v1", "should_be_null"}));
480+
481+
int[] targetColumns = new int[] {0, 1}; // only a and b are update targets; c (index 2) is deliberately left out
482+
kvTablet.putAsLeader(kvRecordBatch, targetColumns);
483+
484+
// The stored value should have column c (index 2) set to null,
485+
// NOT "should_be_null", because c is not in targetColumns.
486+
assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes())))
487+
.isEqualTo(valueOf(compactedRow(rowType, new Object[] {1, "v1", null})));
488+
489+
// Also verify CDC log emits the correct row with null for non-target column
490+
LogRecords actualLogRecords = readLogRecords();
491+
List<MemoryLogRecords> expectedLogs =
492+
Collections.singletonList(
493+
logRecords(
494+
rowType,
495+
0, // NOTE(review): presumably the starting log offset of the expected batch — confirm against logRecords(...)
496+
Collections.singletonList(ChangeType.INSERT),
497+
Collections.singletonList(new Object[] {1, "v1", null})));
498+
checkEqual(actualLogRecords, expectedLogs, rowType);
499+
}
500+
501+
@Test
502+
void testPartialUpdateFirstInsertThenUpdate() throws Exception { // first insert on (a, b), then a partial update on (a, c) of the same key
503+
initLogTabletAndKvTablet(DATA2_SCHEMA, new HashMap<>());
504+
RowType rowType = DATA2_SCHEMA.getRowType();
505+
KvRecordTestUtils.KvRecordFactory data2kvRecordFactory =
506+
KvRecordTestUtils.KvRecordFactory.of(rowType);
507+
508+
// First insert: partial update columns a and b, column c should be null
509+
KvRecordBatch batch1 =
510+
kvRecordBatchFactory.ofRecords(
511+
data2kvRecordFactory.ofRecord(
512+
"k1".getBytes(), new Object[] {1, "v1", "ignored"}));
513+
kvTablet.putAsLeader(batch1, new int[] {0, 1});
514+
long endOffset = logTablet.localLogEndOffset(); // log end offset after the first write; used below to read and build only the second batch's records
515+
516+
// Verify first insert stored correctly with null for non-target column
517+
assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes())))
518+
.isEqualTo(valueOf(compactedRow(rowType, new Object[] {1, "v1", null})));
519+
520+
// Second update: partial update columns a and c, column b should retain "v1"
521+
KvRecordBatch batch2 =
522+
kvRecordBatchFactory.ofRecords(
523+
data2kvRecordFactory.ofRecord(
524+
"k1".getBytes(), new Object[] {1, "ignored2", "c1"}));
525+
kvTablet.putAsLeader(batch2, new int[] {0, 2});
526+
527+
// Verify: b should retain "v1" from first insert, c should be updated to "c1"
528+
assertThat(kvTablet.getKvPreWriteBuffer().get(Key.of("k1".getBytes())))
529+
.isEqualTo(valueOf(compactedRow(rowType, new Object[] {1, "v1", "c1"})));
530+
531+
// Verify CDC log for the second update
532+
LogRecords actualLogRecords = readLogRecords(endOffset);
533+
List<MemoryLogRecords> expectedLogs =
534+
Collections.singletonList(
535+
logRecords(
536+
rowType,
537+
endOffset,
538+
Arrays.asList(
539+
ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
540+
Arrays.asList(
541+
new Object[] {1, "v1", null}, // UPDATE_BEFORE image: the row as stored by the first insert
542+
new Object[] {1, "v1", "c1"}))); // UPDATE_AFTER image: b retained, c updated
543+
checkEqual(actualLogRecords, expectedLogs, rowType);
544+
}
545+
466546
@Test
467547
void testPutWithMultiThread() throws Exception {
468548
initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());

0 commit comments

Comments
 (0)