diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index b03a6cdf7a2f..e3cd1989009c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -20,7 +20,9 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.JoinedRow; import org.apache.paimon.data.serializer.BinaryRowSerializer; import org.apache.paimon.flink.FlinkRowData; import org.apache.paimon.flink.FlinkRowWrapper; @@ -49,7 +51,9 @@ import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Range; @@ -63,8 +67,10 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -77,6 +83,9 @@ /** The {@link BTreeIndexTopoBuilder} for BTree index in Flink. */ public class BTreeIndexTopoBuilder { + private static final String BUILD_TASK_ID_FIELD = "_BTREE_BUILD_TASK_ID"; + private static final int BUILD_TASK_ID_FIELD_ID = -1; + public static boolean buildIndex( StreamExecutionEnvironment env, Supplier indexBuilderSupplier, @@ -114,27 +123,34 @@ public static boolean buildIndex( List selectedColumns = new ArrayList<>(); selectedColumns.add(indexColumn); - RowType readType = + RowType dataReadType = SpecialFields.rowTypeWithRowId(table.rowType().project(selectedColumns)); - int indexFieldPos = readType.getFieldIndex(indexColumn); - int rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name()); - DataType indexFieldType = readType.getTypeAt(indexFieldPos); + String buildTaskIdField = buildTaskIdFieldName(dataReadType); + RowType sortReadType = withBuildTaskId(dataReadType, buildTaskIdField); + int taskIdPos = sortReadType.getFieldIndex(buildTaskIdField); + int indexFieldPos = sortReadType.getFieldIndex(indexColumn); + int rowIdPos = sortReadType.getFieldIndex(SpecialFields.ROW_ID.name()); + DataType indexFieldType = sortReadType.getTypeAt(indexFieldPos); // 3. Calculate maximum parallelism bound long recordsPerRange = userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE); int maxParallelism = userOptions.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM); - // 4. Build one topology per contiguous row range + // 4. Build one topology for all contiguous row ranges CoreOptions coreOptions = table.coreOptions(); - ReadBuilder readBuilder = table.newReadBuilder().withReadType(readType); + ReadBuilder readBuilder = table.newReadBuilder().withReadType(dataReadType); List sortColumns = new ArrayList<>(); + sortColumns.add(buildTaskIdField); sortColumns.add(indexColumn); int partitionFieldSize = table.partitionKeys().size(); BinaryRowSerializer binaryRowSerializer = new BinaryRowSerializer(partitionFieldSize); + List buildTasks = new ArrayList<>(); + List splitTasks = new ArrayList<>(); for (Map.Entry>> partitionEntry : partitionRangeSplits.entrySet()) { BinaryRow partition = partitionEntry.getKey(); + byte[] partitionBytes = binaryRowSerializer.serializeToBytes(partition); for (Map.Entry> entry : partitionEntry.getValue().entrySet()) { Range range = entry.getKey(); List rangeSplits = entry.getValue(); @@ -142,27 +158,37 @@ public static boolean buildIndex( continue; } - DataStream commitMessages = - executeForPartitionRange( - env, - range, - rangeSplits, - readBuilder, - indexBuilder, - partitionFieldSize, - binaryRowSerializer.serializeToBytes(partition), - indexFieldPos, - rowIdPos, - indexFieldType, - sortColumns, - coreOptions, - readType, - recordsPerRange, - maxParallelism); - - allStreams.add(commitMessages); + int taskId = buildTasks.size(); + buildTasks.add(new BTreeBuildTask(taskId, range, partitionBytes)); + for (Split split : rangeSplits) { + splitTasks.add(new BTreeSplitTask(taskId, split)); + } } } + + if (buildTasks.isEmpty()) { + return false; + } + + DataStream commitMessages = + executeForBuildTasks( + env, + buildTasks, + splitTasks, + readBuilder, + indexBuilder, + partitionFieldSize, + taskIdPos, + indexFieldPos, + rowIdPos, + indexFieldType, + sortColumns, + coreOptions, + sortReadType, + recordsPerRange, + maxParallelism); + + allStreams.add(commitMessages); } if (!allStreams.isEmpty()) { @SuppressWarnings("unchecked") @@ -192,14 +218,14 @@ public static void buildIndexAndExecute( } } - protected static DataStream executeForPartitionRange( + protected static DataStream executeForBuildTasks( StreamExecutionEnvironment env, - Range range, - List rangeSplits, + List buildTasks, + List splitTasks, ReadBuilder readBuilder, BTreeGlobalIndexBuilder indexBuilder, int partitionFieldSize, - byte[] partition, + int taskIdPos, int indexFieldPos, int rowIdPos, DataType indexFieldType, @@ -208,21 +234,18 @@ protected static DataStream executeForPartitionRange( RowType readType, long recordsPerRange, int maxParallelism) { - int parallelism = Math.max((int) (range.count() / recordsPerRange), 1); - parallelism = Math.min(parallelism, maxParallelism); + int parallelism = calculateParallelism(buildTasks, recordsPerRange, maxParallelism); - DataStream sourceStream = + DataStream sourceStream = StreamExecutionEnvironmentUtils.fromData( - env, - new JavaTypeInfo<>(Split.class), - rangeSplits.toArray(new Split[0])) - .name("Global Index Source " + " range=" + range) + env, splitTasks, new JavaTypeInfo<>(BTreeSplitTask.class)) + .name("Global Index Source") .setParallelism(1); DataStream rowDataStream = sourceStream .transform( - "Read Data " + range, + "Read Data", InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(readType)), new ReadDataOperator(readBuilder)) .setParallelism(parallelism); @@ -243,19 +266,51 @@ protected static DataStream executeForPartitionRange( return sortedStream .transform( - "write-btree-index " + range, + "write-btree-index", new CommittableTypeInfo(), new WriteIndexOperator( - range, + buildTasks, partitionFieldSize, - partition, indexBuilder, + taskIdPos, indexFieldPos, rowIdPos, indexFieldType)) .setParallelism(parallelism); } + static int calculateParallelism( + List buildTasks, long recordsPerRange, int maxParallelism) { + long totalRecords = 0; + for (BTreeBuildTask task : buildTasks) { + long count = task.rowRange.count(); + if (Long.MAX_VALUE - totalRecords < count) { + totalRecords = Long.MAX_VALUE; + } else { + totalRecords += count; + } + } + + long parallelism = Math.max(totalRecords / recordsPerRange, 1); + return (int) Math.min(parallelism, maxParallelism); + } + + private static String buildTaskIdFieldName(RowType readType) { + String fieldName = BUILD_TASK_ID_FIELD; + while (readType.containsField(fieldName)) { + fieldName = "_" + fieldName; + } + return fieldName; + } + + private static RowType withBuildTaskId(RowType readType, String buildTaskIdField) { + List fields = new ArrayList<>(); + fields.add( + new DataField(BUILD_TASK_ID_FIELD_ID, buildTaskIdField, DataTypes.INT().notNull())); + fields.addAll(readType.getFields()); + return new RowType(readType.isNullable(), fields); + } + private static void commit(FileStoreTable table, DataStream written) { OneInputStreamOperatorFactory committerOperator = new CommitterOperatorFactory<>( @@ -276,7 +331,7 @@ private static void commit(FileStoreTable table, DataStream written private static class ReadDataOperator extends org.apache.flink.table.runtime.operators.TableStreamOperator implements org.apache.flink.streaming.api.operators.OneInputStreamOperator< - Split, RowData> { + BTreeSplitTask, RowData> { private static final long serialVersionUID = 1L; @@ -295,43 +350,50 @@ public void open() throws Exception { } @Override - public void processElement(StreamRecord element) throws Exception { - Split split = element.getValue(); - try (RecordReader reader = tableRead.createReader(split)) { + public void processElement(StreamRecord element) throws Exception { + BTreeSplitTask buildTask = element.getValue(); + GenericRow taskId = GenericRow.of(buildTask.taskId); + try (RecordReader reader = tableRead.createReader(buildTask.split)) { reader.forEachRemaining( - row -> output.collect(new StreamRecord<>(new FlinkRowData(row)))); + row -> + output.collect( + new StreamRecord<>( + new FlinkRowData(new JoinedRow(taskId, row))))); } } } private static class WriteIndexOperator extends BoundedOneInputOperator { - private final Range rowRange; - private final byte[] partition; + private final List buildTasks; private final int partitionFieldSize; private final BTreeGlobalIndexBuilder builder; + private final int taskIdPos; private final int indexFieldPos; private final int rowIdPos; private final DataType indexFieldType; private transient long counter; + private transient BTreeBuildTask currentTask; + private transient BinaryRow currentPartition; private transient GlobalIndexParallelWriter currentWriter; private transient List commitMessages; + private transient Map buildTasksById; private transient InternalRow.FieldGetter indexFieldGetter; private transient BinaryRowSerializer binaryRowSerializer; public WriteIndexOperator( - Range rowRange, + List buildTasks, int partitionFieldSize, - byte[] partition, BTreeGlobalIndexBuilder builder, + int taskIdPos, int indexFieldPos, int rowIdPos, DataType indexFieldType) { - this.rowRange = rowRange; + this.buildTasks = buildTasks; this.partitionFieldSize = partitionFieldSize; - this.partition = partition; this.builder = builder; + this.taskIdPos = taskIdPos; this.indexFieldPos = indexFieldPos; this.rowIdPos = rowIdPos; this.indexFieldType = indexFieldType; @@ -341,6 +403,10 @@ public WriteIndexOperator( public void open() throws Exception { super.open(); commitMessages = new ArrayList<>(); + buildTasksById = new HashMap<>(); + for (BTreeBuildTask task : buildTasks) { + buildTasksById.put(task.taskId, task); + } indexFieldGetter = InternalRow.createFieldGetter(indexFieldType, indexFieldPos); this.binaryRowSerializer = new BinaryRowSerializer(partitionFieldSize); } @@ -348,14 +414,20 @@ public void open() throws Exception { @Override public void processElement(StreamRecord element) throws IOException { InternalRow row = new FlinkRowWrapper(element.getValue()); + int taskId = row.getInt(taskIdPos); + BTreeBuildTask task = buildTasksById.get(taskId); + if (task == null) { + throw new IllegalArgumentException("Unknown BTree build task id: " + taskId); + } + + if (currentTask == null || currentTask.taskId != taskId) { + flushCurrentWriter(); + currentTask = task; + currentPartition = binaryRowSerializer.deserializeFromBytes(task.partition); + } + if (currentWriter != null && counter >= builder.recordsPerRange()) { - commitMessages.add( - builder.flushIndex( - rowRange, - currentWriter.finish(), - binaryRowSerializer.deserializeFromBytes(partition))); - currentWriter = null; - counter = 0; + flushCurrentWriter(); } counter++; @@ -364,19 +436,13 @@ public void processElement(StreamRecord element) throws IOException { currentWriter = builder.createWriter(); } - long localRowId = row.getLong(rowIdPos) - rowRange.from; + long localRowId = row.getLong(rowIdPos) - currentTask.rowRange.from; currentWriter.write(indexFieldGetter.getFieldOrNull(row), localRowId); } @Override public void endInput() throws IOException { - if (counter > 0) { - commitMessages.add( - builder.flushIndex( - rowRange, - currentWriter.finish(), - binaryRowSerializer.deserializeFromBytes(partition))); - } + flushCurrentWriter(); for (CommitMessage message : commitMessages) { output.collect( new StreamRecord<>( @@ -384,5 +450,49 @@ public void endInput() throws IOException { } commitMessages.clear(); } + + private void flushCurrentWriter() throws IOException { + if (counter > 0 && currentWriter != null) { + commitMessages.add( + builder.flushIndex( + currentTask.rowRange, currentWriter.finish(), currentPartition)); + } + currentWriter = null; + counter = 0; + } + } + + /** Metadata for one BTree index build range. */ + public static class BTreeBuildTask implements Serializable { + + private static final long serialVersionUID = 1L; + + private int taskId; + private Range rowRange; + private byte[] partition; + + public BTreeBuildTask() {} + + BTreeBuildTask(int taskId, Range rowRange, byte[] partition) { + this.taskId = taskId; + this.rowRange = rowRange; + this.partition = partition; + } + } + + /** Split assigned to one BTree index build task. */ + public static class BTreeSplitTask implements Serializable { + + private static final long serialVersionUID = 1L; + + private int taskId; + private Split split; + + public BTreeSplitTask() {} + + BTreeSplitTask(int taskId, Split split) { + this.taskId = taskId; + this.split = split; + } } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java index 9aaa39e446f2..9433034d398b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java @@ -144,6 +144,40 @@ private void buildBTreeIndexForTable(String tableName, String indexColumn) { tableName, indexColumn); } + @Test + void testBTreeIndexWithSingleRangeAndParallelWriters() throws Catalog.TableNotExistException { + sql( + "CREATE TABLE T_SINGLE_RANGE_PARALLEL (id INT, name STRING) WITH (" + + "'global-index.enabled' = 'true', " + + "'row-tracking.enabled' = 'true', " + + "'data-evolution.enabled' = 'true'" + + ")"); + String values = + IntStream.range(0, 2_000) + .mapToObj(i -> String.format("(%s, %s)", i, "'name_" + i + "'")) + .collect(Collectors.joining(",")); + sql("INSERT INTO T_SINGLE_RANGE_PARALLEL VALUES " + values); + sql( + "CALL sys.create_global_index(`table` => 'default.T_SINGLE_RANGE_PARALLEL', " + + "index_column => 'id', index_type => 'btree', " + + "options => 'btree-index.records-per-range=100;" + + "btree-index.build.max-parallelism=4')"); + + FileStoreTable table = paimonTable("T_SINGLE_RANGE_PARALLEL"); + List btreeEntries = + table.store().newIndexFileHandler().scanEntries().stream() + .map(IndexManifestEntry::indexFile) + .filter(f -> "btree".equals(f.indexType())) + .collect(Collectors.toList()); + + long totalRowCount = btreeEntries.stream().mapToLong(IndexFileMeta::rowCount).sum(); + assertThat(btreeEntries).hasSizeGreaterThan(1); + assertThat(totalRowCount).isEqualTo(2_000L); + + assertThat(sql("SELECT * FROM T_SINGLE_RANGE_PARALLEL WHERE id = 1500")) + .containsOnly(Row.of(1500, "name_1500")); + } + @Test void testBTreeIndexWithManyPartitions() throws Catalog.TableNotExistException { int numPartitions = 50; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilderTest.java new file mode 100644 index 000000000000..107e0ac04821 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilderTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.btree; + +import org.apache.paimon.flink.btree.BTreeIndexTopoBuilder.BTreeBuildTask; +import org.apache.paimon.utils.Range; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link BTreeIndexTopoBuilder}. */ +public class BTreeIndexTopoBuilderTest { + + @Test + public void testCalculateParallelismByTotalRowsInsteadOfRangeCount() { + List tasks = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + tasks.add(new BTreeBuildTask(i, new Range(i * 10L, i * 10L + 9), new byte[0])); + } + + assertThat(BTreeIndexTopoBuilder.calculateParallelism(tasks, 1000L, 4096)).isEqualTo(1); + } + + @Test + public void testCalculateParallelismHonorsMaxParallelism() { + List tasks = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + tasks.add(new BTreeBuildTask(i, new Range(i * 1000L, i * 1000L + 999), new byte[0])); + } + + assertThat(BTreeIndexTopoBuilder.calculateParallelism(tasks, 1000L, 16)).isEqualTo(16); + } + + @Test + public void testCalculateParallelismKeepsSingleRangeBehavior() { + List tasks = new ArrayList<>(); + tasks.add(new BTreeBuildTask(0, new Range(0, 1499), new byte[0])); + + assertThat(BTreeIndexTopoBuilder.calculateParallelism(tasks, 1000L, 16)).isEqualTo(1); + } +}