Skip to content

Commit 93d2623

Browse files
authored
[core] Introduce 'nested_partial_update' agg func (#6924)
1 parent 3082e5d commit 93d2623

File tree

5 files changed

+216
-1
lines changed

5 files changed

+216
-1
lines changed

docs/content/primary-key-table/merge-engine/aggregation.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,11 @@ public static class BitmapContainsUDF extends ScalarFunction {
376376

377377
{{< /tabs >}}
378378

379+
### nested_partial_update
380+
The nested_partial_update function collects multiple rows into one `ARRAY<ROW>` (a so-called 'nested table'). It only supports
381+
ARRAY<ROW> data types. You need to use `fields.<field-name>.nested-key=pk0,pk1,...` to specify the primary keys of the
382+
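nested table. Rows that share the same nested key are merged by partial update: non-null columns of a newer row overwrite the existing values, while null columns leave them unchanged.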
383+
379384
### collect
380385
The collect function collects elements into an Array. You can set `fields.<field-name>.distinct=true` to deduplicate elements.
381386
It only supports ARRAY type.
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.mergetree.compact.aggregate;
20+
21+
import org.apache.paimon.codegen.Projection;
22+
import org.apache.paimon.data.BinaryRow;
23+
import org.apache.paimon.data.GenericArray;
24+
import org.apache.paimon.data.GenericRow;
25+
import org.apache.paimon.data.InternalArray;
26+
import org.apache.paimon.data.InternalRow;
27+
import org.apache.paimon.data.InternalRow.FieldGetter;
28+
import org.apache.paimon.types.ArrayType;
29+
import org.apache.paimon.types.RowType;
30+
31+
import java.util.ArrayList;
32+
import java.util.HashMap;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
import static org.apache.paimon.codegen.CodeGenUtils.newProjection;
37+
import static org.apache.paimon.utils.Preconditions.checkArgument;
38+
39+
/**
40+
* Used to partial update a field which representing a nested table. The data type of nested table
41+
* field is {@code ARRAY<ROW>}.
42+
*/
43+
public class FieldNestedPartialUpdateAgg extends FieldAggregator {
44+
45+
private static final long serialVersionUID = 1L;
46+
47+
private final int nestedFields;
48+
private final Projection keyProjection;
49+
private final FieldGetter[] fieldGetters;
50+
51+
public FieldNestedPartialUpdateAgg(String name, ArrayType dataType, List<String> nestedKey) {
52+
super(name, dataType);
53+
RowType nestedType = (RowType) dataType.getElementType();
54+
this.nestedFields = nestedType.getFieldCount();
55+
checkArgument(!nestedKey.isEmpty());
56+
this.keyProjection = newProjection(nestedType, nestedKey);
57+
this.fieldGetters = new FieldGetter[nestedFields];
58+
for (int i = 0; i < nestedFields; i++) {
59+
fieldGetters[i] = InternalRow.createFieldGetter(nestedType.getTypeAt(i), i);
60+
}
61+
}
62+
63+
@Override
64+
public Object agg(Object accumulator, Object inputField) {
65+
if (accumulator == null || inputField == null) {
66+
return accumulator == null ? inputField : accumulator;
67+
}
68+
69+
InternalArray acc = (InternalArray) accumulator;
70+
InternalArray input = (InternalArray) inputField;
71+
72+
List<InternalRow> rows = new ArrayList<>(acc.size() + input.size());
73+
addNonNullRows(acc, rows);
74+
addNonNullRows(input, rows);
75+
76+
if (keyProjection != null) {
77+
Map<BinaryRow, GenericRow> map = new HashMap<>();
78+
for (InternalRow row : rows) {
79+
BinaryRow key = keyProjection.apply(row).copy();
80+
GenericRow toUpdate = map.computeIfAbsent(key, k -> new GenericRow(nestedFields));
81+
partialUpdate(toUpdate, row);
82+
}
83+
84+
rows = new ArrayList<>(map.values());
85+
}
86+
87+
return new GenericArray(rows.toArray());
88+
}
89+
90+
private void addNonNullRows(InternalArray array, List<InternalRow> rows) {
91+
for (int i = 0; i < array.size(); i++) {
92+
if (array.isNullAt(i)) {
93+
continue;
94+
}
95+
rows.add(array.getRow(i, nestedFields));
96+
}
97+
}
98+
99+
private void partialUpdate(GenericRow toUpdate, InternalRow input) {
100+
for (int i = 0; i < fieldGetters.length; i++) {
101+
FieldGetter fieldGetter = fieldGetters[i];
102+
Object field = fieldGetter.getFieldOrNull(input);
103+
if (field != null) {
104+
toUpdate.setField(i, field);
105+
}
106+
}
107+
}
108+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.mergetree.compact.aggregate.factory;
20+
21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.mergetree.compact.aggregate.FieldNestedPartialUpdateAgg;
23+
import org.apache.paimon.types.ArrayType;
24+
import org.apache.paimon.types.DataType;
25+
import org.apache.paimon.types.RowType;
26+
27+
import java.util.List;
28+
29+
import static org.apache.paimon.utils.Preconditions.checkArgument;
30+
31+
/** Factory for #{@link FieldNestedPartialUpdateAgg}. */
32+
public class FieldNestedPartialUpdateAggFactory implements FieldAggregatorFactory {
33+
34+
public static final String NAME = "nested_partial_update";
35+
36+
@Override
37+
public FieldNestedPartialUpdateAgg create(
38+
DataType fieldType, CoreOptions options, String field) {
39+
return createFieldNestedPartialUpdateAgg(
40+
fieldType, options.fieldNestedUpdateAggNestedKey(field));
41+
}
42+
43+
@Override
44+
public String identifier() {
45+
return NAME;
46+
}
47+
48+
private FieldNestedPartialUpdateAgg createFieldNestedPartialUpdateAgg(
49+
DataType fieldType, List<String> nestedKey) {
50+
checkArgument(!nestedKey.isEmpty());
51+
String typeErrorMsg =
52+
"Data type for nested table column must be 'Array<Row>' but was '%s'.";
53+
checkArgument(fieldType instanceof ArrayType, typeErrorMsg, fieldType);
54+
ArrayType arrayType = (ArrayType) fieldType;
55+
checkArgument(arrayType.getElementType() instanceof RowType, typeErrorMsg, fieldType);
56+
return new FieldNestedPartialUpdateAgg(identifier(), arrayType, nestedKey);
57+
}
58+
}

paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory
3030
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory
3131
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory
3232
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory
33+
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory
3334
org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory
3435
org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory
3536
org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory

paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory;
4141
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory;
4242
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory;
43+
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory;
4344
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory;
4445
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory;
4546
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory;
@@ -626,7 +627,7 @@ private GenericArray singletonArray(InternalRow row) {
626627
return new GenericArray(new InternalRow[] {row});
627628
}
628629

629-
private InternalRow row(int k0, int k1, String v) {
630+
private InternalRow row(Integer k0, Integer k1, String v) {
630631
return GenericRow.of(k0, k1, BinaryString.fromString(v));
631632
}
632633

@@ -1154,6 +1155,48 @@ public void testCustomAgg() throws IOException {
11541155
assertThat(agg).isEqualTo("test");
11551156
}
11561157

1158+
@Test
1159+
public void testFieldNestedPartialUpdateAgg() {
1160+
DataType elementRowType =
1161+
DataTypes.ROW(
1162+
DataTypes.FIELD(0, "k", DataTypes.INT()),
1163+
DataTypes.FIELD(1, "v1", DataTypes.INT()),
1164+
DataTypes.FIELD(2, "v2", DataTypes.STRING()));
1165+
FieldNestedPartialUpdateAgg agg =
1166+
new FieldNestedPartialUpdateAgg(
1167+
FieldNestedPartialUpdateAggFactory.NAME,
1168+
DataTypes.ARRAY(
1169+
DataTypes.ROW(
1170+
DataTypes.FIELD(0, "k", DataTypes.INT()),
1171+
DataTypes.FIELD(1, "v1", DataTypes.INT()),
1172+
DataTypes.FIELD(2, "v2", DataTypes.STRING()))),
1173+
Collections.singletonList("k"));
1174+
1175+
InternalArray accumulator;
1176+
InternalArray.ElementGetter elementGetter =
1177+
InternalArray.createElementGetter(elementRowType);
1178+
1179+
InternalRow current = row(0, 0, null);
1180+
accumulator = (InternalArray) agg.agg(null, singletonArray(current));
1181+
assertThat(unnest(accumulator, elementGetter))
1182+
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(current));
1183+
1184+
current = row(0, null, "A");
1185+
accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current));
1186+
assertThat(unnest(accumulator, elementGetter))
1187+
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 0, "A")));
1188+
1189+
current = row(0, 1, "B");
1190+
accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current));
1191+
assertThat(unnest(accumulator, elementGetter))
1192+
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 1, "B")));
1193+
1194+
current = row(1, 2, "C");
1195+
accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current));
1196+
assertThat(unnest(accumulator, elementGetter))
1197+
.containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1, "B"), row(1, 2, "C")));
1198+
}
1199+
11571200
private Map<Object, Object> toMap(Object... kvs) {
11581201
Map<Object, Object> result = new HashMap<>();
11591202
for (int i = 0; i < kvs.length; i += 2) {

0 commit comments

Comments
 (0)