Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,12 @@
<td>String</td>
<td>Time field for record level expire. It supports the following types: `timestamps in seconds with INT`,`timestamps in seconds with BIGINT`, `timestamps in milliseconds with BIGINT` or `timestamp`.</td>
</tr>
<tr>
<td><h5>row-id-push-down.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable row id push down for scan. Currently, only the data evolution table supports row id push down.</td>
</tr>
<tr>
<td><h5>row-tracking.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
12 changes: 12 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2146,6 +2146,14 @@ public InlineElement getDescription() {
.withDescription(
"Whether to try upgrading the data files after overwriting a primary key table.");

public static final ConfigOption<Boolean> ROW_ID_PUSH_DOWN_ENABLED =
key("row-id-push-down.enabled")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this option, it is useless.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this option, it is useless.

I think this option is necessary, it disables rowId pushdown for non-data-evolution tables. Currently, due to the issue in #6747, rowId pushdown should not be enabled for non-data-evolution tables.

.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable row id push down for scan."
+ " Currently, only the data evolution table supports row id push down.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down Expand Up @@ -3330,6 +3338,10 @@ public boolean overwriteUpgrade() {
return options.get(OVERWRITE_UPGRADE);
}

public boolean rowIdPushDownEnabled() {
return options.get(ROW_ID_PUSH_DOWN_ENABLED);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.predicate;

import org.apache.paimon.utils.Range;

import java.util.ArrayList;
import java.util.List;

import static org.apache.paimon.table.SpecialFields.ROW_ID;

/**
* The {@link PredicateVisitor} to extract a list of row id ranges from predicates. The returned row
* id ranges can be pushed down to manifest readers and file readers to enable efficient random
* access.
*
* <p>Note that there is a significant distinction between returning {@code null} and returning an
* empty list:
*
* <ul>
* <li>{@code null} indicates that the predicate cannot be converted into a random-access pattern,
* meaning the filter is not consumable by this visitor.
* <li>An empty list indicates that no rows satisfy the predicate (e.g. {@code WHERE _ROW_ID = 3
* AND _ROW_ID IN (1, 2)}).
* </ul>
*/
public class RowIdPredicateVisitor implements PredicateVisitor<List<Range>> {

@Override
public List<Range> visit(LeafPredicate predicate) {
if (ROW_ID.name().equals(predicate.fieldName())) {
LeafFunction function = predicate.function();
if (function instanceof Equal || function instanceof In) {
ArrayList<Long> rowIds = new ArrayList<>();
for (Object literal : predicate.literals()) {
rowIds.add((Long) literal);
}
// The list output by getRangesFromList is already sorted,
// and has no overlap
return Range.getRangesFromList(rowIds);
}
}
return null;
}

@Override
public List<Range> visit(CompoundPredicate predicate) {
CompoundPredicate.Function function = predicate.function();
List<Range> rowIds = null;
// `And` means we should get the intersection of all children.
if (function instanceof And) {
for (Predicate child : predicate.children()) {
List<Range> childList = child.visit(this);
if (childList == null) {
continue;
}

if (rowIds == null) {
rowIds = childList;
} else {
rowIds = Range.and(rowIds, childList);
}

// shortcut for intersection
if (rowIds.isEmpty()) {
return rowIds;
}
}
} else if (function instanceof Or) {
// `Or` means we should get the union of all children
rowIds = new ArrayList<>();
for (Predicate child : predicate.children()) {
List<Range> childList = child.visit(this);
if (childList == null) {
return null;
}

rowIds.addAll(childList);
rowIds = Range.sortAndMergeOverlap(rowIds, true);
}
} else {
// unexpected function type, just return null
return null;
}
return rowIds;
}

@Override
public List<Range> visit(TransformPredicate predicate) {
// do not support transform predicate now.
return null;
}
}
29 changes: 29 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/utils/Range.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/** Range represents from (inclusive) and to (inclusive). */
public class Range implements Serializable {
Expand Down Expand Up @@ -177,6 +178,34 @@ public static List<Range> mergeSortedAsPossible(List<Range> ranges) {
return result;
}

public static List<Range> getRangesFromList(List<Long> origLongs) {
if (origLongs == null || origLongs.isEmpty()) {
return Collections.emptyList();
}

List<Long> longs = origLongs.stream().distinct().sorted().collect(Collectors.toList());

ArrayList<Range> ranges = new ArrayList<>();
Long rangeStart = null;
Long rangeEnd = null;
for (Long cur : longs) {
if (rangeStart == null) {
rangeStart = cur;
rangeEnd = cur;
} else if (rangeEnd == cur - 1) {
rangeEnd = cur;
} else {
ranges.add(new Range(rangeStart, rangeEnd));
rangeStart = cur;
rangeEnd = cur;
}
}
if (rangeStart != null) {
ranges.add(new Range(rangeStart, rangeEnd));
}
return ranges;
}

/**
* Computes the intersection of two lists of ranges.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,12 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options)
"Data evolution config must disabled with deletion-vectors.enabled");
}

if (options.rowIdPushDownEnabled()) {
checkArgument(
options.dataEvolutionEnabled(),
"Row id push down config must enabled with data-evolution.enabled");
}

List<String> blobNames =
BlobType.splitBlob(schema.logicalRowType()).getRight().getFieldNames();
if (!blobNames.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
package org.apache.paimon.table.source;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.data.variant.VariantAccessInfoUtils;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.RowIdPredicateVisitor;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.predicate.VectorSearch;
import org.apache.paimon.table.InnerTable;
Expand All @@ -33,12 +37,14 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
import static org.apache.paimon.table.SpecialFields.ROW_ID;
import static org.apache.paimon.utils.Preconditions.checkState;

/** Implementation for {@link ReadBuilder}. */
Expand All @@ -65,7 +71,7 @@ public class ReadBuilderImpl implements ReadBuilder {

private @Nullable RowType readType;
private @Nullable VariantAccessInfo[] variantAccessInfo;
private @Nullable List<Range> rowRanges;
public @Nullable @VisibleForTesting List<Range> rowRanges;
private @Nullable VectorSearch vectorSearch;

private boolean dropStats = false;
Expand Down Expand Up @@ -99,9 +105,55 @@ public ReadBuilder withFilter(Predicate filter) {
} else {
this.filter = PredicateBuilder.and(this.filter, filter);
}
calculateRowRanges(this.filter);
this.filter = removeRowIdFilter(this.filter);
return this;
}

private void calculateRowRanges(Predicate filter) {
if (filter == null) {
return;
}

RowIdPredicateVisitor visitor = new RowIdPredicateVisitor();
List<Range> ranges = filter.visit(visitor);
// When rowRanges is not null, filter data based on rowRanges.
// If rowRanges is empty, it means no data will be read.
if (ranges != null) {
withRowRanges(ranges);
}
}

private Predicate removeRowIdFilter(Predicate filter) {
if (filter == null) {
return null;
}

if (filter instanceof LeafPredicate
&& ROW_ID.name().equals(((LeafPredicate) filter).fieldName())) {
return null;
} else if (filter instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) filter;

List<Predicate> newChildren = new ArrayList<>();
for (Predicate child : compoundPredicate.children()) {
Predicate newChild = removeRowIdFilter(child);
if (newChild != null) {
newChildren.add(newChild);
}
}

if (newChildren.isEmpty()) {
return null;
} else if (newChildren.size() == 1) {
return newChildren.get(0);
} else {
return new CompoundPredicate(compoundPredicate.function(), newChildren);
}
}
return filter;
}

@Override
public ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {
if (partitionSpec != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@

package org.apache.paimon.spark

import org.apache.paimon.CoreOptions
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
import org.apache.paimon.table.Table
import org.apache.paimon.types.RowType
import org.apache.paimon.table.{InnerTable, Table}
import org.apache.paimon.table.SpecialFields.ROW_ID
import org.apache.paimon.types.{DataField, DataTypes, RowType}

import org.apache.spark.sql.connector.read.{SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

import java.util.{List => JList}
import java.util.{ArrayList, List => JList}

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand All @@ -41,6 +43,7 @@ abstract class PaimonBaseScanBuilder
val table: Table
val partitionKeys: JList[String] = table.partitionKeys()
val rowType: RowType = table.rowType()
val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())

private var pushedSparkFilters = Array.empty[Filter]
protected var hasPostScanPredicates = false
Expand All @@ -65,7 +68,14 @@ abstract class PaimonBaseScanBuilder
val pushableDataFilters = mutable.ArrayBuffer.empty[Predicate]
val postScan = mutable.ArrayBuffer.empty[Filter]

val converter = new SparkFilterConverter(rowType)
var newRowType = rowType
if (table.isInstanceOf[InnerTable] && coreOptions.rowIdPushDownEnabled()) {
val dataFieldsWithRowId = new ArrayList[DataField](rowType.getFields)
dataFieldsWithRowId.add(
new DataField(rowType.getFieldCount, ROW_ID.name(), DataTypes.BIGINT()))
newRowType = rowType.copy(dataFieldsWithRowId)
}
val converter = new SparkFilterConverter(newRowType)
val partitionPredicateVisitor = new PartitionPredicateVisitor(partitionKeys)
filters.foreach {
filter =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class RowIdPushDownTest extends RowIdPushDownTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class RowIdPushDownTest extends RowIdPushDownTestBase {}
Loading
Loading