Skip to content

Commit 9349579

Browse files
authored
[core] Fix aggregate delete bug and refactor SortBufferWriteBufferTestBase (#5414)
1 parent d31adc8 commit 9349579

File tree

7 files changed

+144
-60
lines changed

7 files changed

+144
-60
lines changed

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ public class AggregateMergeFunction implements MergeFunction<KeyValue> {
5757
private KeyValue reused;
5858
private boolean currentDeleteRow;
5959
private final boolean removeRecordOnDelete;
60-
private boolean notNullColumnFilled;
6160

6261
public AggregateMergeFunction(
6362
InternalRow.FieldGetter[] getters,
@@ -73,7 +72,6 @@ public AggregateMergeFunction(
7372
@Override
7473
public void reset() {
7574
this.latestKv = null;
76-
this.notNullColumnFilled = false;
7775
this.row = new GenericRow(getters.length);
7876
Arrays.stream(aggregators).forEach(FieldAggregator::reset);
7977
this.currentDeleteRow = false;
@@ -82,18 +80,15 @@ public void reset() {
8280
@Override
8381
public void add(KeyValue kv) {
8482
latestKv = kv;
85-
boolean isRetract =
86-
kv.valueKind() != RowKind.INSERT && kv.valueKind() != RowKind.UPDATE_AFTER;
8783

88-
currentDeleteRow = removeRecordOnDelete && isRetract;
84+
currentDeleteRow = removeRecordOnDelete && kv.valueKind() == RowKind.DELETE;
8985
if (currentDeleteRow) {
90-
if (!notNullColumnFilled) {
91-
initRow(row, kv.value());
92-
notNullColumnFilled = true;
93-
}
86+
row = new GenericRow(getters.length);
87+
initRow(row, kv.value());
9488
return;
9589
}
9690

91+
boolean isRetract = kv.valueKind().isRetract();
9792
for (int i = 0; i < getters.length; i++) {
9893
FieldAggregator fieldAggregator = aggregators[i];
9994
Object accumulator = getters[i].getFieldOrNull(row);
@@ -104,7 +99,6 @@ public void add(KeyValue kv) {
10499
: fieldAggregator.agg(accumulator, inputField);
105100
row.setField(i, mergedField);
106101
}
107-
notNullColumnFilled = true;
108102
}
109103

110104
private void initRow(GenericRow row, InternalRow value) {
@@ -140,42 +134,42 @@ public boolean requireCopy() {
140134

141135
public static MergeFunctionFactory<KeyValue> factory(
142136
Options conf,
143-
List<String> tableNames,
144-
List<DataType> tableTypes,
137+
List<String> fieldNames,
138+
List<DataType> fieldTypes,
145139
List<String> primaryKeys) {
146-
return new Factory(conf, tableNames, tableTypes, primaryKeys);
140+
return new Factory(conf, fieldNames, fieldTypes, primaryKeys);
147141
}
148142

149143
private static class Factory implements MergeFunctionFactory<KeyValue> {
150144

151145
private static final long serialVersionUID = 1L;
152146

153147
private final CoreOptions options;
154-
private final List<String> tableNames;
155-
private final List<DataType> tableTypes;
148+
private final List<String> fieldNames;
149+
private final List<DataType> fieldTypes;
156150
private final List<String> primaryKeys;
157151
private final boolean removeRecordOnDelete;
158152

159153
private Factory(
160154
Options conf,
161-
List<String> tableNames,
162-
List<DataType> tableTypes,
155+
List<String> fieldNames,
156+
List<DataType> fieldTypes,
163157
List<String> primaryKeys) {
164158
this.options = new CoreOptions(conf);
165-
this.tableNames = tableNames;
166-
this.tableTypes = tableTypes;
159+
this.fieldNames = fieldNames;
160+
this.fieldTypes = fieldTypes;
167161
this.primaryKeys = primaryKeys;
168162
this.removeRecordOnDelete = options.aggregationRemoveRecordOnDelete();
169163
}
170164

171165
@Override
172166
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
173-
List<String> fieldNames = tableNames;
174-
List<DataType> fieldTypes = tableTypes;
167+
List<String> fieldNames = this.fieldNames;
168+
List<DataType> fieldTypes = this.fieldTypes;
175169
if (projection != null) {
176170
Projection project = Projection.of(projection);
177-
fieldNames = project.project(tableNames);
178-
fieldTypes = project.project(tableTypes);
171+
fieldNames = project.project(fieldNames);
172+
fieldTypes = project.project(fieldTypes);
179173
}
180174

181175
FieldAggregator[] fieldAggregators = new FieldAggregator[fieldNames.size()];

paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.mergetree;
2020

21+
import org.apache.paimon.CoreOptions;
2122
import org.apache.paimon.KeyValue;
2223
import org.apache.paimon.codegen.RecordComparator;
2324
import org.apache.paimon.compression.CompressOptions;
@@ -33,10 +34,7 @@
3334
import org.apache.paimon.options.MemorySize;
3435
import org.apache.paimon.options.Options;
3536
import org.apache.paimon.sort.BinaryInMemorySortBuffer;
36-
import org.apache.paimon.types.BigIntType;
37-
import org.apache.paimon.types.DataField;
3837
import org.apache.paimon.types.DataTypes;
39-
import org.apache.paimon.types.IntType;
4038
import org.apache.paimon.types.RowType;
4139
import org.apache.paimon.utils.ReusingKeyValue;
4240
import org.apache.paimon.utils.ReusingTestData;
@@ -47,10 +45,12 @@
4745

4846
import java.io.EOFException;
4947
import java.io.IOException;
48+
import java.util.Arrays;
5049
import java.util.Collections;
5150
import java.util.LinkedList;
5251
import java.util.List;
5352
import java.util.Queue;
53+
import java.util.concurrent.ThreadLocalRandom;
5454

5555
import static org.apache.paimon.utils.Preconditions.checkState;
5656
import static org.assertj.core.api.Assertions.assertThat;
@@ -63,9 +63,11 @@ public abstract class SortBufferWriteBufferTestBase {
6363

6464
protected final SortBufferWriteBuffer table =
6565
new SortBufferWriteBuffer(
66-
new RowType(Collections.singletonList(new DataField(0, "key", new IntType()))),
67-
new RowType(
68-
Collections.singletonList(new DataField(1, "value", new BigIntType()))),
66+
RowType.builder().field("key_f0", DataTypes.INT()).build(),
67+
RowType.builder()
68+
.field("f0", DataTypes.INT())
69+
.field("f1", DataTypes.BIGINT())
70+
.build(),
6971
null,
7072
new HeapMemorySegmentPool(32 * 1024 * 3L, 32 * 1024),
7173
false,
@@ -165,74 +167,105 @@ protected MergeFunction<KeyValue> createMergeFunction() {
165167
/** Test for {@link SortBufferWriteBuffer} with {@link PartialUpdateMergeFunction}. */
166168
public static class WithPartialUpdateMergeFunctionTest extends SortBufferWriteBufferTestBase {
167169

170+
private final boolean addOnly;
171+
172+
private WithPartialUpdateMergeFunctionTest() {
173+
this.addOnly = ThreadLocalRandom.current().nextBoolean();
174+
}
175+
168176
@Override
169177
protected boolean addOnly() {
170-
return true;
178+
return addOnly;
171179
}
172180

173181
@Override
174182
protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
175-
return MergeFunctionTestUtils.getExpectedForPartialUpdate(input);
183+
return MergeFunctionTestUtils.getExpectedForPartialUpdate(input, addOnly);
176184
}
177185

178186
@Override
179187
protected MergeFunction<KeyValue> createMergeFunction() {
180188
Options options = new Options();
189+
options.set(CoreOptions.IGNORE_DELETE, !addOnly);
181190
return PartialUpdateMergeFunction.factory(
182-
options, RowType.of(DataTypes.BIGINT()), ImmutableList.of("f0"))
191+
options,
192+
RowType.of(DataTypes.INT().notNull(), DataTypes.BIGINT()),
193+
ImmutableList.of("f0"))
183194
.create();
184195
}
185196
}
186197

187198
/** Test for {@link SortBufferWriteBuffer} with {@link AggregateMergeFunction}. */
188199
public static class WithAggMergeFunctionTest extends SortBufferWriteBufferTestBase {
189200

201+
private final boolean addOnly;
202+
private final boolean removeRecordOnDelete;
203+
204+
private WithAggMergeFunctionTest() {
205+
ThreadLocalRandom random = ThreadLocalRandom.current();
206+
this.addOnly = random.nextBoolean();
207+
this.removeRecordOnDelete = !addOnly && random.nextBoolean();
208+
}
209+
190210
@Override
191211
protected boolean addOnly() {
192-
return false;
212+
return addOnly;
193213
}
194214

195215
@Override
196216
protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
197-
return MergeFunctionTestUtils.getExpectedForAggSum(input);
217+
return MergeFunctionTestUtils.getExpectedForAggSum(
218+
input, addOnly, removeRecordOnDelete);
198219
}
199220

200221
@Override
201222
protected MergeFunction<KeyValue> createMergeFunction() {
202223
Options options = new Options();
203-
options.set("fields.value.aggregate-function", "sum");
224+
options.set("fields.f1.aggregate-function", "sum");
225+
options.set(CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE, removeRecordOnDelete);
204226
return AggregateMergeFunction.factory(
205227
options,
206-
Collections.singletonList("value"),
207-
Collections.singletonList(DataTypes.BIGINT()),
208-
Collections.emptyList())
228+
Arrays.asList("f0", "f1"),
229+
Arrays.asList(DataTypes.INT().notNull(), DataTypes.BIGINT()),
230+
Collections.singletonList("f0"))
209231
.create();
210232
}
211233
}
212234

213235
/** Test for {@link SortBufferWriteBuffer} with {@link LookupMergeFunction}. */
214236
public static class WithLookupFunctionTest extends SortBufferWriteBufferTestBase {
215237

238+
private final boolean addOnly;
239+
private final boolean removeRecordOnDelete;
240+
241+
private WithLookupFunctionTest() {
242+
ThreadLocalRandom random = ThreadLocalRandom.current();
243+
this.addOnly = random.nextBoolean();
244+
this.removeRecordOnDelete = !addOnly && random.nextBoolean();
245+
}
246+
216247
@Override
217248
protected boolean addOnly() {
218-
return false;
249+
return addOnly;
219250
}
220251

221252
@Override
222253
protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
223-
return MergeFunctionTestUtils.getExpectedForAggSum(input);
254+
return MergeFunctionTestUtils.getExpectedForAggSum(
255+
input, addOnly, removeRecordOnDelete);
224256
}
225257

226258
@Override
227259
protected MergeFunction<KeyValue> createMergeFunction() {
228260
Options options = new Options();
229-
options.set("fields.value.aggregate-function", "sum");
261+
options.set("fields.f1.aggregate-function", "sum");
262+
options.set(CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE, removeRecordOnDelete);
230263
MergeFunctionFactory<KeyValue> aggMergeFunction =
231264
AggregateMergeFunction.factory(
232265
options,
233-
Collections.singletonList("value"),
234-
Collections.singletonList(DataTypes.BIGINT()),
235-
Collections.emptyList());
266+
Arrays.asList("f0", "f1"),
267+
Arrays.asList(DataTypes.INT().notNull(), DataTypes.BIGINT()),
268+
Collections.singletonList("f0"));
236269
return LookupMergeFunction.wrap(aggMergeFunction).create();
237270
}
238271
}

paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public static List<ReusingTestData> getExpectedForDeduplicate(List<ReusingTestDa
4646
return expected;
4747
}
4848

49-
public static List<ReusingTestData> getExpectedForPartialUpdate(List<ReusingTestData> input) {
49+
public static List<ReusingTestData> getExpectedForPartialUpdate(
50+
List<ReusingTestData> input, boolean addOnly) {
5051
input = new ArrayList<>(input);
5152
Collections.sort(input);
5253

@@ -60,17 +61,31 @@ public static List<ReusingTestData> getExpectedForPartialUpdate(List<ReusingTest
6061
if (group.size() == 1) {
6162
// due to ReducerMergeFunctionWrapper
6263
expected.add(group.get(0));
64+
} else if (addOnly) {
65+
// get the final value
66+
expected.add(group.get(group.size() - 1));
6367
} else {
64-
group.stream()
65-
.filter(d -> d.valueKind.isAdd())
66-
.reduce((first, second) -> second)
67-
.ifPresent(expected::add);
68+
if (group.stream().noneMatch(data -> data.valueKind == RowKind.INSERT)) {
69+
// No insert: fill in the pk and leave the nullable fields null; sequenceNumber =
70+
// latest
71+
ReusingTestData last = group.get(group.size() - 1);
72+
expected.add(
73+
new ReusingTestData(
74+
last.key, last.sequenceNumber, RowKind.DELETE, null));
75+
} else {
76+
// get the last INSERT data because later DELETE data are ignored
77+
group.stream()
78+
.filter(d -> d.valueKind.isAdd())
79+
.reduce((first, second) -> second)
80+
.ifPresent(expected::add);
81+
}
6882
}
6983
}
7084
return expected;
7185
}
7286

73-
public static List<ReusingTestData> getExpectedForAggSum(List<ReusingTestData> input) {
87+
public static List<ReusingTestData> getExpectedForAggSum(
88+
List<ReusingTestData> input, boolean addOnly, boolean removeRecordOndelete) {
7489
input = new ArrayList<>(input);
7590
Collections.sort(input);
7691

@@ -84,14 +99,37 @@ public static List<ReusingTestData> getExpectedForAggSum(List<ReusingTestData> i
8499
if (group.size() == 1) {
85100
// due to ReducerMergeFunctionWrapper
86101
expected.add(group.get(0));
87-
} else {
102+
} else if (addOnly || !removeRecordOndelete) {
88103
long sum =
89104
group.stream()
90105
.mapToLong(d -> d.valueKind.isAdd() ? d.value : -d.value)
91106
.sum();
92107
ReusingTestData last = group.get(group.size() - 1);
93108
expected.add(
94109
new ReusingTestData(last.key, last.sequenceNumber, RowKind.INSERT, sum));
110+
} else {
111+
if (group.stream().noneMatch(data -> data.valueKind == RowKind.INSERT)) {
112+
// No insert: fill in the pk and leave the nullable fields null; sequenceNumber =
113+
// latest
114+
ReusingTestData last = group.get(group.size() - 1);
115+
expected.add(
116+
new ReusingTestData(
117+
last.key, last.sequenceNumber, RowKind.DELETE, null));
118+
} else {
119+
RowKind rowKind = null;
120+
Long sum = null;
121+
for (ReusingTestData data : group) {
122+
if (data.valueKind == RowKind.INSERT) {
123+
rowKind = RowKind.INSERT;
124+
sum = sum == null ? data.value : sum + data.value;
125+
} else {
126+
rowKind = RowKind.DELETE;
127+
sum = null;
128+
}
129+
}
130+
ReusingTestData last = group.get(group.size() - 1);
131+
expected.add(new ReusingTestData(last.key, last.sequenceNumber, rowKind, sum));
132+
}
95133
}
96134
}
97135
return expected;

paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,11 +1298,11 @@ public void testAggregationRemoveRecordOnDelete() throws Exception {
12981298
getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
12991299
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");
13001300

1301-
// 2. Update Before
1301+
// 2. Update Before: retract
13021302
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
13031303
commit.commit(1, write.prepareCommit(true, 1));
13041304
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
1305-
assertThat(result).isEmpty();
1305+
assertThat(result).containsExactly("+I[1, 1, NULL, NULL]");
13061306

13071307
// 3. Update After
13081308
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));

paimon-core/src/test/java/org/apache/paimon/table/source/ValueContentRowDataRecordIteratorTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public class ValueContentRowDataRecordIteratorTest extends RowDataRecordIterator
3535
public void testIterator() throws Exception {
3636
List<ReusingTestData> input =
3737
ReusingTestData.parse("1, 1, +, 100 | 2, 2, +, 200 | 1, 3, -, 100 | 2, 4, +, 300");
38+
// to check ReusingTestData.key
39+
List<Integer> expectedKeys = Arrays.asList(1, 2, 1, 2);
3840
List<Long> expectedValues = Arrays.asList(100L, 200L, 100L, 300L);
3941
List<RowKind> expectedRowKinds =
4042
Arrays.asList(RowKind.INSERT, RowKind.INSERT, RowKind.DELETE, RowKind.INSERT);
@@ -43,8 +45,9 @@ public void testIterator() throws Exception {
4345
input,
4446
ValueContentRowDataRecordIterator::new,
4547
(rowData, idx) -> {
46-
assertThat(rowData.getFieldCount()).isEqualTo(1);
47-
assertThat(rowData.getLong(0)).isEqualTo(expectedValues.get(idx));
48+
assertThat(rowData.getFieldCount()).isEqualTo(2);
49+
assertThat(rowData.getInt(0)).isEqualTo(expectedKeys.get(idx));
50+
assertThat(rowData.getLong(1)).isEqualTo(expectedValues.get(idx));
4851
assertThat(rowData.getRowKind()).isEqualTo(expectedRowKinds.get(idx));
4952
});
5053
}

0 commit comments

Comments
 (0)