Skip to content

Commit a2892b1

Browse files
committed
[core] Refactor logDedupEqualSupplier creation and names
1 parent 23a220d commit a2892b1

10 files changed

+38
-56
lines changed

paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -93,11 +93,11 @@ public KeyValueFileStore(
9393
this.keyValueFieldsExtractor = keyValueFieldsExtractor;
9494
this.mfFactory = mfFactory;
9595
this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
96-
List<String> ignoreFields = options.changelogRowDeduplicateIgnoreFields();
96+
List<String> logDedupIgnoreFields = options.changelogRowDeduplicateIgnoreFields();
9797
this.logDedupEqualSupplier =
98-
options.changelogRowDeduplicate() && !ignoreFields.isEmpty()
99-
? ValueEqualiserSupplier.fromIgnoreFields(valueType, ignoreFields)
100-
: new ValueEqualiserSupplier(valueType);
98+
options.changelogRowDeduplicate()
99+
? ValueEqualiserSupplier.fromIgnoreFields(valueType, logDedupIgnoreFields)
100+
: () -> null;
101101
this.tableName = tableName;
102102
}
103103

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java

Lines changed: 5 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,8 @@
2323
import org.apache.paimon.types.RowKind;
2424
import org.apache.paimon.utils.Preconditions;
2525

26+
import javax.annotation.Nullable;
27+
2628
/**
2729
* Wrapper for {@link MergeFunction}s to produce changelog during a full compaction.
2830
*
@@ -39,8 +41,7 @@ public class FullChangelogMergeFunctionWrapper implements MergeFunctionWrapper<C
3941

4042
private final MergeFunction<KeyValue> mergeFunction;
4143
private final int maxLevel;
42-
private final RecordEqualiser valueEqualiser;
43-
private final boolean changelogRowDeduplicate;
44+
@Nullable private final RecordEqualiser valueEqualiser;
4445

4546
// only full compaction will write files into maxLevel, see UniversalCompaction class
4647
private KeyValue topLevelKv;
@@ -54,12 +55,10 @@ public class FullChangelogMergeFunctionWrapper implements MergeFunctionWrapper<C
5455
public FullChangelogMergeFunctionWrapper(
5556
MergeFunction<KeyValue> mergeFunction,
5657
int maxLevel,
57-
RecordEqualiser valueEqualiser,
58-
boolean changelogRowDeduplicate) {
58+
@Nullable RecordEqualiser valueEqualiser) {
5959
this.mergeFunction = mergeFunction;
6060
this.maxLevel = maxLevel;
6161
this.valueEqualiser = valueEqualiser;
62-
this.changelogRowDeduplicate = changelogRowDeduplicate;
6362
}
6463

6564
@Override
@@ -106,7 +105,7 @@ public ChangelogResult getResult() {
106105
} else {
107106
if (!merged.isAdd()) {
108107
reusedResult.addChangelog(replace(reusedBefore, RowKind.DELETE, topLevelKv));
109-
} else if (!changelogRowDeduplicate
108+
} else if (valueEqualiser == null
110109
|| !valueEqualiser.equals(topLevelKv.value(), merged.value())) {
111110
reusedResult
112111
.addChangelog(replace(reusedBefore, RowKind.UPDATE_BEFORE, topLevelKv))

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java

Lines changed: 3 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -42,8 +42,7 @@
4242
/** A {@link MergeTreeCompactRewriter} which produces changelog files for each full compaction. */
4343
public class FullChangelogMergeTreeCompactRewriter extends ChangelogMergeTreeRewriter {
4444

45-
private final RecordEqualiser valueEqualiser;
46-
private final boolean changelogRowDeduplicate;
45+
@Nullable private final RecordEqualiser valueEqualiser;
4746

4847
public FullChangelogMergeTreeCompactRewriter(
4948
int maxLevel,
@@ -54,8 +53,7 @@ public FullChangelogMergeTreeCompactRewriter(
5453
@Nullable FieldsComparator userDefinedSeqComparator,
5554
MergeFunctionFactory<KeyValue> mfFactory,
5655
MergeSorter mergeSorter,
57-
RecordEqualiser valueEqualiser,
58-
boolean changelogRowDeduplicate) {
56+
@Nullable RecordEqualiser valueEqualiser) {
5957
super(
6058
maxLevel,
6159
mergeEngine,
@@ -68,7 +66,6 @@ public FullChangelogMergeTreeCompactRewriter(
6866
true,
6967
false);
7068
this.valueEqualiser = valueEqualiser;
71-
this.changelogRowDeduplicate = changelogRowDeduplicate;
7269
}
7370

7471
@Override
@@ -90,8 +87,7 @@ protected UpgradeStrategy upgradeStrategy(int outputLevel, DataFileMeta file) {
9087

9188
@Override
9289
protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int outputLevel) {
93-
return new FullChangelogMergeFunctionWrapper(
94-
mfFactory.create(), maxLevel, valueEqualiser, changelogRowDeduplicate);
90+
return new FullChangelogMergeFunctionWrapper(mfFactory.create(), maxLevel, valueEqualiser);
9591
}
9692

9793
@Override

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -63,17 +63,15 @@ public class LookupChangelogMergeFunctionWrapper<T>
6363
private final ChangelogResult reusedResult = new ChangelogResult();
6464
private final KeyValue reusedBefore = new KeyValue();
6565
private final KeyValue reusedAfter = new KeyValue();
66-
private final RecordEqualiser valueEqualiser;
67-
private final boolean changelogRowDeduplicate;
66+
@Nullable private final RecordEqualiser valueEqualiser;
6867
private final LookupStrategy lookupStrategy;
6968
private final @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer;
7069
private final Comparator<KeyValue> comparator;
7170

7271
public LookupChangelogMergeFunctionWrapper(
7372
MergeFunctionFactory<KeyValue> mergeFunctionFactory,
7473
Function<InternalRow, T> lookup,
75-
RecordEqualiser valueEqualiser,
76-
boolean changelogRowDeduplicate,
74+
@Nullable RecordEqualiser valueEqualiser,
7775
LookupStrategy lookupStrategy,
7876
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
7977
@Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
@@ -91,7 +89,6 @@ public LookupChangelogMergeFunctionWrapper(
9189
this.mergeFunction2 = mergeFunctionFactory.create();
9290
this.lookup = lookup;
9391
this.valueEqualiser = valueEqualiser;
94-
this.changelogRowDeduplicate = changelogRowDeduplicate;
9592
this.lookupStrategy = lookupStrategy;
9693
this.deletionVectorsMaintainer = deletionVectorsMaintainer;
9794
this.comparator = createSequenceComparator(userDefinedSeqComparator);
@@ -179,7 +176,7 @@ private void setChangelog(@Nullable KeyValue before, KeyValue after) {
179176
} else {
180177
if (!after.isAdd()) {
181178
reusedResult.addChangelog(replaceBefore(RowKind.DELETE, before));
182-
} else if (!changelogRowDeduplicate
179+
} else if (valueEqualiser == null
183180
|| !valueEqualiser.equals(before.value(), after.value())) {
184181
reusedResult
185182
.addChangelog(replaceBefore(RowKind.UPDATE_BEFORE, before))

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java

Lines changed: 2 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -152,18 +152,15 @@ MergeFunctionWrapper<ChangelogResult> create(
152152
public static class LookupMergeFunctionWrapperFactory<T>
153153
implements MergeFunctionWrapperFactory<T> {
154154

155-
private final RecordEqualiser valueEqualiser;
156-
private final boolean changelogRowDeduplicate;
155+
@Nullable private final RecordEqualiser valueEqualiser;
157156
private final LookupStrategy lookupStrategy;
158157
@Nullable private final UserDefinedSeqComparator userDefinedSeqComparator;
159158

160159
public LookupMergeFunctionWrapperFactory(
161-
RecordEqualiser valueEqualiser,
162-
boolean changelogRowDeduplicate,
160+
@Nullable RecordEqualiser valueEqualiser,
163161
LookupStrategy lookupStrategy,
164162
@Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
165163
this.valueEqualiser = valueEqualiser;
166-
this.changelogRowDeduplicate = changelogRowDeduplicate;
167164
this.lookupStrategy = lookupStrategy;
168165
this.userDefinedSeqComparator = userDefinedSeqComparator;
169166
}
@@ -184,7 +181,6 @@ public MergeFunctionWrapper<ChangelogResult> create(
184181
}
185182
},
186183
valueEqualiser,
187-
changelogRowDeduplicate,
188184
lookupStrategy,
189185
deletionVectorsMaintainer,
190186
userDefinedSeqComparator);

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java

Lines changed: 5 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -99,7 +99,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
9999
private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
100100
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
101101
private final Supplier<FieldsComparator> udsComparatorSupplier;
102-
private final Supplier<RecordEqualiser> valueEqualiserSupplier;
102+
private final Supplier<RecordEqualiser> logDedupEqualSupplier;
103103
private final MergeFunctionFactory<KeyValue> mfFactory;
104104
private final CoreOptions options;
105105
private final FileIO fileIO;
@@ -119,7 +119,7 @@ public KeyValueFileStoreWrite(
119119
RowType valueType,
120120
Supplier<Comparator<InternalRow>> keyComparatorSupplier,
121121
Supplier<FieldsComparator> udsComparatorSupplier,
122-
Supplier<RecordEqualiser> valueEqualiserSupplier,
122+
Supplier<RecordEqualiser> logDedupEqualSupplier,
123123
MergeFunctionFactory<KeyValue> mfFactory,
124124
FileStorePathFactory pathFactory,
125125
Map<String, FileStorePathFactory> format2PathFactory,
@@ -165,7 +165,7 @@ public KeyValueFileStoreWrite(
165165
format2PathFactory,
166166
options.targetFileSize(true));
167167
this.keyComparatorSupplier = keyComparatorSupplier;
168-
this.valueEqualiserSupplier = valueEqualiserSupplier;
168+
this.logDedupEqualSupplier = logDedupEqualSupplier;
169169
this.mfFactory = mfFactory;
170170
this.options = options;
171171
}
@@ -294,8 +294,7 @@ private MergeTreeCompactRewriter createRewriter(
294294
userDefinedSeqComparator,
295295
mfFactory,
296296
mergeSorter,
297-
valueEqualiserSupplier.get(),
298-
options.changelogRowDeduplicate());
297+
logDedupEqualSupplier.get());
299298
} else if (lookupStrategy.needLookup) {
300299
LookupLevels.ValueProcessor<?> processor;
301300
LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<?> wrapperFactory;
@@ -323,8 +322,7 @@ private MergeTreeCompactRewriter createRewriter(
323322
: new KeyValueProcessor(valueType);
324323
wrapperFactory =
325324
new LookupMergeFunctionWrapperFactory<>(
326-
valueEqualiserSupplier.get(),
327-
options.changelogRowDeduplicate(),
325+
logDedupEqualSupplier.get(),
328326
lookupStrategy,
329327
UserDefinedSeqComparator.create(valueType, options));
330328
}

paimon-core/src/main/java/org/apache/paimon/utils/ValueEqualiserSupplier.java

Lines changed: 8 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -58,15 +58,14 @@ public RecordEqualiser get() {
5858

5959
public static ValueEqualiserSupplier fromIgnoreFields(
6060
RowType rowType, @Nullable List<String> ignoreFields) {
61-
int[] projection = getProjectionWithIgnoreFields(rowType, ignoreFields);
61+
int[] projection = null;
62+
if (ignoreFields != null) {
63+
List<String> fieldNames = rowType.getFieldNames();
64+
projection =
65+
IntStream.range(0, rowType.getFieldCount())
66+
.filter(idx -> !ignoreFields.contains(fieldNames.get(idx)))
67+
.toArray();
68+
}
6269
return new ValueEqualiserSupplier(rowType, projection);
6370
}
64-
65-
private static int[] getProjectionWithIgnoreFields(RowType rowType, List<String> ignoreFields) {
66-
List<String> fieldNames = rowType.getFieldNames();
67-
IntStream projectionStream = IntStream.range(0, rowType.getFieldCount());
68-
return projectionStream
69-
.filter(idx -> !ignoreFields.contains(fieldNames.get(idx)))
70-
.toArray();
71-
}
7271
}

paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -306,8 +306,7 @@ protected UpgradeStrategy upgradeStrategy(int outputLevel, DataFileMeta file) {
306306

307307
@Override
308308
protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int outputLevel) {
309-
return new FullChangelogMergeFunctionWrapper(
310-
mfFactory.create(), MAX_LEVEL, null, false);
309+
return new FullChangelogMergeFunctionWrapper(mfFactory.create(), MAX_LEVEL, null);
311310
}
312311

313312
@Override

paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,9 @@ public abstract class FullChangelogMergeFunctionWrapperTestBase {
5555
public void beforeEach() {
5656
wrapper =
5757
new FullChangelogMergeFunctionWrapper(
58-
createMergeFunction(), MAX_LEVEL, EQUALISER, changelogRowDeduplicate());
58+
createMergeFunction(),
59+
MAX_LEVEL,
60+
changelogRowDeduplicate() ? EQUALISER : null);
5961
}
6062

6163
private static final List<List<KeyValue>> INPUT_KVS =
@@ -233,7 +235,7 @@ public void testFullChangelogMergeFunctionWrapperWithIgnoreFields() {
233235
ValueEqualiserSupplier.fromIgnoreFields(valueType, ignoreFields);
234236
FullChangelogMergeFunctionWrapper function =
235237
new FullChangelogMergeFunctionWrapper(
236-
createMergeFunction(), MAX_LEVEL, logDedupEqualSupplier.get(), true);
238+
createMergeFunction(), MAX_LEVEL, logDedupEqualSupplier.get());
237239

238240
// With level-0 'insert' record, with max level same record. Notice that the specified
239241
// ignored

paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java

Lines changed: 3 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -74,8 +74,7 @@ public void testDeduplicate(boolean changelogRowDeduplicate) {
7474
RowType.of(DataTypes.INT()),
7575
RowType.of(DataTypes.INT())),
7676
highLevel::get,
77-
EQUALISER,
78-
changelogRowDeduplicate,
77+
changelogRowDeduplicate ? EQUALISER : null,
7978
LookupStrategy.from(false, true, false, false),
8079
null,
8180
null);
@@ -241,7 +240,6 @@ public void testDeduplicateWithIgnoreFields() {
241240
valueType),
242241
highLevel::get,
243242
logDedupEqualSupplier.get(),
244-
true,
245243
LookupStrategy.from(false, true, false, false),
246244
null,
247245
userDefinedSeqComparator);
@@ -300,8 +298,7 @@ public void testSum(boolean changelogRowDeduplicate) {
300298
RowType.of(DataTypes.INT()),
301299
RowType.of(DataTypes.INT())),
302300
key -> null,
303-
EQUALISER,
304-
changelogRowDeduplicate,
301+
changelogRowDeduplicate ? EQUALISER : null,
305302
LookupStrategy.from(false, true, false, false),
306303
null,
307304
null);
@@ -389,8 +386,7 @@ public void testMergeHighLevelOrder() {
389386
RowType.of(DataTypes.INT()),
390387
RowType.of(DataTypes.INT())),
391388
highLevel::get,
392-
EQUALISER,
393-
false,
389+
null,
394390
LookupStrategy.from(false, true, false, false),
395391
null,
396392
UserDefinedSeqComparator.create(

0 commit comments

Comments
 (0)