Skip to content

Commit 3ae1414

Browse files
authored
[core] [fix] Partial update should not return empty row when non-null column types exist (#5077)
This closes #5077.
1 parent 8abbc5e commit 3ae1414

File tree

4 files changed

+350
-62
lines changed

4 files changed

+350
-62
lines changed

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.paimon.types.DataType;
3232
import org.apache.paimon.types.RowKind;
3333
import org.apache.paimon.types.RowType;
34+
import org.apache.paimon.utils.ArrayUtils;
3435
import org.apache.paimon.utils.FieldsComparator;
3536
import org.apache.paimon.utils.Preconditions;
3637
import org.apache.paimon.utils.Projection;
@@ -73,12 +74,20 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
7374
private final Map<Integer, FieldAggregator> fieldAggregators;
7475
private final boolean removeRecordOnDelete;
7576
private final Set<Integer> sequenceGroupPartialDelete;
77+
private final boolean[] nullables;
7678

7779
private InternalRow currentKey;
7880
private long latestSequenceNumber;
7981
private GenericRow row;
8082
private KeyValue reused;
8183
private boolean currentDeleteRow;
84+
private boolean notNullColumnFilled;
85+
/**
86+
* If the first value is retract, and no insert record is received, the row kind should be
87+
* RowKind.DELETE. (Partial update sequence group may not correctly set currentDeleteRow if no
88+
* RowKind.INSERT value is received)
89+
*/
90+
private boolean meetInsert;
8291

8392
protected PartialUpdateMergeFunction(
8493
InternalRow.FieldGetter[] getters,
@@ -87,19 +96,23 @@ protected PartialUpdateMergeFunction(
8796
Map<Integer, FieldAggregator> fieldAggregators,
8897
boolean fieldSequenceEnabled,
8998
boolean removeRecordOnDelete,
90-
Set<Integer> sequenceGroupPartialDelete) {
99+
Set<Integer> sequenceGroupPartialDelete,
100+
boolean[] nullables) {
91101
this.getters = getters;
92102
this.ignoreDelete = ignoreDelete;
93103
this.fieldSeqComparators = fieldSeqComparators;
94104
this.fieldAggregators = fieldAggregators;
95105
this.fieldSequenceEnabled = fieldSequenceEnabled;
96106
this.removeRecordOnDelete = removeRecordOnDelete;
97107
this.sequenceGroupPartialDelete = sequenceGroupPartialDelete;
108+
this.nullables = nullables;
98109
}
99110

100111
@Override
101112
public void reset() {
102113
this.currentKey = null;
114+
this.meetInsert = false;
115+
this.notNullColumnFilled = false;
103116
this.row = new GenericRow(getters.length);
104117
fieldAggregators.values().forEach(FieldAggregator::reset);
105118
}
@@ -109,14 +122,21 @@ public void add(KeyValue kv) {
109122
// refresh key object to avoid reference overwritten
110123
currentKey = kv.key();
111124
currentDeleteRow = false;
112-
113125
if (kv.valueKind().isRetract()) {
126+
127+
if (!notNullColumnFilled) {
128+
initRow(row, kv.value());
129+
notNullColumnFilled = true;
130+
}
131+
114132
// In 0.7- versions, the delete records might be written into data file even when
115133
// ignore-delete configured, so ignoreDelete still needs to be checked
116134
if (ignoreDelete) {
117135
return;
118136
}
119137

138+
latestSequenceNumber = kv.sequenceNumber();
139+
120140
if (fieldSequenceEnabled) {
121141
retractWithSequenceGroup(kv);
122142
return;
@@ -126,6 +146,7 @@ public void add(KeyValue kv) {
126146
if (kv.valueKind() == RowKind.DELETE) {
127147
currentDeleteRow = true;
128148
row = new GenericRow(getters.length);
149+
initRow(row, kv.value());
129150
}
130151
return;
131152
}
@@ -148,13 +169,19 @@ public void add(KeyValue kv) {
148169
} else {
149170
updateWithSequenceGroup(kv);
150171
}
172+
meetInsert = true;
173+
notNullColumnFilled = true;
151174
}
152175

153176
private void updateNonNullFields(KeyValue kv) {
154177
for (int i = 0; i < getters.length; i++) {
155178
Object field = getters[i].getFieldOrNull(kv.value());
156179
if (field != null) {
157180
row.setField(i, field);
181+
} else {
182+
if (!nullables[i]) {
183+
throw new IllegalArgumentException("Field " + i + " can not be null");
184+
}
158185
}
159186
}
160187
}
@@ -232,6 +259,7 @@ private void retractWithSequenceGroup(KeyValue kv) {
232259
&& sequenceGroupPartialDelete.contains(field)) {
233260
currentDeleteRow = true;
234261
row = new GenericRow(getters.length);
262+
initRow(row, kv.value());
235263
return;
236264
} else {
237265
row.setField(field, getters[field].getFieldOrNull(kv.value()));
@@ -263,13 +291,26 @@ private void retractWithSequenceGroup(KeyValue kv) {
263291
}
264292
}
265293

294+
private void initRow(GenericRow row, InternalRow value) {
295+
for (int i = 0; i < getters.length; i++) {
296+
Object field = getters[i].getFieldOrNull(value);
297+
if (!nullables[i]) {
298+
if (field != null) {
299+
row.setField(i, field);
300+
} else {
301+
throw new IllegalArgumentException("Field " + i + " can not be null");
302+
}
303+
}
304+
}
305+
}
306+
266307
@Override
267308
public KeyValue getResult() {
268309
if (reused == null) {
269310
reused = new KeyValue();
270311
}
271312

272-
RowKind rowKind = currentDeleteRow ? RowKind.DELETE : RowKind.INSERT;
313+
RowKind rowKind = currentDeleteRow || !meetInsert ? RowKind.DELETE : RowKind.INSERT;
273314
return reused.replace(currentKey, latestSequenceNumber, rowKind, row);
274315
}
275316

@@ -442,14 +483,19 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
442483
}
443484
}
444485

486+
List<DataType> projectedTypes = Projection.of(projection).project(tableTypes);
445487
return new PartialUpdateMergeFunction(
446-
createFieldGetters(Projection.of(projection).project(tableTypes)),
488+
createFieldGetters(projectedTypes),
447489
ignoreDelete,
448490
projectedSeqComparators,
449491
projectedAggregators,
450492
!fieldSeqComparators.isEmpty(),
451493
removeRecordOnDelete,
452-
sequenceGroupPartialDelete);
494+
sequenceGroupPartialDelete,
495+
ArrayUtils.toPrimitiveBoolean(
496+
projectedTypes.stream()
497+
.map(DataType::isNullable)
498+
.toArray(Boolean[]::new)));
453499
} else {
454500
Map<Integer, FieldsComparator> fieldSeqComparators = new HashMap<>();
455501
this.fieldSeqComparators.forEach(
@@ -464,7 +510,11 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
464510
fieldAggregators,
465511
!fieldSeqComparators.isEmpty(),
466512
removeRecordOnDelete,
467-
sequenceGroupPartialDelete);
513+
sequenceGroupPartialDelete,
514+
ArrayUtils.toPrimitiveBoolean(
515+
rowType.getFieldTypes().stream()
516+
.map(DataType::isNullable)
517+
.toArray(Boolean[]::new)));
468518
}
469519
}
470520

paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -857,6 +857,31 @@ public void testAggregationWithoutSequenceGroup() {
857857
"Must use sequence group for aggregation functions but not found for field f1."));
858858
}
859859

860+
@Test
public void testDeleteReproduceCorrectSequenceNumber() {
    // Regression test: with remove-record-on-delete enabled, the sequence
    // number of a DELETE that follows an INSERT must be reflected in the
    // merge result instead of being dropped.
    Options options = new Options();
    options.set("partial-update.remove-record-on-delete", "true");
    RowType rowType =
            RowType.of(
                    DataTypes.INT(),
                    DataTypes.INT(),
                    DataTypes.INT(),
                    DataTypes.INT(),
                    DataTypes.INT());

    MergeFunctionFactory<KeyValue> factory =
            PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));

    MergeFunction<KeyValue> func = factory.create();

    func.reset();

    add(func, RowKind.INSERT, 1, 1, 1, 1, 1);
    add(func, RowKind.DELETE, 1, 1, 1, 1, 1);

    // NOTE(review): assumes the add(...) helper assigns increasing sequence
    // numbers starting at 0, so the DELETE carries sequence number 1 — verify
    // against the helper's implementation.
    assertThat(func.getResult().sequenceNumber()).isEqualTo(1);
}
884+
860885
/** Convenience overload: feeds the given field values to the merge function as an INSERT. */
private void add(MergeFunction<KeyValue> function, Integer... f) {
    add(function, RowKind.INSERT, f);
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.table;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.CatalogContext;
23+
import org.apache.paimon.catalog.CatalogFactory;
24+
import org.apache.paimon.catalog.Identifier;
25+
import org.apache.paimon.data.BinaryString;
26+
import org.apache.paimon.data.GenericRow;
27+
import org.apache.paimon.data.InternalRow;
28+
import org.apache.paimon.disk.IOManagerImpl;
29+
import org.apache.paimon.fs.Path;
30+
import org.apache.paimon.options.CatalogOptions;
31+
import org.apache.paimon.options.Options;
32+
import org.apache.paimon.reader.RecordReader;
33+
import org.apache.paimon.schema.Schema;
34+
import org.apache.paimon.table.sink.StreamTableCommit;
35+
import org.apache.paimon.table.sink.StreamTableWrite;
36+
import org.apache.paimon.table.sink.StreamWriteBuilder;
37+
import org.apache.paimon.table.source.ReadBuilder;
38+
import org.apache.paimon.table.source.TableScan;
39+
import org.apache.paimon.types.DataTypes;
40+
import org.apache.paimon.types.RowKind;
41+
42+
import org.junit.jupiter.api.BeforeEach;
43+
import org.junit.jupiter.api.Test;
44+
import org.junit.jupiter.api.io.TempDir;
45+
46+
import java.util.concurrent.atomic.AtomicInteger;
47+
48+
import static org.apache.paimon.types.DataTypesTest.assertThat;
49+
50+
/** Test partial update table. */
51+
public class PartialUpdateTableTest {
52+
53+
@TempDir public static java.nio.file.Path tempDir;
54+
private Catalog catalog;
55+
private final Identifier identifier = Identifier.create("my_db", "my_table");
56+
57+
@BeforeEach
58+
public void before() throws Exception {
59+
Options options = new Options();
60+
options.set(CatalogOptions.WAREHOUSE, new Path(path()).toUri().toString());
61+
catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
62+
catalog.createDatabase("my_db", true);
63+
catalog.createTable(identifier, schema(), true);
64+
}
65+
66+
private String path() {
67+
return tempDir.toString() + "/" + PartialUpdateTableTest.class.getSimpleName();
68+
}
69+
70+
private static Schema schema() {
71+
Schema.Builder schemaBuilder = Schema.newBuilder();
72+
schemaBuilder.column("biz_no", DataTypes.INT());
73+
schemaBuilder.column("customer_id", DataTypes.STRING());
74+
schemaBuilder.column("payable_amount", DataTypes.INT());
75+
schemaBuilder.column("g1", DataTypes.INT());
76+
schemaBuilder.primaryKey("biz_no");
77+
schemaBuilder.option("bucket", "1");
78+
schemaBuilder.option("file.format", "parquet");
79+
schemaBuilder.option("merge-engine", "partial-update");
80+
schemaBuilder.option("fields.g1.sequence-group", "payable_amount");
81+
schemaBuilder.option("fields.payable_amount.aggregation-function", "sum");
82+
schemaBuilder.option("deletion-vectors.enabled", "true");
83+
schemaBuilder.option("write-buffer-spillable", "true");
84+
return schemaBuilder.build();
85+
}
86+
87+
@Test
88+
public void testWriteDeleteRecordWithNoInsertData() throws Exception {
89+
Table table = catalog.getTable(identifier);
90+
StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
91+
try (StreamTableCommit commit = writeBuilder.newCommit();
92+
StreamTableWrite write = writeBuilder.newWrite()) {
93+
write.withIOManager(new IOManagerImpl(tempDir.toString()));
94+
for (int snapshotId = 0; snapshotId < 100; snapshotId++) {
95+
int bizNo = snapshotId;
96+
String customerId = String.valueOf(snapshotId);
97+
int payableAmount = 1;
98+
int g1 = 1;
99+
write.write(
100+
GenericRow.ofKind(
101+
snapshotId == 0 || snapshotId == 10
102+
? RowKind.DELETE
103+
: RowKind.INSERT,
104+
bizNo,
105+
BinaryString.fromString(customerId),
106+
payableAmount,
107+
g1));
108+
commit.commit(snapshotId, write.prepareCommit(true, snapshotId));
109+
}
110+
}
111+
112+
ReadBuilder builder = table.newReadBuilder();
113+
TableScan scan = builder.newScan();
114+
TableScan.Plan plan = scan.plan();
115+
116+
AtomicInteger i = new AtomicInteger(0);
117+
try (RecordReader<InternalRow> reader = builder.newRead().createReader(plan)) {
118+
reader.forEachRemaining(
119+
row -> {
120+
if (i.get() == 0 || i.get() == 10) {
121+
i.incrementAndGet();
122+
}
123+
int index = i.get();
124+
assertThat(row.getInt(0)).isEqualTo(index);
125+
assertThat(row.getString(1).toString()).isEqualTo(String.valueOf(index));
126+
assertThat(row.getInt(2)).isEqualTo(1);
127+
assertThat(row.getInt(3)).isEqualTo(1);
128+
i.incrementAndGet();
129+
});
130+
}
131+
}
132+
}

0 commit comments

Comments
 (0)