Skip to content

Commit 75220e5

Browse files
author
zhaoshu
committed
paimon-spark supports row id push down
1 parent e14a811 commit 75220e5

File tree

17 files changed

+379
-26
lines changed

17 files changed

+379
-26
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.predicate;
20+
21+
import java.util.HashSet;
22+
import java.util.Set;
23+
24+
import static org.apache.paimon.table.SpecialFields.ROW_ID;
25+
26+
/**
27+
* The {@link PredicateVisitor} to extract a list of Row IDs from predicates. The returned Row IDs
28+
* can be pushed down to manifest readers and file readers to enable efficient random access.
29+
*
30+
* <p>Note that there is a significant distinction between returning {@code null} and returning an
31+
* empty set:
32+
*
33+
* <ul>
34+
* <li>{@code null} indicates that the predicate cannot be converted into a random-access pattern,
35+
* meaning the filter is not consumable by this visitor.
36+
* <li>An empty set indicates that no rows satisfy the predicate (e.g. {@code WHERE _ROW_ID = 3
37+
* AND _ROW_ID IN (1, 2)}).
38+
* </ul>
39+
*/
40+
public class RowIdPredicateVisitor implements PredicateVisitor<Set<Long>> {
41+
42+
@Override
43+
public Set<Long> visit(LeafPredicate predicate) {
44+
if (ROW_ID.name().equals(predicate.fieldName())) {
45+
LeafFunction function = predicate.function();
46+
if (function instanceof Equal || function instanceof In) {
47+
HashSet<Long> rowIds = new HashSet<>();
48+
for (Object literal : predicate.literals()) {
49+
rowIds.add((Long) literal);
50+
}
51+
return rowIds;
52+
}
53+
}
54+
return null;
55+
}
56+
57+
@Override
58+
public Set<Long> visit(CompoundPredicate predicate) {
59+
CompoundPredicate.Function function = predicate.function();
60+
HashSet<Long> rowIds = null;
61+
// `And` means we should get the intersection of all children.
62+
if (function instanceof And) {
63+
for (Predicate child : predicate.children()) {
64+
Set<Long> childSet = child.visit(this);
65+
if (childSet == null) {
66+
return null;
67+
}
68+
69+
if (rowIds == null) {
70+
rowIds = new HashSet<>(childSet);
71+
} else {
72+
rowIds.retainAll(childSet);
73+
}
74+
75+
// shortcut for intersection
76+
if (rowIds.isEmpty()) {
77+
return rowIds;
78+
}
79+
}
80+
} else if (function instanceof Or) {
81+
// `Or` means we should get the union of all children
82+
rowIds = new HashSet<>();
83+
for (Predicate child : predicate.children()) {
84+
Set<Long> childSet = child.visit(this);
85+
if (childSet == null) {
86+
return null;
87+
}
88+
89+
rowIds.addAll(childSet);
90+
}
91+
} else {
92+
// unexpected function type, just return null
93+
return null;
94+
}
95+
return rowIds;
96+
}
97+
98+
@Override
99+
public Set<Long> visit(TransformPredicate predicate) {
100+
// do not support transform predicate now.
101+
return null;
102+
}
103+
}

paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@
1818

1919
package org.apache.paimon.spark
2020

21-
import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
22-
import org.apache.paimon.types.RowType
21+
import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, RowIdPredicateVisitor}
22+
import org.apache.paimon.types.{DataField, DataTypes, RowType}
2323

2424
import org.apache.spark.sql.connector.read.SupportsPushDownFilters
2525
import org.apache.spark.sql.sources.Filter
2626

27-
import java.util.{List => JList}
27+
import java.lang.{Long => JLong}
28+
import java.util.{ArrayList, List => JList}
2829

30+
import scala.collection.JavaConverters._
2931
import scala.collection.mutable
3032

3133
/** Base trait for Paimon scan filter push down. */
@@ -35,6 +37,7 @@ trait PaimonBasePushDown extends SupportsPushDownFilters {
3537
protected var rowType: RowType
3638

3739
private var pushedSparkFilters = Array.empty[Filter]
40+
protected var pushedRowIds: Array[JLong] = null
3841
protected var pushedPaimonPredicates: Array[Predicate] = Array.empty
3942
protected var reservedFilters: Array[Filter] = Array.empty
4043
protected var hasPostScanPredicates = false
@@ -45,30 +48,50 @@ trait PaimonBasePushDown extends SupportsPushDownFilters {
4548
* must be interpreted as ANDed together.
4649
*/
4750
override def pushFilters(filters: Array[Filter]): Array[Filter] = {
48-
val pushable = mutable.ArrayBuffer.empty[(Filter, Predicate)]
51+
val pushableSparkFilters = mutable.ArrayBuffer.empty[Filter]
52+
val pushablePaimonPredicates = mutable.ArrayBuffer.empty[Predicate]
53+
var pushableRowIds: mutable.Set[JLong] = null
4954
val postScan = mutable.ArrayBuffer.empty[Filter]
5055
val reserved = mutable.ArrayBuffer.empty[Filter]
5156

52-
val converter = new SparkFilterConverter(rowType)
53-
val visitor = new PartitionPredicateVisitor(partitionKeys)
57+
val dataFieldWithRowId = new ArrayList[DataField](rowType.getFields)
58+
dataFieldWithRowId.add(new DataField(rowType.getFieldCount, "_ROW_ID", DataTypes.BIGINT()))
59+
val rowTypeWithRowId = rowType.copy(dataFieldWithRowId)
60+
val converter = new SparkFilterConverter(rowTypeWithRowId)
61+
val partitionVisitor = new PartitionPredicateVisitor(partitionKeys)
62+
val rowIdVisitor = new RowIdPredicateVisitor
63+
5464
filters.foreach {
5565
filter =>
5666
val predicate = converter.convertIgnoreFailure(filter)
5767
if (predicate == null) {
5868
postScan.append(filter)
5969
} else {
60-
pushable.append((filter, predicate))
61-
if (predicate.visit(visitor)) {
70+
if (predicate.visit(partitionVisitor)) {
71+
pushableSparkFilters.append(filter)
72+
pushablePaimonPredicates.append(predicate)
6273
reserved.append(filter)
74+
} else if (predicate.visit(rowIdVisitor) != null) {
75+
pushableSparkFilters.append(filter)
76+
if (pushableRowIds == null) {
77+
pushableRowIds = predicate.visit(rowIdVisitor).asScala
78+
} else {
79+
pushableRowIds.retain(predicate.visit(rowIdVisitor).asScala)
80+
}
6381
} else {
6482
postScan.append(filter)
6583
}
6684
}
6785
}
6886

69-
if (pushable.nonEmpty) {
70-
this.pushedSparkFilters = pushable.map(_._1).toArray
71-
this.pushedPaimonPredicates = pushable.map(_._2).toArray
87+
if (pushableSparkFilters.nonEmpty) {
88+
this.pushedSparkFilters = pushableSparkFilters.toArray
89+
}
90+
if (pushablePaimonPredicates.nonEmpty) {
91+
this.pushedPaimonPredicates = pushablePaimonPredicates.toArray
92+
}
93+
if (pushableRowIds != null) {
94+
this.pushedRowIds = pushableRowIds.toArray
7295
}
7396
if (reserved.nonEmpty) {
7497
this.reservedFilters = reserved.toArray

paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,15 @@ import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering
2727
import org.apache.spark.sql.sources.{Filter, In}
2828
import org.apache.spark.sql.types.StructType
2929

30+
import java.lang.{Long => JLong}
31+
3032
import scala.collection.JavaConverters._
3133

3234
case class PaimonScan(
3335
table: InnerTable,
3436
requiredSchema: StructType,
3537
filters: Seq[Predicate],
38+
override val rowIds: Seq[JLong],
3639
reservedFilters: Seq[Filter],
3740
override val pushDownLimit: Option[Int],
3841
// no usage, just for compile compatibility

paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ class PaimonScanBuilder(table: InnerTable)
3434
override protected var rowType: RowType = table.rowType()
3535

3636
override def build(): Scan = {
37-
PaimonScan(table, requiredSchema, pushedPaimonPredicates, reservedFilters, None, pushDownTopN)
37+
PaimonScan(
38+
table,
39+
requiredSchema,
40+
pushedPaimonPredicates,
41+
pushedRowIds,
42+
reservedFilters,
43+
None,
44+
pushDownTopN)
3845
}
3946
}

paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonSplitScanBuilder.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ import org.apache.spark.sql.connector.read.Scan
2424

2525
class PaimonSplitScanBuilder(table: KnownSplitsTable) extends PaimonScanBuilder(table) {
2626
override def build(): Scan = {
27-
PaimonSplitScan(table, table.splits(), requiredSchema, pushedPaimonPredicates)
27+
PaimonSplitScan(table, table.splits(), requiredSchema, pushedPaimonPredicates, pushedRowIds)
2828
}
2929
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.sql
20+
21+
class RowIdPushDownTest extends RowIdPushDownTestBase {}

paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,15 @@ import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning,
2929
import org.apache.spark.sql.sources.{Filter, In}
3030
import org.apache.spark.sql.types.StructType
3131

32+
import java.lang.{Long => JLong}
33+
3234
import scala.collection.JavaConverters._
3335

3436
case class PaimonScan(
3537
table: InnerTable,
3638
requiredSchema: StructType,
3739
filters: Seq[Predicate],
40+
override val rowIds: Seq[JLong],
3841
reservedFilters: Seq[Filter],
3942
override val pushDownLimit: Option[Int],
4043
override val pushDownTopN: Option[TopN],
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.sql
20+
21+
class RowIdPushDownTest extends RowIdPushDownTestBase {}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.sql
20+
21+
class RowIdPushDownTest extends RowIdPushDownTestBase {}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.spark.sql
20+
21+
class RowIdPushDownTest extends RowIdPushDownTestBase {}

0 commit comments

Comments
 (0)