Skip to content

Commit 22bc3ab

Browse files
authored
[spark] support building BTree index through procedure (#6956)
1 parent 0e0bd39 commit 22bc3ab

File tree

15 files changed

+820
-89
lines changed

15 files changed

+820
-89
lines changed

paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class BTreeIndexOptions {
3232
.withDescription("The compression algorithm to use for BTreeIndex");
3333

3434
public static final ConfigOption<Integer> BTREE_INDEX_COMPRESSION_LEVEL =
35-
ConfigOptions.key("btree-index.compression")
35+
ConfigOptions.key("btree-index.compression-level")
3636
.intType()
3737
.defaultValue(1)
3838
.withDescription("The compression level of the compression algorithm");
@@ -54,4 +54,16 @@ public class BTreeIndexOptions {
5454
.doubleType()
5555
.defaultValue(0.1)
5656
.withDescription("The high priority pool ratio to use for BTreeIndex");
57+
58+
public static final ConfigOption<Long> BTREE_INDEX_RECORDS_PER_RANGE =
59+
ConfigOptions.key("btree-index.records-per-range")
60+
.longType()
61+
.defaultValue(1000_000L)
62+
.withDescription("The expected number of records per BTree Index File.");
63+
64+
public static final ConfigOption<Integer> BTREE_INDEX_BUILD_MAX_PARALLELISM =
65+
ConfigOptions.key("btree-index.build.max-parallelism")
66+
.intType()
67+
.defaultValue(4096)
68+
.withDescription("The max parallelism of Flink/Spark for building BTreeIndex.");
5769
}

paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@
1414
# limitations under the License.
1515

1616
org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory
17+
org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory

paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public InternalRow toRow(IndexFileMeta record) {
4747
globalIndexMeta.rowRangeStart(),
4848
globalIndexMeta.rowRangeEnd(),
4949
globalIndexMeta.indexFieldId(),
50-
globalIndexMeta.indexMeta() == null
50+
globalIndexMeta.extraFieldIds() == null
5151
? null
5252
: new GenericArray(globalIndexMeta.extraFieldIds()),
5353
globalIndexMeta.indexMeta());

paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public InternalRow convertTo(IndexManifestEntry record) {
5454
globalIndexMeta.rowRangeStart(),
5555
globalIndexMeta.rowRangeEnd(),
5656
globalIndexMeta.indexFieldId(),
57-
globalIndexMeta.indexMeta() == null
57+
globalIndexMeta.extraFieldIds() == null
5858
? null
5959
: new GenericArray(globalIndexMeta.extraFieldIds()),
6060
globalIndexMeta.indexMeta());

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,60 @@
1818

1919
package org.apache.paimon.spark.globalindex;
2020

21+
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
23+
import org.apache.paimon.globalindex.ResultEntry;
24+
import org.apache.paimon.index.IndexFileMeta;
25+
import org.apache.paimon.io.CompactIncrement;
26+
import org.apache.paimon.io.DataIncrement;
27+
import org.apache.paimon.table.sink.CommitMessage;
28+
import org.apache.paimon.table.sink.CommitMessageImpl;
29+
import org.apache.paimon.utils.CloseableIterator;
30+
import org.apache.paimon.utils.LongCounter;
31+
32+
import java.io.IOException;
33+
import java.util.Collections;
34+
import java.util.List;
35+
2136
/** Default {@link GlobalIndexBuilder}. */
2237
public class DefaultGlobalIndexBuilder extends GlobalIndexBuilder {
2338
public DefaultGlobalIndexBuilder(GlobalIndexBuilderContext context) {
2439
super(context);
2540
}
41+
42+
@Override
43+
public List<CommitMessage> build(CloseableIterator<InternalRow> data) throws IOException {
44+
LongCounter rowCounter = new LongCounter(0);
45+
List<ResultEntry> resultEntries = writePaimonRows(data, rowCounter);
46+
List<IndexFileMeta> indexFileMetas =
47+
convertToIndexMeta(
48+
context.startOffset(),
49+
context.startOffset() + rowCounter.getValue() - 1,
50+
resultEntries);
51+
DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
52+
return Collections.singletonList(
53+
new CommitMessageImpl(
54+
context.partition(),
55+
0,
56+
null,
57+
dataIncrement,
58+
CompactIncrement.emptyIncrement()));
59+
}
60+
61+
private List<ResultEntry> writePaimonRows(
62+
CloseableIterator<InternalRow> rows, LongCounter rowCounter) throws IOException {
63+
GlobalIndexSingletonWriter indexWriter = (GlobalIndexSingletonWriter) createIndexWriter();
64+
65+
InternalRow.FieldGetter getter =
66+
InternalRow.createFieldGetter(
67+
context.indexField().type(),
68+
context.readType().getFieldIndex(context.indexField().name()));
69+
rows.forEachRemaining(
70+
row -> {
71+
Object indexO = getter.getFieldOrNull(row);
72+
indexWriter.write(indexO);
73+
rowCounter.add(1);
74+
});
75+
return indexWriter.finish();
76+
}
2677
}

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java

Lines changed: 9 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,13 @@
2020

2121
import org.apache.paimon.data.InternalRow;
2222
import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
23-
import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
23+
import org.apache.paimon.globalindex.GlobalIndexWriter;
2424
import org.apache.paimon.globalindex.GlobalIndexer;
25-
import org.apache.paimon.globalindex.IndexedSplit;
2625
import org.apache.paimon.globalindex.ResultEntry;
2726
import org.apache.paimon.index.GlobalIndexMeta;
2827
import org.apache.paimon.index.IndexFileMeta;
29-
import org.apache.paimon.io.CompactIncrement;
30-
import org.apache.paimon.io.DataIncrement;
31-
import org.apache.paimon.reader.RecordReader;
3228
import org.apache.paimon.table.sink.CommitMessage;
33-
import org.apache.paimon.table.sink.CommitMessageImpl;
34-
import org.apache.paimon.table.source.ReadBuilder;
35-
import org.apache.paimon.utils.LongCounter;
29+
import org.apache.paimon.utils.CloseableIterator;
3630

3731
import java.io.IOException;
3832
import java.util.ArrayList;
@@ -47,35 +41,19 @@ protected GlobalIndexBuilder(GlobalIndexBuilderContext context) {
4741
this.context = context;
4842
}
4943

50-
public CommitMessage build(IndexedSplit indexedSplit) throws IOException {
51-
ReadBuilder builder = context.table().newReadBuilder();
52-
builder.withReadType(context.readType());
53-
RecordReader<InternalRow> rows = builder.newRead().createReader(indexedSplit);
54-
LongCounter rowCounter = new LongCounter(0);
55-
List<ResultEntry> resultEntries = writePaimonRows(context, rows, rowCounter);
56-
List<IndexFileMeta> indexFileMetas =
57-
convertToIndexMeta(context, rowCounter.getValue(), resultEntries);
58-
DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
59-
return new CommitMessageImpl(
60-
context.partition(), 0, null, dataIncrement, CompactIncrement.emptyIncrement());
61-
}
44+
public abstract List<CommitMessage> build(CloseableIterator<InternalRow> data)
45+
throws IOException;
6246

63-
private static List<IndexFileMeta> convertToIndexMeta(
64-
GlobalIndexBuilderContext context, long totalRowCount, List<ResultEntry> entries)
65-
throws IOException {
47+
protected List<IndexFileMeta> convertToIndexMeta(
48+
long rangeStart, long rangeEnd, List<ResultEntry> entries) throws IOException {
6649
List<IndexFileMeta> results = new ArrayList<>();
67-
long rangeEnd = context.startOffset() + totalRowCount - 1;
6850
for (ResultEntry entry : entries) {
6951
String fileName = entry.fileName();
7052
GlobalIndexFileReadWrite readWrite = context.globalIndexFileReadWrite();
7153
long fileSize = readWrite.fileSize(fileName);
7254
GlobalIndexMeta globalIndexMeta =
7355
new GlobalIndexMeta(
74-
context.startOffset(),
75-
rangeEnd,
76-
context.indexField().id(),
77-
null,
78-
entry.meta());
56+
rangeStart, rangeEnd, context.indexField().id(), null, entry.meta());
7957
IndexFileMeta indexFileMeta =
8058
new IndexFileMeta(
8159
context.indexType(),
@@ -88,26 +66,9 @@ private static List<IndexFileMeta> convertToIndexMeta(
8866
return results;
8967
}
9068

91-
private static List<ResultEntry> writePaimonRows(
92-
GlobalIndexBuilderContext context,
93-
RecordReader<InternalRow> rows,
94-
LongCounter rowCounter)
95-
throws IOException {
69+
protected GlobalIndexWriter createIndexWriter() throws IOException {
9670
GlobalIndexer globalIndexer =
9771
GlobalIndexer.create(context.indexType(), context.indexField(), context.options());
98-
GlobalIndexSingletonWriter globalIndexWriter =
99-
(GlobalIndexSingletonWriter)
100-
globalIndexer.createWriter(context.globalIndexFileReadWrite());
101-
InternalRow.FieldGetter getter =
102-
InternalRow.createFieldGetter(
103-
context.indexField().type(),
104-
context.readType().getFieldIndex(context.indexField().name()));
105-
rows.forEachRemaining(
106-
row -> {
107-
Object indexO = getter.getFieldOrNull(row);
108-
globalIndexWriter.write(indexO);
109-
rowCounter.add(1);
110-
});
111-
return globalIndexWriter.finish();
72+
return globalIndexer.createWriter(context.globalIndexFileReadWrite());
11273
}
11374
}

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import org.apache.paimon.table.FileStoreTable;
2828
import org.apache.paimon.types.DataField;
2929
import org.apache.paimon.types.RowType;
30+
import org.apache.paimon.utils.Range;
31+
32+
import javax.annotation.Nullable;
3033

3134
import java.io.IOException;
3235
import java.io.Serializable;
@@ -36,45 +39,57 @@
3639
*
3740
* <p>This class is serializable to support Spark distributed execution. The partition is stored
3841
* both as a transient {@link BinaryRow} and as serialized bytes to ensure proper serialization
39-
* across executor nodes.
42+
* across executor nodes. Partition info can be null if the actual index partition is dynamically
43+
* extracted from data.
4044
*/
4145
public class GlobalIndexBuilderContext implements Serializable {
4246

4347
private final FileStoreTable table;
44-
private final BinaryRowSerializer binaryRowSerializer;
45-
private final byte[] partitionBytes;
48+
@Nullable private final BinaryRowSerializer binaryRowSerializer;
49+
@Nullable private final byte[] partitionBytes;
4650
private final RowType readType;
4751
private final DataField indexField;
4852
private final String indexType;
4953
private final long startOffset;
5054
private final Options options;
55+
@Nullable private final Range fullRange;
5156

5257
public GlobalIndexBuilderContext(
5358
FileStoreTable table,
54-
BinaryRow partition,
59+
@Nullable BinaryRow partition,
5560
RowType readType,
5661
DataField indexField,
5762
String indexType,
5863
long startOffset,
59-
Options options) {
64+
Options options,
65+
@Nullable Range fullRange) {
6066
this.table = table;
6167
this.readType = readType;
6268
this.indexField = indexField;
6369
this.indexType = indexType;
6470
this.startOffset = startOffset;
6571
this.options = options;
72+
this.fullRange = fullRange;
6673

67-
this.binaryRowSerializer = new BinaryRowSerializer(partition.getFieldCount());
68-
try {
69-
this.partitionBytes = binaryRowSerializer.serializeToBytes(partition);
70-
} catch (IOException e) {
71-
throw new RuntimeException(e);
74+
if (partition != null) {
75+
this.binaryRowSerializer = new BinaryRowSerializer(partition.getFieldCount());
76+
try {
77+
this.partitionBytes = binaryRowSerializer.serializeToBytes(partition);
78+
} catch (IOException e) {
79+
throw new RuntimeException(e);
80+
}
81+
} else {
82+
this.binaryRowSerializer = null;
83+
this.partitionBytes = null;
7284
}
7385
}
7486

87+
@Nullable
7588
public BinaryRow partition() {
7689
try {
77-
return binaryRowSerializer.deserializeFromBytes(partitionBytes);
90+
return partitionBytes == null || binaryRowSerializer == null
91+
? null
92+
: binaryRowSerializer.deserializeFromBytes(partitionBytes);
7893
} catch (IOException e) {
7994
throw new RuntimeException(e);
8095
}
@@ -109,4 +124,9 @@ public GlobalIndexFileReadWrite globalIndexFileReadWrite() {
109124
IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory();
110125
return new GlobalIndexFileReadWrite(fileIO, indexPathFactory);
111126
}
127+
128+
@Nullable
129+
public Range fullRange() {
130+
return fullRange;
131+
}
112132
}

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,27 @@
1818

1919
package org.apache.paimon.spark.globalindex;
2020

21-
import org.apache.paimon.data.BinaryRow;
22-
import org.apache.paimon.globalindex.IndexedSplit;
2321
import org.apache.paimon.options.Options;
22+
import org.apache.paimon.partition.PartitionPredicate;
2423
import org.apache.paimon.table.FileStoreTable;
2524
import org.apache.paimon.table.sink.CommitMessage;
2625
import org.apache.paimon.types.DataField;
2726
import org.apache.paimon.types.RowType;
2827

29-
import org.apache.spark.api.java.JavaSparkContext;
28+
import org.apache.spark.sql.SparkSession;
29+
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
3030

3131
import java.io.IOException;
3232
import java.util.List;
33-
import java.util.Map;
3433

3534
/** User defined topology builder. */
3635
public interface GlobalIndexTopoBuilder {
3736

3837
List<CommitMessage> buildIndex(
39-
JavaSparkContext javaSparkContext,
38+
SparkSession spark,
39+
DataSourceV2Relation relation,
40+
PartitionPredicate partitionPredicate,
4041
FileStoreTable table,
41-
Map<BinaryRow, List<IndexedSplit>> preparedDS,
4242
String indexType,
4343
RowType readType,
4444
DataField indexField,

0 commit comments

Comments
 (0)