Skip to content

Commit bd9317e

Browse files
authored
[core] Check that all fields with aggregate functions in partial-update should be protected by sequence-group (#5034)
1 parent a01f007 commit bd9317e

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.data.InternalRow;
2525
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
2626
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
27+
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
2728
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory;
2829
import org.apache.paimon.options.Options;
2930
import org.apache.paimon.types.DataField;
@@ -548,9 +549,13 @@ private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
548549

549550
String aggFuncName = getAggFuncName(options, fieldName);
550551
if (aggFuncName != null) {
552+
// last_non_null_value doesn't require sequence group
551553
checkArgument(
552-
!fieldSeqComparators.isEmpty(),
553-
"Must use sequence group for aggregation functions.");
554+
aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
555+
|| fieldSeqComparators.containsKey(
556+
fieldNames.indexOf(fieldName)),
557+
"Must use sequence group for aggregation functions but not found for field %s.",
558+
fieldName);
554559
fieldAggregators.put(
555560
i,
556561
() ->

paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.KeyValue;
2222
import org.apache.paimon.data.GenericRow;
2323
import org.apache.paimon.options.Options;
24+
import org.apache.paimon.types.DataType;
2425
import org.apache.paimon.types.DataTypes;
2526
import org.apache.paimon.types.RowKind;
2627
import org.apache.paimon.types.RowType;
@@ -31,6 +32,7 @@
3132
import org.junit.jupiter.api.Test;
3233

3334
import static org.apache.paimon.CoreOptions.FIELDS_DEFAULT_AGG_FUNC;
35+
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
3436
import static org.assertj.core.api.Assertions.assertThat;
3537
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3638

@@ -820,14 +822,39 @@ public void testMultiSequenceFieldsPartialUpdateWithAggregationProjectPushDown()
820822

821823
@Test
822824
public void testAggregationWithoutSequenceGroup() {
823-
Options options = new Options();
824-
options.set("fields.f1.aggregate-function", "listagg");
825-
RowType rowType = RowType.of(DataTypes.INT(), DataTypes.INT());
825+
RowType rowType =
826+
RowType.of(
827+
new DataType[] {
828+
DataTypes.INT(),
829+
DataTypes.INT(),
830+
DataTypes.INT(),
831+
DataTypes.INT(),
832+
DataTypes.INT()
833+
},
834+
new String[] {"pk", "f0", "g0", "f1", "g1"});
835+
836+
Options options1 = new Options();
837+
options1.set("fields.f0.aggregate-function", "listagg");
838+
options1.set("fields.f1.aggregate-function", "listagg");
826839
assertThatThrownBy(
827840
() ->
828841
PartialUpdateMergeFunction.factory(
829-
options, rowType, ImmutableList.of("f0")))
830-
.hasMessageContaining("Must use sequence group for aggregation functions");
842+
options1, rowType, ImmutableList.of("pk")))
843+
.satisfies(
844+
anyCauseMatches(
845+
IllegalArgumentException.class,
846+
"Must use sequence group for aggregation functions but not found for field f0."));
847+
848+
Options options2 = new Options(options1.toMap());
849+
options2.set("fields.g0.sequence-group", "f0");
850+
assertThatThrownBy(
851+
() ->
852+
PartialUpdateMergeFunction.factory(
853+
options2, rowType, ImmutableList.of("pk")))
854+
.satisfies(
855+
anyCauseMatches(
856+
IllegalArgumentException.class,
857+
"Must use sequence group for aggregation functions but not found for field f1."));
831858
}
832859

833860
private void add(MergeFunction<KeyValue> function, Integer... f) {

0 commit comments

Comments
 (0)