Skip to content

Commit e7a9d6e

Browse files
JackeyLee007danzhewuju
authored andcommitted
[core] Support remove-record-on-delete for aggregation merge-engine (apache#5360)
1 parent 82efdb1 commit e7a9d6e

File tree

5 files changed

+113
-5
lines changed

5 files changed

+113
-5
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@
2626
</tr>
2727
</thead>
2828
<tbody>
29+
<tr>
30+
<td><h5>aggregation.remove-record-on-delete</h5></td>
31+
<td style="word-wrap: break-word;">false</td>
32+
<td>Boolean</td>
33+
<td>Whether to remove the whole row in aggregation engine when -D records are received.</td>
34+
</tr>
2935
<tr>
3036
<td><h5>async-file-write</h5></td>
3137
<td style="word-wrap: break-word;">true</td>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,14 @@ public class CoreOptions implements Serializable {
665665
.defaultValue(SortOrder.ASCENDING)
666666
.withDescription("Specify the order of sequence.field.");
667667

668+
@Immutable
669+
public static final ConfigOption<Boolean> AGGREGATION_REMOVE_RECORD_ON_DELETE =
670+
key("aggregation.remove-record-on-delete")
671+
.booleanType()
672+
.defaultValue(false)
673+
.withDescription(
674+
"Whether to remove the whole row in aggregation engine when -D records are received.");
675+
668676
@Immutable
669677
public static final ConfigOption<Boolean> PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE =
670678
key("partial-update.remove-record-on-delete")
@@ -2618,6 +2626,10 @@ public boolean dataFileThinMode() {
26182626
return options.get(DATA_FILE_THIN_MODE);
26192627
}
26202628

2629+
public boolean aggregationRemoveRecordOnDelete() {
2630+
return options.get(AGGREGATION_REMOVE_RECORD_ON_DELETE);
2631+
}
2632+
26212633
/** Specifies the merge engine for table with primary key. */
26222634
public enum MergeEngine implements DescribedEnum {
26232635
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.Arrays;
3939
import java.util.List;
4040

41+
import static org.apache.paimon.CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE;
4142
import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;
4243
import static org.apache.paimon.utils.Preconditions.checkNotNull;
4344

@@ -53,25 +54,42 @@ public class AggregateMergeFunction implements MergeFunction<KeyValue> {
5354
private KeyValue latestKv;
5455
private GenericRow row;
5556
private KeyValue reused;
57+
private boolean currentDeleteRow;
58+
private final boolean removeRecordOnDelete;
5659

5760
public AggregateMergeFunction(
5861
InternalRow.FieldGetter[] getters, FieldAggregator[] aggregators) {
62+
this(getters, aggregators, false);
63+
}
64+
65+
public AggregateMergeFunction(
66+
InternalRow.FieldGetter[] getters,
67+
FieldAggregator[] aggregators,
68+
boolean removeRecordOnDelete) {
5969
this.getters = getters;
6070
this.aggregators = aggregators;
71+
this.removeRecordOnDelete = removeRecordOnDelete;
6172
}
6273

6374
@Override
6475
public void reset() {
6576
this.latestKv = null;
6677
this.row = new GenericRow(getters.length);
6778
Arrays.stream(aggregators).forEach(FieldAggregator::reset);
79+
this.currentDeleteRow = false;
6880
}
6981

7082
@Override
7183
public void add(KeyValue kv) {
7284
latestKv = kv;
7385
boolean isRetract =
7486
kv.valueKind() != RowKind.INSERT && kv.valueKind() != RowKind.UPDATE_AFTER;
87+
88+
currentDeleteRow = removeRecordOnDelete && isRetract;
89+
if (currentDeleteRow) {
90+
return;
91+
}
92+
7593
for (int i = 0; i < getters.length; i++) {
7694
FieldAggregator fieldAggregator = aggregators[i];
7795
Object accumulator = getters[i].getFieldOrNull(row);
@@ -93,7 +111,8 @@ public KeyValue getResult() {
93111
if (reused == null) {
94112
reused = new KeyValue();
95113
}
96-
return reused.replace(latestKv.key(), latestKv.sequenceNumber(), RowKind.INSERT, row);
114+
RowKind rowKind = currentDeleteRow ? RowKind.DELETE : RowKind.INSERT;
115+
return reused.replace(latestKv.key(), latestKv.sequenceNumber(), rowKind, row);
97116
}
98117

99118
@Override
@@ -117,6 +136,7 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {
117136
private final List<String> tableNames;
118137
private final List<DataType> tableTypes;
119138
private final List<String> primaryKeys;
139+
private final boolean removeRecordOnDelete;
120140

121141
private Factory(
122142
Options conf,
@@ -127,6 +147,7 @@ private Factory(
127147
this.tableNames = tableNames;
128148
this.tableTypes = tableTypes;
129149
this.primaryKeys = primaryKeys;
150+
this.removeRecordOnDelete = conf.get(AGGREGATION_REMOVE_RECORD_ON_DELETE);
130151
}
131152

132153
@Override
@@ -150,7 +171,8 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
150171
FieldAggregatorFactory.create(fieldType, fieldName, aggFuncName, options);
151172
}
152173

153-
return new AggregateMergeFunction(createFieldGetters(fieldTypes), fieldAggregators);
174+
return new AggregateMergeFunction(
175+
createFieldGetters(fieldTypes), fieldAggregators, removeRecordOnDelete);
154176
}
155177

156178
private String getAggFuncName(String fieldName, List<String> sequenceFields) {

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
2525
import org.apache.paimon.mergetree.compact.aggregate.FieldIgnoreRetractAgg;
2626
import org.apache.paimon.types.DataType;
27+
import org.apache.paimon.utils.Preconditions;
28+
29+
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
30+
import static org.apache.paimon.CoreOptions.IGNORE_RETRACT;
2731

2832
/** Factory for {@link FieldAggregator}. */
2933
public interface FieldAggregatorFactory extends Factory {
@@ -46,10 +50,17 @@ static FieldAggregator create(
4650
aggFuncName));
4751
}
4852

53+
boolean removeRecordOnRetract = options.aggregationRemoveRecordOnDelete();
54+
boolean fieldIgnoreRetract = options.fieldAggIgnoreRetract(fieldName);
55+
Preconditions.checkState(
56+
!(removeRecordOnRetract && fieldIgnoreRetract),
57+
String.format(
58+
"%s and %s have conflicting behavior so should not be enabled at the same time.",
59+
CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE,
60+
FIELDS_PREFIX + "." + fieldName + "." + IGNORE_RETRACT));
61+
4962
FieldAggregator fieldAggregator =
5063
fieldAggregatorFactory.create(fieldType, options, fieldName);
51-
return options.fieldAggIgnoreRetract(fieldName)
52-
? new FieldIgnoreRetractAgg(fieldAggregator)
53-
: fieldAggregator;
64+
return fieldIgnoreRetract ? new FieldIgnoreRetractAgg(fieldAggregator) : fieldAggregator;
5465
}
5566
}

paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1269,6 +1269,63 @@ public void testAuditLog() throws Exception {
12691269
assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
12701270
}
12711271

1272+
@Test
1273+
public void testAggregationRemoveRecordOnDelete() throws Exception {
1274+
RowType rowType =
1275+
RowType.of(
1276+
new DataType[] {
1277+
DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
1278+
},
1279+
new String[] {"pt", "a", "b", "c"});
1280+
FileStoreTable table =
1281+
createFileStoreTable(
1282+
options -> {
1283+
options.set("merge-engine", "aggregation");
1284+
options.set("aggregation.remove-record-on-delete", "true");
1285+
},
1286+
rowType);
1287+
Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
1288+
SnapshotReader snapshotReader = table.newSnapshotReader();
1289+
TableRead read = table.newRead();
1290+
StreamTableWrite write = table.newWrite("");
1291+
StreamTableCommit commit = table.newCommit("");
1292+
// 1. Inserts
1293+
write.write(GenericRow.of(1, 1, 3, 3));
1294+
write.write(GenericRow.of(1, 1, 1, 1));
1295+
write.write(GenericRow.of(1, 1, 2, 2));
1296+
commit.commit(0, write.prepareCommit(true, 0));
1297+
List<String> result =
1298+
getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
1299+
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");
1300+
1301+
// 2. Update Before
1302+
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
1303+
commit.commit(1, write.prepareCommit(true, 1));
1304+
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
1305+
assertThat(result).isEmpty();
1306+
1307+
// 3. Update After
1308+
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
1309+
commit.commit(2, write.prepareCommit(true, 2));
1310+
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
1311+
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]");
1312+
1313+
// 4. Retracts
1314+
write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
1315+
commit.commit(3, write.prepareCommit(true, 3));
1316+
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
1317+
assertThat(result).isEmpty();
1318+
1319+
// 5. Inserts
1320+
write.write(GenericRow.of(1, 1, 2, 2));
1321+
commit.commit(4, write.prepareCommit(true, 4));
1322+
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
1323+
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");
1324+
1325+
write.close();
1326+
commit.close();
1327+
}
1328+
12721329
@Test
12731330
public void testPartialUpdateRemoveRecordOnDelete() throws Exception {
12741331
RowType rowType =

0 commit comments

Comments
 (0)