Skip to content

Commit 23a220d

Browse files
authored
[core] Support ignoring specified fields while generating -U, +U changelog for the same record (#4126)
1 parent cef210d commit 23a220d

File tree

7 files changed

+188
-5
lines changed

7 files changed

+188
-5
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@
7474
<td>Boolean</td>
7575
<td>Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.</td>
7676
</tr>
77+
<tr>
78+
<td><h5>changelog-producer.row-deduplicate-ignore-fields</h5></td>
79+
<td style="word-wrap: break-word;">(none)</td>
80+
<td>String</td>
81+
<td>Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.</td>
82+
</tr>
7783
<tr>
7884
<td><h5>changelog.num-retained.max</h5></td>
7985
<td style="word-wrap: break-word;">(none)</td>

paimon-codegen/src/main/scala/org/apache/paimon/codegen/EqualiserCodeGenerator.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class EqualiserCodeGenerator(fieldTypes: Array[DataType], fields: Array[Int]) {
4040
val ctx = new CodeGeneratorContext
4141
val className = newName(name)
4242

43+
val containsIgnoreFields = fieldTypes.length > fields.length
4344
val equalsMethodCodes = for (idx <- fields) yield generateEqualsMethod(ctx, idx)
4445
val equalsMethodCalls = for (idx <- fields) yield {
4546
val methodName = getEqualsMethodName(idx)
@@ -57,7 +58,7 @@ class EqualiserCodeGenerator(fieldTypes: Array[DataType], fields: Array[Int]) {
5758

5859
@Override
5960
public boolean equals($ROW_DATA $LEFT_INPUT, $ROW_DATA $RIGHT_INPUT) {
60-
if ($LEFT_INPUT instanceof $BINARY_ROW && $RIGHT_INPUT instanceof $BINARY_ROW) {
61+
if ($LEFT_INPUT instanceof $BINARY_ROW && $RIGHT_INPUT instanceof $BINARY_ROW && !$containsIgnoreFields) {
6162
return $LEFT_INPUT.equals($RIGHT_INPUT);
6263
}
6364

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,13 @@ public class CoreOptions implements Serializable {
566566
.withDescription(
567567
"Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.");
568568

569+
public static final ConfigOption<String> CHANGELOG_PRODUCER_ROW_DEDUPLICATE_IGNORE_FIELDS =
570+
key("changelog-producer.row-deduplicate-ignore-fields")
571+
.stringType()
572+
.noDefaultValue()
573+
.withDescription(
574+
"Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.");
575+
569576
@Immutable
570577
public static final ConfigOption<String> SEQUENCE_FIELD =
571578
key("sequence.field")
@@ -1786,6 +1793,12 @@ public boolean changelogRowDeduplicate() {
17861793
return options.get(CHANGELOG_PRODUCER_ROW_DEDUPLICATE);
17871794
}
17881795

1796+
public List<String> changelogRowDeduplicateIgnoreFields() {
1797+
return options.getOptional(CHANGELOG_PRODUCER_ROW_DEDUPLICATE_IGNORE_FIELDS)
1798+
.map(s -> Arrays.asList(s.split(",")))
1799+
.orElse(Collections.emptyList());
1800+
}
1801+
17891802
public boolean scanPlanSortPartition() {
17901803
return options.get(SCAN_PLAN_SORT_PARTITION);
17911804
}

paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
6767
private final RowType valueType;
6868
private final KeyValueFieldsExtractor keyValueFieldsExtractor;
6969
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
70-
private final Supplier<RecordEqualiser> valueEqualiserSupplier;
70+
private final Supplier<RecordEqualiser> logDedupEqualSupplier;
7171
private final MergeFunctionFactory<KeyValue> mfFactory;
7272
private final String tableName;
7373

@@ -93,7 +93,11 @@ public KeyValueFileStore(
9393
this.keyValueFieldsExtractor = keyValueFieldsExtractor;
9494
this.mfFactory = mfFactory;
9595
this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
96-
this.valueEqualiserSupplier = new ValueEqualiserSupplier(valueType);
96+
List<String> ignoreFields = options.changelogRowDeduplicateIgnoreFields();
97+
this.logDedupEqualSupplier =
98+
options.changelogRowDeduplicate() && !ignoreFields.isEmpty()
99+
? ValueEqualiserSupplier.fromIgnoreFields(valueType, ignoreFields)
100+
: new ValueEqualiserSupplier(valueType);
97101
this.tableName = tableName;
98102
}
99103

@@ -174,7 +178,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
174178
valueType,
175179
keyComparatorSupplier,
176180
() -> UserDefinedSeqComparator.create(valueType, options),
177-
valueEqualiserSupplier,
181+
logDedupEqualSupplier,
178182
mfFactory,
179183
pathFactory(),
180184
format2PathFactory(),

paimon-core/src/main/java/org/apache/paimon/utils/ValueEqualiserSupplier.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
import org.apache.paimon.types.DataType;
2323
import org.apache.paimon.types.RowType;
2424

25+
import javax.annotation.Nullable;
26+
2527
import java.util.List;
2628
import java.util.function.Supplier;
29+
import java.util.stream.IntStream;
2730

2831
import static org.apache.paimon.codegen.CodeGenUtils.newRecordEqualiser;
2932

@@ -34,12 +37,36 @@ public class ValueEqualiserSupplier implements SerializableSupplier<RecordEquali
3437

3538
private final List<DataType> fieldTypes;
3639

40+
private final int[] projection;
41+
3742
public ValueEqualiserSupplier(RowType keyType) {
3843
this.fieldTypes = keyType.getFieldTypes();
44+
this.projection = null;
45+
}
46+
47+
public ValueEqualiserSupplier(RowType keyType, int[] projection) {
48+
this.fieldTypes = keyType.getFieldTypes();
49+
this.projection = projection;
3950
}
4051

4152
@Override
4253
public RecordEqualiser get() {
43-
return newRecordEqualiser(fieldTypes);
54+
return this.projection == null
55+
? newRecordEqualiser(fieldTypes)
56+
: newRecordEqualiser(fieldTypes, projection);
57+
}
58+
59+
public static ValueEqualiserSupplier fromIgnoreFields(
60+
RowType rowType, @Nullable List<String> ignoreFields) {
61+
int[] projection = getProjectionWithIgnoreFields(rowType, ignoreFields);
62+
return new ValueEqualiserSupplier(rowType, projection);
63+
}
64+
65+
private static int[] getProjectionWithIgnoreFields(RowType rowType, List<String> ignoreFields) {
66+
List<String> fieldNames = rowType.getFieldNames();
67+
IntStream projectionStream = IntStream.range(0, rowType.getFieldCount());
68+
return projectionStream
69+
.filter(idx -> !ignoreFields.contains(fieldNames.get(idx)))
70+
.toArray();
4471
}
4572
}

paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020

2121
import org.apache.paimon.KeyValue;
2222
import org.apache.paimon.codegen.RecordEqualiser;
23+
import org.apache.paimon.types.DataType;
24+
import org.apache.paimon.types.DataTypes;
2325
import org.apache.paimon.types.RowKind;
26+
import org.apache.paimon.types.RowType;
27+
import org.apache.paimon.utils.ValueEqualiserSupplier;
2428

2529
import org.junit.jupiter.api.BeforeEach;
2630
import org.junit.jupiter.api.Test;
@@ -31,6 +35,7 @@
3135
import java.util.List;
3236

3337
import static org.apache.paimon.io.DataFileTestUtils.row;
38+
import static org.assertj.core.api.Assertions.assertThat;
3439

3540
/** Tests for {@link FullChangelogMergeFunctionWrapper}. */
3641
public abstract class FullChangelogMergeFunctionWrapperTestBase {
@@ -214,5 +219,63 @@ public static class WithChangelogRowDeduplicateMergeFunctionTest
214219
protected boolean changelogRowDeduplicate() {
215220
return true;
216221
}
222+
223+
@Test
224+
public void testFullChangelogMergeFunctionWrapperWithIgnoreFields() {
225+
RowType valueType =
226+
RowType.builder()
227+
.fields(
228+
new DataType[] {DataTypes.INT(), DataTypes.INT()},
229+
new String[] {"f0", "f1"})
230+
.build();
231+
List<String> ignoreFields = Collections.singletonList("f1");
232+
ValueEqualiserSupplier logDedupEqualSupplier =
233+
ValueEqualiserSupplier.fromIgnoreFields(valueType, ignoreFields);
234+
FullChangelogMergeFunctionWrapper function =
235+
new FullChangelogMergeFunctionWrapper(
236+
createMergeFunction(), MAX_LEVEL, logDedupEqualSupplier.get(), true);
237+
238+
// With level-0 'insert' record, with max level same record. Notice that the specified
239+
// ignored
240+
// fields in records are different.
241+
function.reset();
242+
function.add(
243+
new KeyValue()
244+
.replace(row(1), 1, RowKind.INSERT, row(1, 1))
245+
.setLevel(MAX_LEVEL));
246+
function.add(new KeyValue().replace(row(1), 2, RowKind.INSERT, row(1, 2)).setLevel(0));
247+
ChangelogResult result = function.getResult();
248+
assertThat(result).isNotNull();
249+
List<KeyValue> changelogs = result.changelogs();
250+
assertThat(changelogs).isEmpty();
251+
KeyValue kv = result.result();
252+
assertThat(kv).isNotNull();
253+
assertThat(kv.valueKind()).isEqualTo(RowKind.INSERT);
254+
assertThat(kv.value().getInt(0)).isEqualTo(1);
255+
assertThat(kv.value().getInt(1)).isEqualTo(2);
256+
257+
// With level-0 'insert' record, with max level different record.
258+
function.reset();
259+
function.add(
260+
new KeyValue()
261+
.replace(row(1), 1, RowKind.INSERT, row(1, 1))
262+
.setLevel(MAX_LEVEL));
263+
function.add(new KeyValue().replace(row(1), 2, RowKind.INSERT, row(2, 2)).setLevel(0));
264+
result = function.getResult();
265+
assertThat(result).isNotNull();
266+
changelogs = result.changelogs();
267+
assertThat(changelogs).hasSize(2);
268+
assertThat(changelogs.get(0).valueKind()).isEqualTo(RowKind.UPDATE_BEFORE);
269+
assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(1);
270+
assertThat(changelogs.get(0).value().getInt(1)).isEqualTo(1);
271+
assertThat(changelogs.get(1).valueKind()).isEqualTo(RowKind.UPDATE_AFTER);
272+
assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(2);
273+
assertThat(changelogs.get(1).value().getInt(1)).isEqualTo(2);
274+
kv = result.result();
275+
assertThat(kv).isNotNull();
276+
assertThat(kv.valueKind()).isEqualTo(RowKind.INSERT);
277+
assertThat(kv.value().getInt(0)).isEqualTo(2);
278+
assertThat(kv.value().getInt(1)).isEqualTo(2);
279+
}
217280
}
218281
}

paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929
import org.apache.paimon.mergetree.compact.aggregate.FieldLastValueAgg;
3030
import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
3131
import org.apache.paimon.types.DataField;
32+
import org.apache.paimon.types.DataType;
3233
import org.apache.paimon.types.DataTypes;
3334
import org.apache.paimon.types.IntType;
3435
import org.apache.paimon.types.RowType;
3536
import org.apache.paimon.utils.UserDefinedSeqComparator;
37+
import org.apache.paimon.utils.ValueEqualiserSupplier;
3638

3739
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
3840

@@ -41,6 +43,7 @@
4143
import org.junit.jupiter.params.ParameterizedTest;
4244
import org.junit.jupiter.params.provider.ValueSource;
4345

46+
import java.util.Collections;
4447
import java.util.HashMap;
4548
import java.util.HashSet;
4649
import java.util.List;
@@ -214,6 +217,72 @@ public void testDeduplicate(boolean changelogRowDeduplicate) {
214217
assertThat(kv.value().getInt(0)).isEqualTo(2);
215218
}
216219

220+
@Test
221+
public void testDeduplicateWithIgnoreFields() {
222+
Map<InternalRow, KeyValue> highLevel = new HashMap<>();
223+
RowType valueType =
224+
RowType.builder()
225+
.fields(
226+
new DataType[] {DataTypes.INT(), DataTypes.INT()},
227+
new String[] {"f0", "f1"})
228+
.build();
229+
UserDefinedSeqComparator userDefinedSeqComparator =
230+
UserDefinedSeqComparator.create(
231+
valueType, CoreOptions.fromMap(ImmutableMap.of("sequence.field", "f1")));
232+
assert userDefinedSeqComparator != null;
233+
List<String> ignoreFields = Collections.singletonList("f1");
234+
ValueEqualiserSupplier logDedupEqualSupplier =
235+
ValueEqualiserSupplier.fromIgnoreFields(valueType, ignoreFields);
236+
LookupChangelogMergeFunctionWrapper function =
237+
new LookupChangelogMergeFunctionWrapper(
238+
LookupMergeFunction.wrap(
239+
DeduplicateMergeFunction.factory(),
240+
RowType.of(DataTypes.INT()),
241+
valueType),
242+
highLevel::get,
243+
logDedupEqualSupplier.get(),
244+
true,
245+
LookupStrategy.from(false, true, false, false),
246+
null,
247+
userDefinedSeqComparator);
248+
249+
// With level-0 'insert' record, with level-x (x > 0) same record. Notice that the specified
250+
// ignored
251+
// fields in records are different.
252+
function.reset();
253+
function.add(new KeyValue().replace(row(1), 1, INSERT, row(1, 1)).setLevel(2));
254+
function.add(new KeyValue().replace(row(1), 2, INSERT, row(1, 2)).setLevel(0));
255+
ChangelogResult result = function.getResult();
256+
assertThat(result).isNotNull();
257+
List<KeyValue> changelogs = result.changelogs();
258+
assertThat(changelogs).isEmpty();
259+
KeyValue kv = result.result();
260+
assertThat(kv).isNotNull();
261+
assertThat(kv.valueKind()).isEqualTo(INSERT);
262+
assertThat(kv.value().getInt(0)).isEqualTo(1);
263+
assertThat(kv.value().getInt(1)).isEqualTo(2);
264+
265+
// With level-0 'insert' record, with level-x (x > 0) different record.
266+
function.reset();
267+
function.add(new KeyValue().replace(row(1), 1, INSERT, row(1, 1)).setLevel(1));
268+
function.add(new KeyValue().replace(row(1), 2, INSERT, row(2, 2)).setLevel(0));
269+
result = function.getResult();
270+
assertThat(result).isNotNull();
271+
changelogs = result.changelogs();
272+
assertThat(changelogs).hasSize(2);
273+
assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
274+
assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(1);
275+
assertThat(changelogs.get(0).value().getInt(1)).isEqualTo(1);
276+
assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
277+
assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(2);
278+
assertThat(changelogs.get(1).value().getInt(1)).isEqualTo(2);
279+
kv = result.result();
280+
assertThat(kv).isNotNull();
281+
assertThat(kv.valueKind()).isEqualTo(INSERT);
282+
assertThat(kv.value().getInt(0)).isEqualTo(2);
283+
assertThat(kv.value().getInt(1)).isEqualTo(2);
284+
}
285+
217286
@ParameterizedTest
218287
@ValueSource(booleans = {false, true})
219288
public void testSum(boolean changelogRowDeduplicate) {

0 commit comments

Comments
 (0)