Skip to content

Commit 214dcc5

Browse files
committed
[core] Validate merge function creation in SchemaValidation
1 parent 7a7373b commit 214dcc5

File tree

3 files changed: 25 additions and 107 deletions.

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 6 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.paimon.format.FileFormat;
2727
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
2828
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
29-
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
3029
import org.apache.paimon.options.ConfigOption;
3130
import org.apache.paimon.options.Options;
3231
import org.apache.paimon.table.BucketMode;
@@ -44,16 +43,12 @@
4443
import org.apache.paimon.utils.Preconditions;
4544
import org.apache.paimon.utils.StringUtils;
4645

47-
import java.util.ArrayList;
4846
import java.util.Arrays;
49-
import java.util.Collection;
5047
import java.util.Collections;
5148
import java.util.HashMap;
52-
import java.util.HashSet;
5349
import java.util.List;
5450
import java.util.Map;
5551
import java.util.Optional;
56-
import java.util.Set;
5752
import java.util.stream.Collectors;
5853

5954
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
@@ -77,7 +72,7 @@
7772
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
7873
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
7974
import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
80-
import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
75+
import static org.apache.paimon.table.PrimaryKeyTableUtils.createMergeFunctionFactory;
8176
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
8277
import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
8378
import static org.apache.paimon.types.DataTypeRoot.ARRAY;
@@ -96,8 +91,6 @@ public class SchemaValidation {
9691
/**
9792
* Validate the {@link TableSchema} and {@link CoreOptions}.
9893
*
99-
* <p>TODO validate all items in schema and all keys in options.
100-
*
10194
* @param schema the schema to be validated
10295
*/
10396
public static void validateTableSchema(TableSchema schema) {
@@ -122,7 +115,7 @@ public static void validateTableSchema(TableSchema schema) {
122115

123116
validateSequenceField(schema, options);
124117

125-
validateSequenceGroup(schema, options);
118+
validateMergeFunction(schema);
126119

127120
ChangelogProducer changelogProducer = options.changelogProducer();
128121
if (schema.primaryKeys().isEmpty() && changelogProducer != ChangelogProducer.NONE) {
@@ -449,90 +442,12 @@ private static void validateFieldsPrefix(TableSchema schema, CoreOptions options
449442
});
450443
}
451444

452-
private static void validateSequenceGroup(TableSchema schema, CoreOptions options) {
453-
Map<String, Set<String>> fields2Group = new HashMap<>();
454-
Set<Integer> sequenceGroupFieldIndexs = new HashSet<>();
455-
List<String> fieldNames = schema.fieldNames();
456-
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
457-
String k = entry.getKey();
458-
String v = entry.getValue();
459-
if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
460-
Arrays.stream(v.split(FIELDS_SEPARATOR))
461-
.map(fieldName -> requireField(fieldName, fieldNames))
462-
.forEach(sequenceGroupFieldIndexs::add);
463-
String[] sequenceFieldNames =
464-
k.substring(
465-
FIELDS_PREFIX.length() + 1,
466-
k.length() - SEQUENCE_GROUP.length() - 1)
467-
.split(FIELDS_SEPARATOR);
468-
469-
for (String field : v.split(FIELDS_SEPARATOR)) {
470-
if (!fieldNames.contains(field)) {
471-
throw new IllegalArgumentException(
472-
String.format("Field %s can not be found in table schema.", field));
473-
}
474-
475-
List<String> sequenceFieldsList = new ArrayList<>();
476-
for (String sequenceFieldName : sequenceFieldNames) {
477-
if (!fieldNames.contains(sequenceFieldName)) {
478-
throw new IllegalArgumentException(
479-
String.format(
480-
"The sequence field group: %s can not be found in table schema.",
481-
sequenceFieldName));
482-
}
483-
sequenceFieldsList.add(sequenceFieldName);
484-
}
485-
486-
if (fields2Group.containsKey(field)) {
487-
List<List<String>> sequenceGroups = new ArrayList<>();
488-
sequenceGroups.add(new ArrayList<>(fields2Group.get(field)));
489-
sequenceGroups.add(sequenceFieldsList);
490-
491-
throw new IllegalArgumentException(
492-
String.format(
493-
"Field %s is defined repeatedly by multiple groups: %s.",
494-
field, sequenceGroups));
495-
}
496-
497-
Set<String> group = fields2Group.computeIfAbsent(field, p -> new HashSet<>());
498-
group.addAll(sequenceFieldsList);
499-
}
500-
501-
// add self
502-
Arrays.stream(sequenceFieldNames)
503-
.mapToInt(fieldName -> requireField(fieldName, fieldNames))
504-
.forEach(sequenceGroupFieldIndexs::add);
505-
}
506-
}
507-
508-
if (options.mergeEngine() == MergeEngine.PARTIAL_UPDATE) {
509-
for (String fieldName : fieldNames) {
510-
String aggFunc = options.fieldAggFunc(fieldName);
511-
String aggFuncName = aggFunc == null ? options.fieldsDefaultFunc() : aggFunc;
512-
if (schema.primaryKeys().contains(fieldName)) {
513-
continue;
514-
}
515-
if (aggFuncName != null) {
516-
// last_non_null_value doesn't require sequence group
517-
checkArgument(
518-
aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
519-
|| sequenceGroupFieldIndexs.contains(
520-
fieldNames.indexOf(fieldName)),
521-
"Must use sequence group for aggregation functions but not found for field %s.",
522-
fieldName);
523-
}
524-
}
445+
private static void validateMergeFunction(TableSchema schema) {
446+
if (schema.primaryKeys().isEmpty()) {
447+
return;
525448
}
526449

527-
Set<String> illegalGroup =
528-
fields2Group.values().stream()
529-
.flatMap(Collection::stream)
530-
.filter(g -> options.fieldAggFunc(g) != null)
531-
.collect(Collectors.toSet());
532-
if (!illegalGroup.isEmpty()) {
533-
throw new IllegalArgumentException(
534-
"Should not defined aggregation function on sequence group: " + illegalGroup);
535-
}
450+
createMergeFunctionFactory(schema);
536451
}
537452

538453
private static void validateForDeletionVectors(CoreOptions options) {

paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1711,25 +1711,28 @@ public void testPartialUpdateRemoveRecordOnSequenceGroup() throws Exception {
17111711
options.set("partial-update.remove-record-on-sequence-group", "seq2");
17121712
},
17131713
rowType);
1714-
FileStoreTable wrongTable =
1715-
createFileStoreTable(
1716-
options -> {
1717-
options.set("merge-engine", "partial-update");
1718-
options.set("fields.seq1.sequence-group", "b");
1719-
options.set("fields.seq2.sequence-group", "c,d");
1720-
options.set("partial-update.remove-record-on-sequence-group", "b");
1721-
},
1722-
rowType);
1723-
Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
17241714

1725-
assertThatThrownBy(() -> wrongTable.newWrite(""))
1715+
assertThatThrownBy(
1716+
() ->
1717+
createFileStoreTable(
1718+
options -> {
1719+
options.set("merge-engine", "partial-update");
1720+
options.set("fields.seq1.sequence-group", "b");
1721+
options.set("fields.seq2.sequence-group", "c,d");
1722+
options.set(
1723+
"partial-update.remove-record-on-sequence-group",
1724+
"b");
1725+
},
1726+
rowType))
17261727
.hasMessageContaining(
17271728
"field 'b' defined in 'partial-update.remove-record-on-sequence-group' option must be part of sequence groups");
17281729

17291730
SnapshotReader snapshotReader = table.newSnapshotReader();
17301731
TableRead read = table.newRead();
17311732
StreamTableWrite write = table.newWrite("");
17321733
StreamTableCommit commit = table.newCommit("");
1734+
Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
1735+
17331736
// 1. Inserts
17341737
write.write(GenericRow.of(1, 1, 10, 1, 20, 20, 1));
17351738
write.write(GenericRow.of(1, 1, 11, 2, 25, 25, 0));

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ public void testInvalidSequenceGroup() {
304304
+ "'merge-engine'='partial-update', "
305305
+ "'fields.g_1.sequence-group'='a1,b', "
306306
+ "'fields.g_2.sequence-group'='c,d');"))
307-
.hasRootCauseMessage("Field a1 can not be found in table schema.");
307+
.hasRootCauseMessage("Field a1 can not be found in table schema");
308308

309309
Assertions.assertThatThrownBy(
310310
() ->
@@ -315,8 +315,8 @@ public void testInvalidSequenceGroup() {
315315
+ "'merge-engine'='partial-update', "
316316
+ "'fields.g_1.sequence-group'='a,b', "
317317
+ "'fields.g_2.sequence-group'='a,d');"))
318-
.hasRootCauseMessage(
319-
"Field a is defined repeatedly by multiple groups: [[g_1], [g_2]].");
318+
.rootCause()
319+
.hasMessageContaining("Field a is defined repeatedly by multiple groups");
320320

321321
Assertions.assertThatThrownBy(
322322
() ->
@@ -327,8 +327,8 @@ public void testInvalidSequenceGroup() {
327327
+ "'merge-engine'='partial-update', "
328328
+ "'fields.g_1.sequence-group'='a,b', "
329329
+ "'fields.g_2,g_3.sequence-group'='a,d');"))
330-
.hasRootCauseMessage(
331-
"Field a is defined repeatedly by multiple groups: [[g_1], [g_2, g_3]].");
330+
.rootCause()
331+
.hasMessageContaining("Field a is defined repeatedly by multiple groups");
332332
}
333333

334334
@Test

0 commit comments

Comments
 (0)