Skip to content

Commit 4b34e36

Browse files
zhaoshuJingsongLi
authored and committed
paimon-spark supports row id push down
update format code
1 parent f5e5ada commit 4b34e36

File tree

14 files changed

+444
-9
lines changed

14 files changed

+444
-9
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,6 +1037,12 @@
10371037
<td>String</td>
10381038
<td>Time field for record level expire. It supports the following types: `timestamps in seconds with INT`,`timestamps in seconds with BIGINT`, `timestamps in milliseconds with BIGINT` or `timestamp`.</td>
10391039
</tr>
1040+
<tr>
1041+
<td><h5>row-id-push-down.enabled</h5></td>
1042+
<td style="word-wrap: break-word;">false</td>
1043+
<td>Boolean</td>
1044+
<td>Whether to enable row id push down for scan. Currently, only the data evolution table supports row id push down.</td>
1045+
</tr>
10401046
<tr>
10411047
<td><h5>row-tracking.enabled</h5></td>
10421048
<td style="word-wrap: break-word;">false</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2146,6 +2146,14 @@ public InlineElement getDescription() {
21462146
.withDescription(
21472147
"Whether to try upgrading the data files after overwriting a primary key table.");
21482148

2149+
    /**
     * Option controlling whether {@code _ROW_ID} predicates may be pushed down to the scan
     * ({@code row-id-push-down.enabled}, default {@code false}). Per the description, only data
     * evolution tables support this.
     */
    public static final ConfigOption<Boolean> ROW_ID_PUSH_DOWN_ENABLED =
            key("row-id-push-down.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether to enable row id push down for scan."
                                    + " Currently, only the data evolution table supports row id push down.");
2156+
21492157
private final Options options;
21502158

21512159
public CoreOptions(Map<String, String> options) {
@@ -3330,6 +3338,10 @@ public boolean overwriteUpgrade() {
33303338
return options.get(OVERWRITE_UPGRADE);
33313339
}
33323340

3341+
    /** Returns whether row id push down is enabled ({@code row-id-push-down.enabled}). */
    public boolean rowIdPushDownEnabled() {
        return options.get(ROW_ID_PUSH_DOWN_ENABLED);
    }
3344+
33333345
/** Specifies the merge engine for table with primary key. */
33343346
public enum MergeEngine implements DescribedEnum {
33353347
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.predicate;
20+
21+
import org.apache.paimon.utils.Range;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
26+
import static org.apache.paimon.table.SpecialFields.ROW_ID;
27+
28+
/**
29+
* The {@link PredicateVisitor} to extract a list of row id ranges from predicates. The returned row
30+
* id ranges can be pushed down to manifest readers and file readers to enable efficient random
31+
* access.
32+
*
33+
* <p>Note that there is a significant distinction between returning {@code null} and returning an
34+
* empty list:
35+
*
36+
* <ul>
37+
* <li>{@code null} indicates that the predicate cannot be converted into a random-access pattern,
38+
* meaning the filter is not consumable by this visitor.
39+
* <li>An empty list indicates that no rows satisfy the predicate (e.g. {@code WHERE _ROW_ID = 3
40+
* AND _ROW_ID IN (1, 2)}).
41+
* </ul>
42+
*/
43+
public class RowIdPredicateVisitor implements PredicateVisitor<List<Range>> {
44+
45+
@Override
46+
public List<Range> visit(LeafPredicate predicate) {
47+
if (ROW_ID.name().equals(predicate.fieldName())) {
48+
LeafFunction function = predicate.function();
49+
if (function instanceof Equal || function instanceof In) {
50+
ArrayList<Long> rowIds = new ArrayList<>();
51+
for (Object literal : predicate.literals()) {
52+
rowIds.add((Long) literal);
53+
}
54+
// The list output by getRangesFromList is already sorted,
55+
// and has no overlap
56+
return Range.getRangesFromList(rowIds);
57+
}
58+
}
59+
return null;
60+
}
61+
62+
@Override
63+
public List<Range> visit(CompoundPredicate predicate) {
64+
CompoundPredicate.Function function = predicate.function();
65+
List<Range> rowIds = null;
66+
// `And` means we should get the intersection of all children.
67+
if (function instanceof And) {
68+
for (Predicate child : predicate.children()) {
69+
List<Range> childList = child.visit(this);
70+
if (childList == null) {
71+
continue;
72+
}
73+
74+
if (rowIds == null) {
75+
rowIds = childList;
76+
} else {
77+
rowIds = Range.and(rowIds, childList);
78+
}
79+
80+
// shortcut for intersection
81+
if (rowIds.isEmpty()) {
82+
return rowIds;
83+
}
84+
}
85+
} else if (function instanceof Or) {
86+
// `Or` means we should get the union of all children
87+
rowIds = new ArrayList<>();
88+
for (Predicate child : predicate.children()) {
89+
List<Range> childList = child.visit(this);
90+
if (childList == null) {
91+
return null;
92+
}
93+
94+
rowIds.addAll(childList);
95+
rowIds = Range.sortAndMergeOverlap(rowIds, true);
96+
}
97+
} else {
98+
// unexpected function type, just return null
99+
return null;
100+
}
101+
return rowIds;
102+
}
103+
104+
@Override
105+
public List<Range> visit(TransformPredicate predicate) {
106+
// do not support transform predicate now.
107+
return null;
108+
}
109+
}

paimon-common/src/main/java/org/apache/paimon/utils/Range.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Comparator;
2525
import java.util.List;
2626
import java.util.Objects;
27+
import java.util.stream.Collectors;
2728

2829
/** Range represents from (inclusive) and to (inclusive). */
2930
public class Range implements Serializable {
@@ -177,6 +178,34 @@ public static List<Range> mergeSortedAsPossible(List<Range> ranges) {
177178
return result;
178179
}
179180

181+
public static List<Range> getRangesFromList(List<Long> origLongs) {
182+
if (origLongs == null || origLongs.isEmpty()) {
183+
return Collections.emptyList();
184+
}
185+
186+
List<Long> longs = origLongs.stream().distinct().sorted().collect(Collectors.toList());
187+
188+
ArrayList<Range> ranges = new ArrayList<>();
189+
Long rangeStart = null;
190+
Long rangeEnd = null;
191+
for (Long cur : longs) {
192+
if (rangeStart == null) {
193+
rangeStart = cur;
194+
rangeEnd = cur;
195+
} else if (rangeEnd == cur - 1) {
196+
rangeEnd = cur;
197+
} else {
198+
ranges.add(new Range(rangeStart, rangeEnd));
199+
rangeStart = cur;
200+
rangeEnd = cur;
201+
}
202+
}
203+
if (rangeStart != null) {
204+
ranges.add(new Range(rangeStart, rangeEnd));
205+
}
206+
return ranges;
207+
}
208+
180209
/**
181210
* Computes the intersection of two lists of ranges.
182211
*

paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,12 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options)
653653
"Data evolution config must disabled with deletion-vectors.enabled");
654654
}
655655

656+
if (options.rowIdPushDownEnabled()) {
657+
checkArgument(
658+
options.dataEvolutionEnabled(),
659+
"Row id push down config must enabled with data-evolution.enabled");
660+
}
661+
656662
List<String> blobNames =
657663
BlobType.splitBlob(schema.logicalRowType()).getRight().getFieldNames();
658664
if (!blobNames.isEmpty()) {

paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
package org.apache.paimon.table.source;
2020

2121
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.data.variant.VariantAccessInfoUtils;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.And;
import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.In;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.RowIdPredicateVisitor;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.table.InnerTable;
@@ -33,12 +37,14 @@
3337

3438
import javax.annotation.Nullable;
3539

40+
import java.util.ArrayList;
3641
import java.util.List;
3742
import java.util.Map;
3843
import java.util.Objects;
3944

4045
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
4146
import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
47+
import static org.apache.paimon.table.SpecialFields.ROW_ID;
4248
import static org.apache.paimon.utils.Preconditions.checkState;
4349

4450
/** Implementation for {@link ReadBuilder}. */
@@ -65,7 +71,7 @@ public class ReadBuilderImpl implements ReadBuilder {
6571

6672
private @Nullable RowType readType;
6773
private @Nullable VariantAccessInfo[] variantAccessInfo;
68-
private @Nullable List<Range> rowRanges;
74+
public @Nullable @VisibleForTesting List<Range> rowRanges;
6975
private @Nullable VectorSearch vectorSearch;
7076

7177
private boolean dropStats = false;
@@ -99,9 +105,55 @@ public ReadBuilder withFilter(Predicate filter) {
99105
} else {
100106
this.filter = PredicateBuilder.and(this.filter, filter);
101107
}
108+
calculateRowRanges(this.filter);
109+
this.filter = removeRowIdFilter(this.filter);
102110
return this;
103111
}
104112

113+
    /**
     * Tries to convert the given predicate into a list of row id ranges and, on success, registers
     * them for this scan via {@code withRowRanges}.
     *
     * <p>A {@code null} visitor result means the predicate is not expressible as row id ranges, so
     * nothing is registered. An empty (non-null) list means no row can match the predicate.
     *
     * <p>NOTE(review): when {@code withFilter} is invoked multiple times, each successful
     * conversion re-registers ranges computed from the latest combined filter rather than
     * intersecting with previously registered ranges — confirm {@code withRowRanges} semantics
     * make this safe for repeated calls.
     */
    private void calculateRowRanges(Predicate filter) {
        if (filter == null) {
            return;
        }

        RowIdPredicateVisitor visitor = new RowIdPredicateVisitor();
        List<Range> ranges = filter.visit(visitor);
        // When rowRanges is not null, filter data based on rowRanges.
        // If rowRanges is empty, it means no data will be read.
        if (ranges != null) {
            withRowRanges(ranges);
        }
    }
126+
127+
private Predicate removeRowIdFilter(Predicate filter) {
128+
if (filter == null) {
129+
return null;
130+
}
131+
132+
if (filter instanceof LeafPredicate
133+
&& ROW_ID.name().equals(((LeafPredicate) filter).fieldName())) {
134+
return null;
135+
} else if (filter instanceof CompoundPredicate) {
136+
CompoundPredicate compoundPredicate = (CompoundPredicate) filter;
137+
138+
List<Predicate> newChildren = new ArrayList<>();
139+
for (Predicate child : compoundPredicate.children()) {
140+
Predicate newChild = removeRowIdFilter(child);
141+
if (newChild != null) {
142+
newChildren.add(newChild);
143+
}
144+
}
145+
146+
if (newChildren.isEmpty()) {
147+
return null;
148+
} else if (newChildren.size() == 1) {
149+
return newChildren.get(0);
150+
} else {
151+
return new CompoundPredicate(compoundPredicate.function(), newChildren);
152+
}
153+
}
154+
return filter;
155+
}
156+
105157
@Override
106158
public ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {
107159
if (partitionSpec != null) {

paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,19 @@
1818

1919
package org.apache.paimon.spark
2020

21+
import org.apache.paimon.CoreOptions
2122
import org.apache.paimon.partition.PartitionPredicate
2223
import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
2324
import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
24-
import org.apache.paimon.table.Table
25-
import org.apache.paimon.types.RowType
25+
import org.apache.paimon.table.{InnerTable, Table}
26+
import org.apache.paimon.table.SpecialFields.ROW_ID
27+
import org.apache.paimon.types.{DataField, DataTypes, RowType}
2628

2729
import org.apache.spark.sql.connector.read.{SupportsPushDownFilters, SupportsPushDownRequiredColumns}
2830
import org.apache.spark.sql.sources.Filter
2931
import org.apache.spark.sql.types.StructType
3032

31-
import java.util.{List => JList}
33+
import java.util.{ArrayList, List => JList}
3234

3335
import scala.collection.JavaConverters._
3436
import scala.collection.mutable
@@ -41,6 +43,7 @@ abstract class PaimonBaseScanBuilder
4143
val table: Table
4244
val partitionKeys: JList[String] = table.partitionKeys()
4345
val rowType: RowType = table.rowType()
46+
val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
4447

4548
private var pushedSparkFilters = Array.empty[Filter]
4649
protected var hasPostScanPredicates = false
@@ -65,7 +68,14 @@ abstract class PaimonBaseScanBuilder
6568
val pushableDataFilters = mutable.ArrayBuffer.empty[Predicate]
6669
val postScan = mutable.ArrayBuffer.empty[Filter]
6770

68-
val converter = new SparkFilterConverter(rowType)
71+
var newRowType = rowType
72+
if (table.isInstanceOf[InnerTable] && coreOptions.rowIdPushDownEnabled()) {
73+
val dataFieldsWithRowId = new ArrayList[DataField](rowType.getFields)
74+
dataFieldsWithRowId.add(
75+
new DataField(rowType.getFieldCount, ROW_ID.name(), DataTypes.BIGINT()))
76+
newRowType = rowType.copy(dataFieldsWithRowId)
77+
}
78+
val converter = new SparkFilterConverter(newRowType)
6979
val partitionPredicateVisitor = new PartitionPredicateVisitor(partitionKeys)
7080
filters.foreach {
7181
filter =>
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.sql
20+
21+
/** Row id push down test for this Spark version; all cases inherited from [[RowIdPushDownTestBase]]. */
class RowIdPushDownTest extends RowIdPushDownTestBase {}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.sql
20+
21+
/** Row id push down test for this Spark version; all cases inherited from [[RowIdPushDownTestBase]]. */
class RowIdPushDownTest extends RowIdPushDownTestBase {}

0 commit comments

Comments
 (0)