diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java index acac020db317..8d0758a1fcb1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java @@ -32,7 +32,7 @@ public class BTreeIndexOptions { .withDescription("The compression algorithm to use for BTreeIndex"); public static final ConfigOption<Integer> BTREE_INDEX_COMPRESSION_LEVEL = - ConfigOptions.key("btree-index.compression") + ConfigOptions.key("btree-index.compression-level") .intType() .defaultValue(1) .withDescription("The compression level of the compression algorithm"); @@ -54,4 +54,16 @@ public class BTreeIndexOptions { .doubleType() .defaultValue(0.1) .withDescription("The high priority pool ratio to use for BTreeIndex"); + + public static final ConfigOption<Long> BTREE_INDEX_RECORDS_PER_RANGE = + ConfigOptions.key("btree-index.records-per-range") + .longType() + .defaultValue(1_000_000L) + .withDescription("The expected number of records per BTree index file."); + + public static final ConfigOption<Integer> BTREE_INDEX_BUILD_MAX_PARALLELISM = + ConfigOptions.key("btree-index.build.max-parallelism") + .intType() + .defaultValue(4096) + .withDescription("The max parallelism of Flink/Spark for building BTreeIndex."); } diff --git a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory index e1e63b1057b2..4c3fe70db9c5 100644 --- a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory +++ b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory @@ -14,3 +14,4 @@ # limitations under the License. org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory +org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory \ No newline at end of file diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java index 9e3ead690919..6d98e61248bb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java @@ -47,7 +47,7 @@ public InternalRow toRow(IndexFileMeta record) { globalIndexMeta.rowRangeStart(), globalIndexMeta.rowRangeEnd(), globalIndexMeta.indexFieldId(), - globalIndexMeta.indexMeta() == null + globalIndexMeta.extraFieldIds() == null ? null : new GenericArray(globalIndexMeta.extraFieldIds()), globalIndexMeta.indexMeta()); diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java index cd7a1efd84ef..98ab4df6f136 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java @@ -54,7 +54,7 @@ public InternalRow convertTo(IndexManifestEntry record) { globalIndexMeta.rowRangeStart(), globalIndexMeta.rowRangeEnd(), globalIndexMeta.indexFieldId(), - globalIndexMeta.indexMeta() == null + globalIndexMeta.extraFieldIds() == null ? null : new GenericArray(globalIndexMeta.extraFieldIds()), globalIndexMeta.indexMeta());
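For reference, the two new options can be set programmatically before an index build; a minimal sketch (the values here are illustrative, not defaults used anywhere in this patch):

    Options options = new Options();
    // target roughly two million records per BTree index file
    options.set(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE, 2_000_000L);
    // cap the number of Spark/Flink build tasks
    options.set(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM, 512);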
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java index d2aa11b35c52..5ec81c73898f 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java @@ -18,9 +18,60 @@ package org.apache.paimon.spark.globalindex; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.LongCounter; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + /** Default {@link GlobalIndexBuilder}. */ public class DefaultGlobalIndexBuilder extends GlobalIndexBuilder { public DefaultGlobalIndexBuilder(GlobalIndexBuilderContext context) { super(context); } + + @Override + public List<CommitMessage> build(CloseableIterator<InternalRow> data) throws IOException { + LongCounter rowCounter = new LongCounter(0); + List<ResultEntry> resultEntries = writePaimonRows(data, rowCounter); + List<IndexFileMeta> indexFileMetas = + convertToIndexMeta( + context.startOffset(), + context.startOffset() + rowCounter.getValue() - 1, + resultEntries); + DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas); + return Collections.singletonList( + new CommitMessageImpl( + context.partition(), + 0, + null, + dataIncrement, + CompactIncrement.emptyIncrement())); + } + + private List<ResultEntry> writePaimonRows( + CloseableIterator<InternalRow> rows, LongCounter rowCounter) throws IOException { + GlobalIndexSingletonWriter indexWriter = (GlobalIndexSingletonWriter) createIndexWriter(); + + InternalRow.FieldGetter getter = + InternalRow.createFieldGetter( + context.indexField().type(), + context.readType().getFieldIndex(context.indexField().name())); + rows.forEachRemaining( + row -> { + Object indexO = getter.getFieldOrNull(row); + indexWriter.write(indexO); + rowCounter.add(1); + }); + return indexWriter.finish(); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java index 1bc3b1b8d123..621f5205aef7 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java @@ -20,19 +20,13 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.globalindex.GlobalIndexFileReadWrite; -import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.GlobalIndexWriter; import org.apache.paimon.globalindex.GlobalIndexer; -import org.apache.paimon.globalindex.IndexedSplit; import org.apache.paimon.globalindex.ResultEntry; import org.apache.paimon.index.GlobalIndexMeta; import 
org.apache.paimon.index.IndexFileMeta; -import org.apache.paimon.io.CompactIncrement; -import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.sink.CommitMessage; -import org.apache.paimon.table.sink.CommitMessageImpl; -import org.apache.paimon.table.source.ReadBuilder; -import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.CloseableIterator; import java.io.IOException; import java.util.ArrayList; @@ -47,35 +41,19 @@ protected GlobalIndexBuilder(GlobalIndexBuilderContext context) { this.context = context; } - public CommitMessage build(IndexedSplit indexedSplit) throws IOException { - ReadBuilder builder = context.table().newReadBuilder(); - builder.withReadType(context.readType()); - RecordReader<InternalRow> rows = builder.newRead().createReader(indexedSplit); - LongCounter rowCounter = new LongCounter(0); - List<ResultEntry> resultEntries = writePaimonRows(context, rows, rowCounter); - List<IndexFileMeta> indexFileMetas = - convertToIndexMeta(context, rowCounter.getValue(), resultEntries); - DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas); - return new CommitMessageImpl( - context.partition(), 0, null, dataIncrement, CompactIncrement.emptyIncrement()); - } + public abstract List<CommitMessage> build(CloseableIterator<InternalRow> data) + throws IOException; - private static List<IndexFileMeta> convertToIndexMeta( - GlobalIndexBuilderContext context, long totalRowCount, List<ResultEntry> entries) - throws IOException { + protected List<IndexFileMeta> convertToIndexMeta( + long rangeStart, long rangeEnd, List<ResultEntry> entries) throws IOException { List<IndexFileMeta> results = new ArrayList<>(); - long rangeEnd = context.startOffset() + totalRowCount - 1; for (ResultEntry entry : entries) { String fileName = entry.fileName(); GlobalIndexFileReadWrite readWrite = context.globalIndexFileReadWrite(); long fileSize = readWrite.fileSize(fileName); GlobalIndexMeta globalIndexMeta = new GlobalIndexMeta( - context.startOffset(), - rangeEnd, - context.indexField().id(), - null, - entry.meta()); + rangeStart, rangeEnd, context.indexField().id(), null, entry.meta()); IndexFileMeta indexFileMeta = new IndexFileMeta( context.indexType(), @@ -88,26 +66,9 @@ private static List<IndexFileMeta> convertToIndexMeta( return results; } - private static List<ResultEntry> writePaimonRows( - GlobalIndexBuilderContext context, - RecordReader<InternalRow> rows, - LongCounter rowCounter) - throws IOException { + protected GlobalIndexWriter createIndexWriter() throws IOException { GlobalIndexer globalIndexer = GlobalIndexer.create(context.indexType(), context.indexField(), context.options()); - GlobalIndexSingletonWriter globalIndexWriter = - (GlobalIndexSingletonWriter) - globalIndexer.createWriter(context.globalIndexFileReadWrite()); - InternalRow.FieldGetter getter = - InternalRow.createFieldGetter( - context.indexField().type(), - context.readType().getFieldIndex(context.indexField().name())); - rows.forEachRemaining( - row -> { - Object indexO = getter.getFieldOrNull(row); - globalIndexWriter.write(indexO); - rowCounter.add(1); - }); - return globalIndexWriter.finish(); + return globalIndexer.createWriter(context.globalIndexFileReadWrite()); } }
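With the new abstract contract, split reading moves to the caller: a builder only consumes a CloseableIterator<InternalRow> and returns commit messages. A minimal sketch of the call site, assuming readBuilder, split, and builder are already in scope (this mirrors the CreateGlobalIndexProcedure change below):

    try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(split);
            CloseableIterator<InternalRow> rows = reader.toCloseableIterator()) {
        // the builder only sees rows; it no longer knows about splits
        List<CommitMessage> messages = builder.build(rows);
    }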
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java index 8e4a4d7fabfb..dbf2f9a74a85 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java @@ -27,6 +27,9 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Range; + +import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; @@ -36,45 +39,57 @@ * *

<p>This class is serializable to support Spark distributed execution. The partition is stored * both as a transient {@link BinaryRow} and as serialized bytes to ensure proper serialization - * across executor nodes. + * across executor nodes. The partition can be null when the actual index partition is dynamically + * extracted from the data. */ public class GlobalIndexBuilderContext implements Serializable { private final FileStoreTable table; - private final BinaryRowSerializer binaryRowSerializer; - private final byte[] partitionBytes; + @Nullable private final BinaryRowSerializer binaryRowSerializer; + @Nullable private final byte[] partitionBytes; private final RowType readType; private final DataField indexField; private final String indexType; private final long startOffset; private final Options options; + @Nullable private final Range fullRange; public GlobalIndexBuilderContext( FileStoreTable table, - BinaryRow partition, + @Nullable BinaryRow partition, RowType readType, DataField indexField, String indexType, long startOffset, - Options options) { + Options options, + @Nullable Range fullRange) { this.table = table; this.readType = readType; this.indexField = indexField; this.indexType = indexType; this.startOffset = startOffset; this.options = options; + this.fullRange = fullRange; - this.binaryRowSerializer = new BinaryRowSerializer(partition.getFieldCount()); - try { - this.partitionBytes = binaryRowSerializer.serializeToBytes(partition); - } catch (IOException e) { - throw new RuntimeException(e); + if (partition != null) { + this.binaryRowSerializer = new BinaryRowSerializer(partition.getFieldCount()); + try { + this.partitionBytes = binaryRowSerializer.serializeToBytes(partition); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + this.binaryRowSerializer = null; + this.partitionBytes = null; } } + @Nullable public BinaryRow partition() { try { - return binaryRowSerializer.deserializeFromBytes(partitionBytes); + return partitionBytes == null || binaryRowSerializer == null ? 
null + : binaryRowSerializer.deserializeFromBytes(partitionBytes); } catch (IOException e) { throw new RuntimeException(e); } @@ -109,4 +124,9 @@ public GlobalIndexFileReadWrite globalIndexFileReadWrite() { IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); return new GlobalIndexFileReadWrite(fileIO, indexPathFactory); } + + @Nullable + public Range fullRange() { + return fullRange; + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java index dfa5916cd5b3..7e4d6ff8b801 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java @@ -18,27 +18,27 @@ package org.apache.paimon.spark.globalindex; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.globalindex.IndexedSplit; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; -import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; import java.io.IOException; import java.util.List; -import java.util.Map; /** User-defined topology builder. */ public interface GlobalIndexTopoBuilder { List<CommitMessage> buildIndex( - JavaSparkContext javaSparkContext, + SparkSession spark, + DataSourceV2Relation relation, + PartitionPredicate partitionPredicate, FileStoreTable table, - Map<BinaryRow, List<IndexedSplit>> preparedDS, String indexType, RowType readType, DataField indexField, diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java new file mode 100644 index 000000000000..f9443b0bdc65 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.globalindex.btree; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.globalindex.GlobalIndexParallelWriter; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.btree.BTreeIndexOptions; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilder; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * The {@link GlobalIndexBuilder} implementation for the BTree index. The caller of {@link + * BTreeGlobalIndexBuilder#build(CloseableIterator) build} must ensure the input data is sorted by + * partition and the indexed field. + */ +public class BTreeGlobalIndexBuilder extends GlobalIndexBuilder { + private static final double FLOATING = 1.2; + + private final IndexFieldsExtractor extractor; + private final long recordsPerRange; + private BinaryRow currentPart = null; + private GlobalIndexParallelWriter currentWriter = null; + private final LongCounter counter = new LongCounter(); + + protected BTreeGlobalIndexBuilder(GlobalIndexBuilderContext context) { + super(context); + Preconditions.checkNotNull( + context.fullRange(), "Full range cannot be null for BTreeGlobalIndexBuilder."); + + FileStoreTable table = context.table(); + List<String> readColumns = new ArrayList<>(table.partitionKeys()); + readColumns.addAll(context.readType().getFieldNames()); + this.extractor = + new IndexFieldsExtractor( + SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns), + table.partitionKeys(), + context.indexField().name()); + + // Each partition boundary is derived from sampling, so we introduce a slack factor + // to avoid generating too many small files due to sampling variance. 
+ this.recordsPerRange = + (long) + (context.options().get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE) + * FLOATING); + } + + @Override + public List<CommitMessage> build(CloseableIterator<InternalRow> data) throws IOException { + List<CommitMessage> commitMessages = new ArrayList<>(); + + while (data.hasNext()) { + InternalRow row = data.next(); + + BinaryRow partRow = extractor.extractPartition(row); + // Flush the pending range when the partition changes or the range is full. + // This is correct only if the input is sorted by <partition, index field>. + if ((currentPart != null && !partRow.equals(currentPart)) + || counter.getValue() >= recordsPerRange) { + flushIndex(commitMessages); + } + + createWriterIfNeeded(); + + // write the <indexKey, rowId> pair to the index file + currentPart = partRow; + counter.add(1); + currentWriter.write(extractor.extractIndexField(row), extractor.extractRowId(row)); + } + + flushIndex(commitMessages); + + return commitMessages; + } + + private void flushIndex(List<CommitMessage> resultMessages) throws IOException { + if (counter.getValue() == 0 || currentWriter == null || currentPart == null) { + return; + } + + List<ResultEntry> resultEntries = currentWriter.finish(); + List<IndexFileMeta> fileMetas = + convertToIndexMeta(context.fullRange().from, context.fullRange().to, resultEntries); + DataIncrement dataIncrement = DataIncrement.indexIncrement(fileMetas); + CommitMessage commitMessage = + new CommitMessageImpl( + currentPart, 0, null, dataIncrement, CompactIncrement.emptyIncrement()); + + // reset writer + currentWriter = null; + currentPart = null; + counter.reset(); + + resultMessages.add(commitMessage); + } + + private void createWriterIfNeeded() throws IOException { + if (currentWriter == null) { + GlobalIndexWriter indexWriter = createIndexWriter(); + if (!(indexWriter instanceof GlobalIndexParallelWriter)) { + throw new RuntimeException( + "Unexpected implementation, the index writer of BTree should be an instance of GlobalIndexParallelWriter, but found: " + + indexWriter.getClass().getName()); + } + currentWriter = (GlobalIndexParallelWriter) indexWriter; + } + } +}
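As a worked example of the slack factor: with the default btree-index.records-per-range of 1,000,000 and FLOATING = 1.2, a writer flushes only once it holds 1,200,000 records, so a Spark range partition that slightly overshoots its sampled share of ~1,000,000 rows still produces a single file; a small trailing file is cut only when the overshoot exceeds 20%.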
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilderFactory.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilderFactory.java new file mode 100644 index 000000000000..38f09979154a --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilderFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.globalindex.btree; + +import org.apache.paimon.spark.globalindex.GlobalIndexBuilder; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory; +import org.apache.paimon.spark.globalindex.GlobalIndexTopoBuilder; + +/** The {@link GlobalIndexBuilderFactory} implementation for BTree index type. */ +public class BTreeGlobalIndexBuilderFactory implements GlobalIndexBuilderFactory { + + // keep this identifier consistent with BTreeGlobalIndexerFactory + private static final String IDENTIFIER = "btree"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public GlobalIndexBuilder create(GlobalIndexBuilderContext context) { + return new BTreeGlobalIndexBuilder(context); + } + + @Override + public GlobalIndexTopoBuilder createTopoBuilder() { + return new BTreeIndexTopoBuilder(); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java new file mode 100644 index 000000000000..e898a0e16856 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.globalindex.btree; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.globalindex.btree.BTreeIndexOptions; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.options.Options; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.spark.SparkRow; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilder; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactoryUtils; +import org.apache.paimon.spark.globalindex.GlobalIndexTopoBuilder; +import org.apache.paimon.spark.util.ScanPlanHelper$; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageSerializer; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.snapshot.SnapshotReader; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.Range; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.PaimonUtils; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; +import org.apache.spark.sql.functions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** The {@link GlobalIndexTopoBuilder} for the BTree index. */ +public class BTreeIndexTopoBuilder implements GlobalIndexTopoBuilder { + + @Override + public List<CommitMessage> buildIndex( + SparkSession spark, + DataSourceV2Relation relation, + PartitionPredicate partitionPredicate, + FileStoreTable table, + String indexType, + RowType readType, + DataField indexField, + Options options) + throws IOException { + + // 1. read the whole dataset of target partitions + SnapshotReader snapshotReader = table.newSnapshotReader(); + if (partitionPredicate != null) { + snapshotReader = snapshotReader.withPartitionFilter(partitionPredicate); + } + + List<DataSplit> dataSplits = snapshotReader.read().dataSplits(); + Range fullRange = calcRowRange(dataSplits); + if (dataSplits.isEmpty() || fullRange == null) { + return Collections.emptyList(); + } + + // we need to read all partition columns for the shuffle + List<String> selectedColumns = new ArrayList<>(); + selectedColumns.addAll(table.partitionKeys()); + selectedColumns.addAll(readType.getFieldNames()); + + Dataset<Row> source = + PaimonUtils.createDataset( + spark, + ScanPlanHelper$.MODULE$.createNewScanPlan( + dataSplits.toArray(new DataSplit[0]), relation)); + + Dataset<Row> selected = + source.select(selectedColumns.stream().map(functions::col).toArray(Column[]::new)); + + // 2. shuffle and sort by partitions and index keys + Column[] sortFields = + selectedColumns.stream() + .filter(name -> !SpecialFields.ROW_ID.name().equals(name)) + .map(functions::col) + .toArray(Column[]::new); + + long recordsPerRange = options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE); + // this should be super fast since an append-only table can utilize count-star pushdown well. 
+ long rowCount = source.count(); + int partitionNum = Math.max((int) (rowCount / recordsPerRange), 1); + int maxParallelism = options.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM); + partitionNum = Math.min(partitionNum, maxParallelism); + + // For efficiency, we do not repartition within each Paimon partition. Instead, we directly + // divide ranges by <partition, index key>, and each subtask is expected to process + // records from multiple partitions. The drawback is that if a Paimon partition spans + // multiple Spark partitions, the first and last output files may contain relatively few + // records. + Dataset<Row> partitioned = + selected.repartitionByRange(partitionNum, sortFields) + .sortWithinPartitions(sortFields); + + // 3. write index for each partition & range + final GlobalIndexBuilderContext context = + new GlobalIndexBuilderContext( + table, null, readType, indexField, indexType, 0, options, fullRange); + final RowType rowType = + SpecialFields.rowTypeWithRowId(table.rowType()).project(selectedColumns); + JavaRDD<byte[]> written = + partitioned + .javaRDD() + .map(row -> (InternalRow) (new SparkRow(rowType, row))) + .mapPartitions( + (FlatMapFunction<Iterator<InternalRow>, byte[]>) + iter -> { + CommitMessageSerializer commitMessageSerializer = + new CommitMessageSerializer(); + + GlobalIndexBuilder globalIndexBuilder = + GlobalIndexBuilderFactoryUtils + .createIndexBuilder(context); + + List<CommitMessage> commitMessages = + globalIndexBuilder.build( + CloseableIterator.adapterForIterator( + iter)); + List<byte[]> messageBytes = new ArrayList<>(); + + for (CommitMessage commitMessage : commitMessages) { + messageBytes.add( + commitMessageSerializer.serialize( + commitMessage)); + } + + return messageBytes.iterator(); + }); + + // 4. collect all commit messages and return + List<byte[]> commitBytes = written.collect(); + List<CommitMessage> result = new ArrayList<>(); + CommitMessageSerializer commitMessageSerializer = new CommitMessageSerializer(); + for (byte[] commitByte : commitBytes) { + result.add( + commitMessageSerializer.deserialize( + commitMessageSerializer.getVersion(), commitByte)); + } + + return result; + } + + private Range calcRowRange(List<DataSplit> dataSplits) { + long start = Long.MAX_VALUE; + long end = Long.MIN_VALUE; + for (DataSplit dataSplit : dataSplits) { + for (DataFileMeta file : dataSplit.dataFiles()) { + if (file.firstRowId() != null) { + start = Math.min(start, file.firstRowId()); + end = Math.max(end, file.firstRowId() + file.rowCount()); + } + } + } + return start == Long.MAX_VALUE ? null : new Range(start, end); + } +}
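As a worked example of the sizing above: the multi-partition test below inserts 189,088 rows with btree-index.records-per-range=1000, so partitionNum = max(189088 / 1000, 1) = 189, well under the default cap of 4096, and Spark range-partitions the sorted data into 189 tasks of roughly 1,000 records each.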
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/IndexFieldsExtractor.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/IndexFieldsExtractor.java new file mode 100644 index 000000000000..bbe328d9d58d --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/IndexFieldsExtractor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.globalindex.btree; + +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.Projection; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.util.List; + +/** Extracts the partition, index field, and row id from records. */ +public class IndexFieldsExtractor { + private final Projection partitionProjection; + private final InternalRow.FieldGetter indexFieldGetter; + private final int rowIdPos; + + public IndexFieldsExtractor(RowType readType, List<String> partitionKeys, String indexField) { + this.partitionProjection = CodeGenUtils.newProjection(readType, partitionKeys); + int indexFieldPos = readType.getFieldIndex(indexField); + this.indexFieldGetter = + InternalRow.createFieldGetter(readType.getTypeAt(indexFieldPos), indexFieldPos); + this.rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name()); + } + + public BinaryRow extractPartition(InternalRow record) { + // the projection reuses the returned record, so a copy is necessary + return partitionProjection.apply(record).copy(); + } + + @Nullable + public Object extractIndexField(InternalRow record) { + return indexFieldGetter.getFieldOrNull(record); + } + + public Long extractRowId(InternalRow record) { + return record.getLong(rowIdPos); + } +}
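A usage sketch for the extractor; the schema, field names, and row values here are illustrative only (the read type must contain the partition keys, the indexed field, and the row-id special field):

    RowType readType =
            RowType.builder()
                    .field("pt", DataTypes.STRING())
                    .field("name", DataTypes.STRING())
                    .field(SpecialFields.ROW_ID.name(), DataTypes.BIGINT())
                    .build();
    IndexFieldsExtractor extractor =
            new IndexFieldsExtractor(readType, Collections.singletonList("pt"), "name");
    InternalRow row =
            GenericRow.of(BinaryString.fromString("p0"), BinaryString.fromString("name_42"), 42L);
    BinaryRow partition = extractor.extractPartition(row); // defensive copy of ["p0"]
    Object indexKey = extractor.extractIndexField(row); // "name_42" (may be null)
    Long rowId = extractor.extractRowId(row); // 42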
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java index 48a5b9d7b6c8..9b7c3e98bef7 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java @@ -25,6 +25,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.spark.globalindex.GlobalIndexBuilder; import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext; import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactoryUtils; @@ -36,11 +37,14 @@ import org.apache.paimon.table.sink.CommitMessageSerializer; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.InstantiationUtil; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.ProcedureUtils; import org.apache.paimon.utils.Range; import org.apache.paimon.utils.StringUtils; @@ -164,33 +168,39 @@ public InternalRow[] call(InternalRow args) { ProcedureUtils.putAllOptions(parsedOptions, optionString); Options userOptions = Options.fromMap(parsedOptions); Options tableOptions = new Options(table.options()); - long rowsPerShard = - tableOptions - .getOptional(GLOBAL_INDEX_ROW_COUNT_PER_SHARD) - .orElse(GLOBAL_INDEX_ROW_COUNT_PER_SHARD.defaultValue()); - checkArgument( - rowsPerShard > 0, - "Option 'global-index.row-count-per-shard' must be greater than 0."); - - // Step 1: generate splits for each partition&&shard - Map<BinaryRow, List<IndexedSplit>> splits = - split(table, partitionPredicate, rowsPerShard); List<CommitMessage> indexResults; - // Step 2: build index by certain index system + // Step 1: build index by certain index system - GlobalIndexTopoBuilder topoBuildr = + GlobalIndexTopoBuilder topoBuilder = GlobalIndexBuilderFactoryUtils.createTopoBuilder(indexType); - if (topoBuildr != null) { + + if (topoBuilder != null) { + // no need to prepare index shards for a custom topo builder indexResults = - topoBuildr.buildIndex( - new JavaSparkContext(spark().sparkContext()), + topoBuilder.buildIndex( + spark(), + relation, + partitionPredicate, table, - splits, indexType, readRowType, indexField, userOptions); } else { + long rowsPerShard = + tableOptions + .getOptional(GLOBAL_INDEX_ROW_COUNT_PER_SHARD) + .orElse( + GLOBAL_INDEX_ROW_COUNT_PER_SHARD + .defaultValue()); + checkArgument( + rowsPerShard > 0, + "Option 'global-index.row-count-per-shard' must be greater than 0."); + + // generate splits for each partition && shard + Map<BinaryRow, List<IndexedSplit>> splits = + split(table, partitionPredicate, rowsPerShard); + indexResults = buildIndex( table, @@ -201,7 +211,7 @@ public InternalRow[] call(InternalRow args) { userOptions); } - // Step 3: commit index meta to a new snapshot + // Step 2: commit index meta to a new snapshot commit(table, indexResults); return new InternalRow[] {newInternalRow(true)}; @@ -241,7 +251,8 @@ private List<CommitMessage> buildIndex( indexField, indexType, indexedSplit.rowRanges().get(0).from, - options); + options, + null); byte[] dsBytes = InstantiationUtil.serializeObject(indexedSplit); taskList.add(Pair.of(builderContext, dsBytes)); @@ -264,8 +275,20 @@ private List<CommitMessage> buildIndex( GlobalIndexBuilder globalIndexBuilder = GlobalIndexBuilderFactoryUtils.createIndexBuilder( builderContext); - return commitMessageSerializer.serialize( - globalIndexBuilder.build(split)); + ReadBuilder builder = builderContext.table().newReadBuilder(); + builder.withReadType(builderContext.readType()); + + try (RecordReader<InternalRow> recordReader = + builder.newRead().createReader(split); + CloseableIterator<InternalRow> data = + recordReader.toCloseableIterator()) { + List<CommitMessage> commitMessages = + globalIndexBuilder.build(data); + Preconditions.checkState( + commitMessages.size() == 1, + "Expected exactly one commit message per shard."); + return commitMessageSerializer.serialize( + commitMessages.get(0)); + } }) .collect(); diff --git a/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory new file mode 100644 index 000000000000..e99a1bdb2e12 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.paimon.spark.globalindex.btree.BTreeGlobalIndexBuilderFactory \ No newline at end of file diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala index e4ecff618af7..23ed2921c664 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala @@ -18,7 +18,10 @@ package org.apache.paimon.spark.procedure +import org.apache.paimon.globalindex.btree.{BTreeIndexMeta, KeySerializer} +import org.apache.paimon.memory.MemorySlice import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.types.VarCharType import org.apache.paimon.utils.Range import org.apache.spark.sql.streaming.StreamTest @@ -137,4 +140,219 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest assert(totalRowCount == 189088L) } } + + test("create btree global index") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, name STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true', + | 'btree-index.records-per-range' = '1000') + |""".stripMargin) + + val values = + (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + val output = + spark + .sql( + "CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," + + " options => 'btree-index.records-per-range=1000')") + .collect() + .head + + assert(output.getBoolean(0)) + val table = loadTable("T") + val btreeEntries = table + .store() + .newIndexFileHandler() + .scanEntries() + .asScala + .filter(_.indexFile().indexType() == "btree") + .map(_.indexFile()) + table.store().newGlobalIndexScanBuilder().shardList() + assert(btreeEntries.nonEmpty) + + // 1. assert total row count and file count + val totalRowCount = btreeEntries.map(_.rowCount()).sum + assert(btreeEntries.size == 100) + assert(totalRowCount == 100000L) + + // 2. assert global index meta not null + btreeEntries.foreach(e => assert(e.globalIndexMeta() != null)) + + // 3. 
assert btree index file range non-overlapping + case class MetaWithKey(meta: BTreeIndexMeta, first: Object, last: Object) + val keySerializer = KeySerializer.create(new VarCharType()) + val comparator = keySerializer.createComparator() + + def deserialize(bytes: Array[Byte]): Object = { + keySerializer.deserialize(MemorySlice.wrap(bytes)) + } + + val btreeMetas = btreeEntries + .map(_.globalIndexMeta().indexMeta()) + .map(meta => BTreeIndexMeta.deserialize(meta)) + .map( + m => { + assert(m.getFirstKey != null) + assert(m.getLastKey != null) + MetaWithKey(m, deserialize(m.getFirstKey), deserialize(m.getLastKey)) + }) + + // sort by first key + val sorted = btreeMetas.sortWith((m1, m2) => comparator.compare(m1.first, m2.first) < 0) + + // should not overlap + sorted.sliding(2).foreach { + case Seq(prev: MetaWithKey, next: MetaWithKey) => + assert(comparator.compare(prev.last, next.first) <= 0) + case _ => // ignore + } + } + } + + test("create btree global index with multiple partitions") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, name STRING, pt STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + | PARTITIONED BY (pt) + |""".stripMargin) + + var values = + (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 100).map(i => s"($i, 'name_$i', 'p1')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + val output = + spark + .sql( + "CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," + + " options => 'btree-index.records-per-range=1000')") + .collect() + .head + + assert(output.getBoolean(0)) + + assertMultiplePartitionsResult("T", 189088L, 3) + } + } + + test("create btree index within one spark partition") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, name STRING, pt STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + | PARTITIONED BY (pt) + |""".stripMargin) + + var values = + (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + // force output parallelism = 1 + val output = + spark + .sql("CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," + + " options => 'btree-index.records-per-range=1000,btree-index.build.max-parallelism=1')") + .collect() + .head + + assert(output.getBoolean(0)) + + assertMultiplePartitionsResult("T", 100000L, 2) + } + } + + private def assertMultiplePartitionsResult( + tableName: String, + rowCount: Long, + partCount: Int + ): 
Unit = { + val table = loadTable(tableName) + val btreeEntries = table + .store() + .newIndexFileHandler() + .scanEntries() + .asScala + .filter(_.indexFile().indexType() == "btree") + table.store().newGlobalIndexScanBuilder().shardList() + assert(btreeEntries.nonEmpty) + + // 1. assert total row count + val totalRowCount = btreeEntries.map(_.indexFile().rowCount()).sum + assert(totalRowCount == rowCount) + + // 2. assert global index meta not null + btreeEntries.foreach(e => assert(e.indexFile().globalIndexMeta() != null)) + + // 3. assert non-overlapped within each partition + val entriesByPart = btreeEntries.groupBy(_.partition()) + assert(entriesByPart.size == partCount) + + case class MetaWithKey(meta: BTreeIndexMeta, first: Object, last: Object) + val keySerializer = KeySerializer.create(new VarCharType()) + val comparator = keySerializer.createComparator() + + def deserialize(bytes: Array[Byte]): Object = { + keySerializer.deserialize(MemorySlice.wrap(bytes)) + } + + for ((k, v) <- entriesByPart) { + val metas = v + .map(_.indexFile().globalIndexMeta().indexMeta()) + .map(bytes => BTreeIndexMeta.deserialize(bytes)) + .map( + m => { + assert(m.getFirstKey != null) + assert(m.getLastKey != null) + MetaWithKey(m, deserialize(m.getFirstKey), deserialize(m.getLastKey)) + }) + + val sorted = metas.sortWith((m1, m2) => comparator.compare(m1.first, m2.first) < 0) + + // should not overlap + sorted.sliding(2).foreach { + case Seq(prev: MetaWithKey, next: MetaWithKey) => + assert( + comparator.compare(prev.last, next.first) <= 0, + s"Found overlap for partition ${k.getString(0).toString}. The last key ${prev.last}, next first key ${next.first}" + ) + case _ => // ignore + } + } + } }
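End to end, the BTree index is driven through the existing procedure; a typical invocation mirroring the tests above (the table name and option value are illustrative):

    // Build a BTree global index on column `name`.
    spark.sql(
            "CALL sys.create_global_index("
                    + "table => 'test.T', "
                    + "index_column => 'name', "
                    + "index_type => 'btree', "
                    + "options => 'btree-index.records-per-range=1000000')");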