@@ -32,7 +32,7 @@ public class BTreeIndexOptions {
.withDescription("The compression algorithm to use for BTreeIndex");

public static final ConfigOption<Integer> BTREE_INDEX_COMPRESSION_LEVEL =
ConfigOptions.key("btree-index.compression")
ConfigOptions.key("btree-index.compression-level")
.intType()
.defaultValue(1)
.withDescription("The compression level of the compression algorithm");
@@ -54,4 +54,16 @@ public class BTreeIndexOptions {
.doubleType()
.defaultValue(0.1)
.withDescription("The high priority pool ratio to use for BTreeIndex");

+public static final ConfigOption<Long> BTREE_INDEX_RECORDS_PER_RANGE =
+ConfigOptions.key("btree-index.records-per-range")
+.longType()
+.defaultValue(1_000_000L)
+.withDescription("The expected number of records per BTree index file.");
+
+public static final ConfigOption<Integer> BTREE_INDEX_BUILD_MAX_PARALLELISM =
+ConfigOptions.key("btree-index.build.max-parallelism")
+.intType()
+.defaultValue(4096)
+.withDescription("The max parallelism of Flink/Spark for building BTreeIndex.");
}
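Note: a minimal usage sketch of the options above, assuming Paimon's standard Options key/value API; the values chosen here are illustrative, not defaults from this PR.

import org.apache.paimon.options.Options;

class BTreeIndexOptionsSketch {
    static Options sketch() {
        Options options = new Options();
        // renamed in this PR: the key was "btree-index.compression" before
        options.set("btree-index.compression-level", "2");
        // new in this PR: expected records per BTree index file
        options.set("btree-index.records-per-range", "2000000");
        // new in this PR: cap on Flink/Spark parallelism during the index build
        options.set("btree-index.build.max-parallelism", "1024");
        return options;
    }
}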
@@ -14,3 +14,4 @@
# limitations under the License.

org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory
+org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory
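Note: this services file registers the new factory through Java's standard ServiceLoader mechanism. A hedged discovery sketch follows; the factory interface's exact package is assumed, not confirmed by this diff.

import java.util.ServiceLoader;

class GlobalIndexerSpiSketch {
    static void listFactories() {
        // GlobalIndexerFactory is assumed to be the SPI type this file names
        // (import omitted because its package is not shown in the diff);
        // after this PR, iteration should yield the bitmap and btree factories.
        for (GlobalIndexerFactory factory : ServiceLoader.load(GlobalIndexerFactory.class)) {
            System.out.println(factory.getClass().getName());
        }
    }
}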
@@ -47,7 +47,7 @@ public InternalRow toRow(IndexFileMeta record) {
globalIndexMeta.rowRangeStart(),
globalIndexMeta.rowRangeEnd(),
globalIndexMeta.indexFieldId(),
-globalIndexMeta.indexMeta() == null
+globalIndexMeta.extraFieldIds() == null
? null
: new GenericArray(globalIndexMeta.extraFieldIds()),
globalIndexMeta.indexMeta());
@@ -54,7 +54,7 @@ public InternalRow convertTo(IndexManifestEntry record) {
globalIndexMeta.rowRangeStart(),
globalIndexMeta.rowRangeEnd(),
globalIndexMeta.indexFieldId(),
-globalIndexMeta.indexMeta() == null
+globalIndexMeta.extraFieldIds() == null
? null
: new GenericArray(globalIndexMeta.extraFieldIds()),
globalIndexMeta.indexMeta());
@@ -18,9 +18,60 @@

package org.apache.paimon.spark.globalindex;

+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.LongCounter;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
/** Default {@link GlobalIndexBuilder}. */
public class DefaultGlobalIndexBuilder extends GlobalIndexBuilder {
public DefaultGlobalIndexBuilder(GlobalIndexBuilderContext context) {
super(context);
}

+@Override
+public List<CommitMessage> build(CloseableIterator<InternalRow> data) throws IOException {
+LongCounter rowCounter = new LongCounter(0);
+List<ResultEntry> resultEntries = writePaimonRows(data, rowCounter);
+List<IndexFileMeta> indexFileMetas =
+convertToIndexMeta(
+context.startOffset(),
+context.startOffset() + rowCounter.getValue() - 1,
+resultEntries);
+DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
+return Collections.singletonList(
+new CommitMessageImpl(
+context.partition(),
+0,
+null,
+dataIncrement,
+CompactIncrement.emptyIncrement()));
+}
+
+private List<ResultEntry> writePaimonRows(
+CloseableIterator<InternalRow> rows, LongCounter rowCounter) throws IOException {
+GlobalIndexSingletonWriter indexWriter = (GlobalIndexSingletonWriter) createIndexWriter();
+
+InternalRow.FieldGetter getter =
+InternalRow.createFieldGetter(
+context.indexField().type(),
+context.readType().getFieldIndex(context.indexField().name()));
+rows.forEachRemaining(
+row -> {
+Object indexO = getter.getFieldOrNull(row);
+indexWriter.write(indexO);
+rowCounter.add(1);
+});
+return indexWriter.finish();
+}
}
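Note: a hedged caller sketch for the new entry point; the context and row iterator are assumed to be prepared by the surrounding Spark job, and closing the iterator is left to the caller here.

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.CloseableIterator;

import java.util.List;

class DefaultBuilderUsageSketch {
    static List<CommitMessage> run(
            GlobalIndexBuilderContext context, CloseableIterator<InternalRow> rows)
            throws Exception {
        GlobalIndexBuilder builder = new DefaultGlobalIndexBuilder(context);
        try {
            // yields one CommitMessage carrying an index-only DataIncrement
            // covering rows [startOffset, startOffset + rowCount - 1]
            return builder.build(rows);
        } finally {
            rows.close();
        }
    }
}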
@@ -20,19 +20,13 @@

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
-import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
import org.apache.paimon.globalindex.GlobalIndexer;
-import org.apache.paimon.globalindex.IndexedSplit;
import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.CloseableIterator;

import java.io.IOException;
import java.util.ArrayList;
@@ -47,35 +41,19 @@ protected GlobalIndexBuilder(GlobalIndexBuilderContext context) {
this.context = context;
}

-public CommitMessage build(IndexedSplit indexedSplit) throws IOException {
-ReadBuilder builder = context.table().newReadBuilder();
-builder.withReadType(context.readType());
-RecordReader<InternalRow> rows = builder.newRead().createReader(indexedSplit);
-LongCounter rowCounter = new LongCounter(0);
-List<ResultEntry> resultEntries = writePaimonRows(context, rows, rowCounter);
-List<IndexFileMeta> indexFileMetas =
-convertToIndexMeta(context, rowCounter.getValue(), resultEntries);
-DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
-return new CommitMessageImpl(
-context.partition(), 0, null, dataIncrement, CompactIncrement.emptyIncrement());
-}
+public abstract List<CommitMessage> build(CloseableIterator<InternalRow> data)
+throws IOException;

-private static List<IndexFileMeta> convertToIndexMeta(
-GlobalIndexBuilderContext context, long totalRowCount, List<ResultEntry> entries)
-throws IOException {
+protected List<IndexFileMeta> convertToIndexMeta(
+long rangeStart, long rangeEnd, List<ResultEntry> entries) throws IOException {
List<IndexFileMeta> results = new ArrayList<>();
-long rangeEnd = context.startOffset() + totalRowCount - 1;
for (ResultEntry entry : entries) {
String fileName = entry.fileName();
GlobalIndexFileReadWrite readWrite = context.globalIndexFileReadWrite();
long fileSize = readWrite.fileSize(fileName);
GlobalIndexMeta globalIndexMeta =
new GlobalIndexMeta(
-context.startOffset(),
-rangeEnd,
-context.indexField().id(),
-null,
-entry.meta());
+rangeStart, rangeEnd, context.indexField().id(), null, entry.meta());
IndexFileMeta indexFileMeta =
new IndexFileMeta(
context.indexType(),
@@ -88,26 +66,9 @@ private static List<IndexFileMeta> convertToIndexMeta(
return results;
}

-private static List<ResultEntry> writePaimonRows(
-GlobalIndexBuilderContext context,
-RecordReader<InternalRow> rows,
-LongCounter rowCounter)
-throws IOException {
+protected GlobalIndexWriter createIndexWriter() throws IOException {
GlobalIndexer globalIndexer =
GlobalIndexer.create(context.indexType(), context.indexField(), context.options());
-GlobalIndexSingletonWriter globalIndexWriter =
-(GlobalIndexSingletonWriter)
-globalIndexer.createWriter(context.globalIndexFileReadWrite());
-InternalRow.FieldGetter getter =
-InternalRow.createFieldGetter(
-context.indexField().type(),
-context.readType().getFieldIndex(context.indexField().name()));
-rows.forEachRemaining(
-row -> {
-Object indexO = getter.getFieldOrNull(row);
-globalIndexWriter.write(indexO);
-rowCounter.add(1);
-});
-return globalIndexWriter.finish();
+return globalIndexer.createWriter(context.globalIndexFileReadWrite());
}
}
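Note: after this refactor the class is a template: build() is the abstract topology hook, while convertToIndexMeta and createIndexWriter are the shared helpers kept in the base class. A purely hypothetical subclass showing the contract; nothing below is part of this PR.

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.CloseableIterator;

import java.io.IOException;
import java.util.List;

// Invented for illustration only.
class CustomGlobalIndexBuilder extends GlobalIndexBuilder {
    CustomGlobalIndexBuilder(GlobalIndexBuilderContext context) {
        super(context);
    }

    @Override
    public List<CommitMessage> build(CloseableIterator<InternalRow> data) throws IOException {
        // A real topology would drain `data` into the writer returned by
        // createIndexWriter(), then wrap the resulting ResultEntry list via
        // convertToIndexMeta(rangeStart, rangeEnd, entries).
        throw new UnsupportedOperationException("illustrative sketch only");
    }
}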
@@ -27,6 +27,9 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
@@ -36,45 +39,57 @@
*
* <p>This class is serializable to support Spark distributed execution. The partition is stored
* both as a transient {@link BinaryRow} and as serialized bytes to ensure proper serialization
-across executor nodes.
+across executor nodes. Partition info can be null if the actual index partition is dynamically
+extracted from data.
*/
public class GlobalIndexBuilderContext implements Serializable {

private final FileStoreTable table;
-private final BinaryRowSerializer binaryRowSerializer;
-private final byte[] partitionBytes;
+@Nullable private final BinaryRowSerializer binaryRowSerializer;
+@Nullable private final byte[] partitionBytes;
private final RowType readType;
private final DataField indexField;
private final String indexType;
private final long startOffset;
private final Options options;
+@Nullable private final Range fullRange;

public GlobalIndexBuilderContext(
FileStoreTable table,
-BinaryRow partition,
+@Nullable BinaryRow partition,
RowType readType,
DataField indexField,
String indexType,
long startOffset,
-Options options) {
+Options options,
+@Nullable Range fullRange) {
this.table = table;
this.readType = readType;
this.indexField = indexField;
this.indexType = indexType;
this.startOffset = startOffset;
this.options = options;
+this.fullRange = fullRange;

-this.binaryRowSerializer = new BinaryRowSerializer(partition.getFieldCount());
-try {
-this.partitionBytes = binaryRowSerializer.serializeToBytes(partition);
-} catch (IOException e) {
-throw new RuntimeException(e);
+if (partition != null) {
+this.binaryRowSerializer = new BinaryRowSerializer(partition.getFieldCount());
+try {
+this.partitionBytes = binaryRowSerializer.serializeToBytes(partition);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+} else {
+this.binaryRowSerializer = null;
+this.partitionBytes = null;
}
}

+@Nullable
public BinaryRow partition() {
try {
-return binaryRowSerializer.deserializeFromBytes(partitionBytes);
+return partitionBytes == null || binaryRowSerializer == null
+? null
+: binaryRowSerializer.deserializeFromBytes(partitionBytes);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -109,4 +124,9 @@ public GlobalIndexFileReadWrite globalIndexFileReadWrite() {
IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory();
return new GlobalIndexFileReadWrite(fileIO, indexPathFactory);
}

+@Nullable
+public Range fullRange() {
+return fullRange;
+}
}
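Note: a construction sketch for the new nullable-partition path; the concrete arguments are assumptions chosen to illustrate the dynamic-partition case described in the updated javadoc.

import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

class ContextSketch {
    static GlobalIndexBuilderContext dynamicPartition(
            FileStoreTable table, RowType readType, DataField indexField) {
        return new GlobalIndexBuilderContext(
                table,
                null, // partition == null: extracted dynamically from the data
                readType,
                indexField,
                "btree", // indexType identifier; value assumed
                0L, // startOffset of the global row range
                new Options(), // index options, e.g. the BTree keys above
                null); // fullRange: optional precomputed Range, none here
    }
}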
@@ -18,27 +18,27 @@

package org.apache.paimon.spark.globalindex;

-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.globalindex.IndexedSplit;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;

import java.io.IOException;
import java.util.List;
-import java.util.Map;

/** User defined topology builder. */
public interface GlobalIndexTopoBuilder {

List<CommitMessage> buildIndex(
-JavaSparkContext javaSparkContext,
+SparkSession spark,
+DataSourceV2Relation relation,
+PartitionPredicate partitionPredicate,
FileStoreTable table,
-Map<BinaryRow, List<IndexedSplit>> preparedDS,
String indexType,
RowType readType,
DataField indexField,