Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>aggregation.remove-record-on-delete</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to remove the whole row in aggregation engine when -D records are received.</td>
</tr>
<tr>
<td><h5>async-file-write</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,14 @@ public class CoreOptions implements Serializable {
.defaultValue(SortOrder.ASCENDING)
.withDescription("Specify the order of sequence.field.");

// Option for the "aggregation" merge engine: when enabled, an incoming retraction
// record (-D or -U_BEFORE) drops the whole aggregated row instead of being handed
// to the per-field aggregate functions.
@Immutable
public static final ConfigOption<Boolean> AGGREGATION_REMOVE_RECORD_ON_DELETE =
key("aggregation.remove-record-on-delete")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to remove the whole row in aggregation engine when -D records are received.");

@Immutable
public static final ConfigOption<Boolean> PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE =
key("partial-update.remove-record-on-delete")
Expand Down Expand Up @@ -2618,6 +2626,10 @@ public boolean dataFileThinMode() {
return options.get(DATA_FILE_THIN_MODE);
}

/** Whether the aggregation merge engine removes the whole row when a retraction record arrives. */
public boolean aggregationRemoveRecordOnDelete() {
return options.get(AGGREGATION_REMOVE_RECORD_ON_DELETE);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.paimon.CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE;
import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

Expand All @@ -53,25 +54,42 @@ public class AggregateMergeFunction implements MergeFunction<KeyValue> {
private KeyValue latestKv;
private GenericRow row;
private KeyValue reused;
private boolean currentDeleteRow;
private final boolean removeRecordOnDelete;

/**
 * Creates an aggregate merge function with the default behavior: retraction records are
 * passed to the field aggregators rather than deleting the whole row.
 */
public AggregateMergeFunction(
InternalRow.FieldGetter[] getters, FieldAggregator[] aggregators) {
this(getters, aggregators, false);
}

/**
 * Creates an aggregate merge function.
 *
 * @param getters field getters used to read the current accumulator values from the merged row
 * @param aggregators per-field aggregate functions, positionally aligned with {@code getters}
 * @param removeRecordOnDelete if true, a retraction record (-D / -U_BEFORE) deletes the whole
 *     row instead of being fed to the field aggregators
 */
public AggregateMergeFunction(
InternalRow.FieldGetter[] getters,
FieldAggregator[] aggregators,
boolean removeRecordOnDelete) {
this.getters = getters;
this.aggregators = aggregators;
this.removeRecordOnDelete = removeRecordOnDelete;
}

/** Clears all per-key merge state so the function can start accumulating a new key. */
@Override
public void reset() {
    currentDeleteRow = false;
    latestKv = null;
    row = new GenericRow(getters.length);
    for (FieldAggregator aggregator : aggregators) {
        aggregator.reset();
    }
}

@Override
public void add(KeyValue kv) {
latestKv = kv;
boolean isRetract =
kv.valueKind() != RowKind.INSERT && kv.valueKind() != RowKind.UPDATE_AFTER;

currentDeleteRow = removeRecordOnDelete && isRetract;
if (currentDeleteRow) {
return;
}

for (int i = 0; i < getters.length; i++) {
FieldAggregator fieldAggregator = aggregators[i];
Object accumulator = getters[i].getFieldOrNull(row);
Expand All @@ -93,7 +111,8 @@ public KeyValue getResult() {
if (reused == null) {
reused = new KeyValue();
}
return reused.replace(latestKv.key(), latestKv.sequenceNumber(), RowKind.INSERT, row);
RowKind rowKind = currentDeleteRow ? RowKind.DELETE : RowKind.INSERT;
return reused.replace(latestKv.key(), latestKv.sequenceNumber(), rowKind, row);
}

@Override
Expand All @@ -117,6 +136,7 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {
private final List<String> tableNames;
private final List<DataType> tableTypes;
private final List<String> primaryKeys;
private final boolean removeRecordOnDelete;

private Factory(
Options conf,
Expand All @@ -127,6 +147,7 @@ private Factory(
this.tableNames = tableNames;
this.tableTypes = tableTypes;
this.primaryKeys = primaryKeys;
this.removeRecordOnDelete = conf.get(AGGREGATION_REMOVE_RECORD_ON_DELETE);
}

@Override
Expand All @@ -150,7 +171,8 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
FieldAggregatorFactory.create(fieldType, fieldName, aggFuncName, options);
}

return new AggregateMergeFunction(createFieldGetters(fieldTypes), fieldAggregators);
return new AggregateMergeFunction(
createFieldGetters(fieldTypes), fieldAggregators, removeRecordOnDelete);
}

private String getAggFuncName(String fieldName, List<String> sequenceFields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldIgnoreRetractAgg;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.Preconditions;

import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.IGNORE_RETRACT;

/** Factory for {@link FieldAggregator}. */
public interface FieldAggregatorFactory extends Factory {
Expand All @@ -46,10 +50,17 @@ static FieldAggregator create(
aggFuncName));
}

boolean removeRecordOnRetract = options.aggregationRemoveRecordOnDelete();
boolean fieldIgnoreRetract = options.fieldAggIgnoreRetract(fieldName);
Preconditions.checkState(
!(removeRecordOnRetract && fieldIgnoreRetract),
String.format(
"%s and %s have conflicting behavior so should not be enabled at the same time.",
CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE,
FIELDS_PREFIX + "." + fieldName + "." + IGNORE_RETRACT));

FieldAggregator fieldAggregator =
fieldAggregatorFactory.create(fieldType, options, fieldName);
return options.fieldAggIgnoreRetract(fieldName)
? new FieldIgnoreRetractAgg(fieldAggregator)
: fieldAggregator;
return fieldIgnoreRetract ? new FieldIgnoreRetractAgg(fieldAggregator) : fieldAggregator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,63 @@ public void testAuditLog() throws Exception {
assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
}

/**
 * Verifies 'aggregation.remove-record-on-delete' on an aggregation merge-engine table: a
 * retraction record (-U_BEFORE / -D) removes the whole aggregated row, and a subsequent
 * insert rebuilds the row from scratch.
 */
@Test
public void testAggregationRemoveRecordOnDelete() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
},
new String[] {"pt", "a", "b", "c"});
FileStoreTable table =
createFileStoreTable(
options -> {
options.set("merge-engine", "aggregation");
options.set("aggregation.remove-record-on-delete", "true");
},
rowType);
Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
SnapshotReader snapshotReader = table.newSnapshotReader();
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite("");
StreamTableCommit commit = table.newCommit("");
// 1. Inserts: rows sharing the same key are merged; the last written values win here
// (presumably via the default aggregate function when none is configured — confirm).
write.write(GenericRow.of(1, 1, 3, 3));
write.write(GenericRow.of(1, 1, 1, 1));
write.write(GenericRow.of(1, 1, 2, 2));
commit.commit(0, write.prepareCommit(true, 0));
List<String> result =
getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");

// 2. Update Before: a -U record deletes the entire row when the option is enabled.
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
commit.commit(1, write.prepareCommit(true, 1));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).isEmpty();

// 3. Update After: a +U record re-creates the row.
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
commit.commit(2, write.prepareCommit(true, 2));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]");

// 4. Retracts: a -D record likewise deletes the entire row.
write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
commit.commit(3, write.prepareCommit(true, 3));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).isEmpty();

// 5. Inserts after delete: the row is rebuilt from the new record alone.
write.write(GenericRow.of(1, 1, 2, 2));
commit.commit(4, write.prepareCommit(true, 4));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");

write.close();
commit.close();
}

@Test
public void testPartialUpdateRemoveRecordOnDelete() throws Exception {
RowType rowType =
Expand Down
Loading