diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 6f081bf2a0e6..9df6f5884d6a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -18,6 +18,7 @@ package org.apache.paimon.append; +import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.compact.CompactDeletionFile; import org.apache.paimon.compact.CompactManager; @@ -51,6 +52,7 @@ import org.apache.paimon.utils.SinkWriter; import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter; import org.apache.paimon.utils.SinkWriter.DirectSinkWriter; +import org.apache.paimon.utils.SinkWriter.SortedBufferedSinkWriter; import org.apache.paimon.utils.StatsCollectorFactories; import javax.annotation.Nullable; @@ -97,6 +99,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { @Nullable private final IOManager ioManager; private final FileIndexOptions fileIndexOptions; private final MemorySize maxDiskSize; + private final CoreOptions coreOptions; + private final boolean sortEnabled; @Nullable private CompactDeletionFile compactDeletionFile; private SinkWriter sinkWriter; @@ -129,7 +133,8 @@ public AppendOnlyWriter( boolean asyncFileWrite, boolean statsDenseStore, boolean dataEvolutionEnabled, - @Nullable BlobFileContext blobContext) { + @Nullable BlobFileContext blobContext, + CoreOptions coreOptions) { this.fileIO = fileIO; this.schemaId = schemaId; this.fileFormat = fileFormat; @@ -159,11 +164,18 @@ public AppendOnlyWriter( this.statsCollectorFactories = statsCollectorFactories; this.maxDiskSize = maxDiskSize; this.fileIndexOptions = fileIndexOptions; + this.coreOptions = coreOptions; - this.sinkWriter = - useWriteBuffer - ? createBufferedSinkWriter(spillable) - : new DirectSinkWriter<>(this::createRollingRowWriter); + // Determine if we need to enable sorting based on clustering configuration + List clusteringColumns = coreOptions.clusteringColumns(); + this.sortEnabled = + coreOptions.clusteringIncrementalEnabled() + && coreOptions.clusteringIncrementalOptimizeWrite() + && coreOptions.clusteringIncrementalMode() + == CoreOptions.ClusteringIncrementalMode.LOCAL_SORT + && !clusteringColumns.isEmpty(); + + this.sinkWriter = createSinkWriter(useWriteBuffer, spillable); if (increment != null) { newFiles.addAll(increment.newFilesIncrement().newFiles()); @@ -174,16 +186,31 @@ public AppendOnlyWriter( } } - private BufferedSinkWriter createBufferedSinkWriter(boolean spillable) { - return new BufferedSinkWriter<>( - this::createRollingRowWriter, - t -> t, - t -> t, - ioManager, - writeSchema, - spillable, - maxDiskSize, - spillCompression); + private SinkWriter createSinkWriter(boolean useWriteBuffer, boolean spillable) { + if (useWriteBuffer) { + if (sortEnabled) { + return new SortedBufferedSinkWriter<>( + this::createRollingRowWriter, + t -> t, + t -> t, + ioManager, + writeSchema, + coreOptions, + spillable); + } else { + return new BufferedSinkWriter<>( + this::createRollingRowWriter, + t -> t, + t -> t, + ioManager, + writeSchema, + spillable, + maxDiskSize, + spillCompression); + } + } else { + return new DirectSinkWriter<>(this::createRollingRowWriter); + } } @Override @@ -209,9 +236,9 @@ public void write(InternalRow rowData) throws Exception { @Override public void writeBundle(BundleRecords bundle) throws Exception { if (sinkWriter instanceof BufferedSinkWriter) { - for (InternalRow row : bundle) { - write(row); - } + ((BufferedSinkWriter) sinkWriter).writeBundle(bundle); + } else if (sinkWriter instanceof SortedBufferedSinkWriter) { + ((SortedBufferedSinkWriter) sinkWriter).writeBundle(bundle); } else { ((DirectSinkWriter) sinkWriter).writeBundle(bundle); } @@ -292,7 +319,7 @@ public void toBufferedWriter() throws Exception { List files = sinkWriter.flush(); sinkWriter.close(); - sinkWriter = createBufferedSinkWriter(true); + sinkWriter = createSinkWriter(true, true); sinkWriter.setMemoryPool(memorySegmentPool); // rewrite small files diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java index 94620205a232..5e46b74ad1ad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java @@ -154,7 +154,8 @@ protected RecordWriter createWriter( options.asyncFileWrite(), options.statsDenseStore(), options.dataEvolutionEnabled(), - blobContext); + blobContext, + options); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java b/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java index 439eea707cf0..af1a5865d816 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java @@ -18,8 +18,13 @@ package org.apache.paimon.utils; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.RecordComparator; import org.apache.paimon.compression.CompressOptions; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.BinaryRowSerializer; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.disk.RowBuffer; @@ -28,6 +33,9 @@ import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.options.MemorySize; +import org.apache.paimon.sort.BinaryExternalSortBuffer; +import org.apache.paimon.sort.BinaryInMemorySortBuffer; +import org.apache.paimon.sort.SortBuffer; import org.apache.paimon.types.RowType; import java.io.IOException; @@ -123,17 +131,49 @@ public boolean bufferSpillableWriter() { } } + /** + * Base class for buffered sink writers, contains common logic for flushing, writing bundles, + * and managing lifecycle. + */ + abstract class BaseBufferedSinkWriter implements SinkWriter { + + protected final Supplier> writerSupplier; + protected final Function toRow; + protected final Function fromRow; + protected final IOManager ioManager; + protected final RowType rowType; + + protected BaseBufferedSinkWriter( + Supplier> writerSupplier, + Function toRow, + Function fromRow, + IOManager ioManager, + RowType rowType) { + this.writerSupplier = writerSupplier; + this.toRow = toRow; + this.fromRow = fromRow; + this.ioManager = ioManager; + this.rowType = rowType; + } + + public void writeBundle(BundleRecords bundle) throws IOException { + for (InternalRow row : bundle) { + write(fromRow.apply(row)); + } + } + + protected void writeToWriter(RollingFileWriter writer, InternalRow row) + throws IOException { + writer.write(fromRow.apply(row)); + } + } + /** * Use buffered writer, segment pooled from segment pool. When spillable, may delay checkpoint * acknowledge time. When non-spillable, may cause too many small files. */ - class BufferedSinkWriter implements SinkWriter { + class BufferedSinkWriter extends BaseBufferedSinkWriter { - private final Supplier> writerSupplier; - private final Function toRow; - private final Function fromRow; - private final IOManager ioManager; - private final RowType rowType; private final boolean spillable; private final MemorySize maxDiskSize; private final CompressOptions compression; @@ -149,11 +189,7 @@ public BufferedSinkWriter( boolean spillable, MemorySize maxDiskSize, CompressOptions compression) { - this.writerSupplier = writerSupplier; - this.toRow = toRow; - this.fromRow = fromRow; - this.ioManager = ioManager; - this.rowType = rowType; + super(writerSupplier, toRow, fromRow, ioManager, rowType); this.spillable = spillable; this.maxDiskSize = maxDiskSize; this.compression = compression; @@ -176,20 +212,18 @@ public List flush() throws IOException { IOException exception = null; try (RowBuffer.RowBufferIterator iterator = writeBuffer.newIterator()) { while (iterator.advanceNext()) { - writer.write(fromRow.apply(iterator.getRow())); + writeToWriter(writer, iterator.getRow()); } } catch (IOException e) { exception = e; } finally { if (exception != null) { IOUtils.closeQuietly(writer); - // cleanup code that might throw another exception throw exception; } writer.close(); } flushedFiles.addAll(writer.result()); - // reuse writeBuffer writeBuffer.reset(); } return flushedFiles; @@ -230,4 +264,117 @@ public boolean flushMemory() throws IOException { return writeBuffer.flushMemory(); } } + + /** Sink writer that sorts data within each file according to clustering configuration. */ + class SortedBufferedSinkWriter extends BaseBufferedSinkWriter { + + private final CoreOptions options; + private final boolean spillable; + + private SortBuffer sortBuffer; + private boolean useSpillSortBuffer; + + public SortedBufferedSinkWriter( + Supplier> writerSupplier, + Function toRow, + Function fromRow, + IOManager ioManager, + RowType rowType, + CoreOptions options, + boolean spillable) { + super(writerSupplier, toRow, fromRow, ioManager, rowType); + this.options = options; + this.spillable = spillable; + } + + public SortBuffer sortBuffer() { + return sortBuffer; + } + + @Override + public boolean write(T data) throws IOException { + return sortBuffer.write(toRow.apply(data)); + } + + @Override + public List flush() throws IOException { + List flushedFiles = new ArrayList<>(); + if (sortBuffer.size() > 0) { + RollingFileWriter writer = writerSupplier.get(); + IOException exception = null; + try { + MutableObjectIterator sorted = sortBuffer.sortedIterator(); + BinaryRow row = new BinaryRow(rowType.getFieldCount()); + BinaryRow reuse; + while ((reuse = sorted.next(row)) != null) { + writeToWriter(writer, reuse); + } + } catch (IOException e) { + exception = e; + } finally { + if (exception != null) { + IOUtils.closeQuietly(writer); + throw exception; + } + writer.close(); + } + flushedFiles.addAll(writer.result()); + sortBuffer.clear(); + } + return flushedFiles; + } + + @Override + public long memoryOccupancy() { + return sortBuffer.getOccupancy(); + } + + @Override + public void close() { + if (sortBuffer != null) { + sortBuffer.clear(); + sortBuffer = null; + } + } + + @Override + public void setMemoryPool(MemorySegmentPool memoryPool) { + List clusteringColumns = options.clusteringColumns(); + int[] keyFields = + clusteringColumns.stream().mapToInt(rowType.getFieldNames()::indexOf).toArray(); + RecordComparator comparator = + CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), keyFields); + BinaryInMemorySortBuffer inMemorySortBuffer = + BinaryInMemorySortBuffer.createBuffer( + CodeGenUtils.newNormalizedKeyComputer( + rowType.getFieldTypes(), keyFields), + new InternalRowSerializer(rowType), + comparator, + memoryPool); + + this.useSpillSortBuffer = ioManager != null && spillable; + this.sortBuffer = + useSpillSortBuffer + ? new BinaryExternalSortBuffer( + new BinaryRowSerializer(rowType.getFieldCount()), + comparator, + memoryPool.pageSize(), + inMemorySortBuffer, + ioManager, + options.localSortMaxNumFileHandles(), + options.spillCompressOptions(), + options.writeBufferSpillDiskSize()) + : inMemorySortBuffer; + } + + @Override + public boolean bufferSpillableWriter() { + return useSpillSortBuffer; + } + + @Override + public boolean flushMemory() throws IOException { + return sortBuffer.flushMemory(); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index c861296f2ba6..b7a5f39d517f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -34,6 +34,7 @@ import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.SimpleColStats; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; @@ -45,17 +46,25 @@ import org.apache.paimon.operation.BlobFileContext; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.SimpleStatsConverter; import org.apache.paimon.table.AppendOnlyFileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.types.BlobType; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.paimon.utils.Pair; @@ -78,6 +87,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -827,7 +837,8 @@ private Pair> createWriterBase( true, false, options.dataEvolutionEnabled(), - BlobFileContext.create(writeSchema, options)); + BlobFileContext.create(writeSchema, options), + options); writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); return Pair.of(writer, compactManager.allFiles()); @@ -880,4 +891,55 @@ private DataFileMeta generateCompactAfter(List toCompact) throws I private InternalRow createBlobRow(int id, String name, byte[] blobData) { return GenericRow.of(id, BinaryString.fromString(name), new BlobData(blobData)); } + + @Test + public void testSortedBufferedSinkWriter() throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.CLUSTERING_COLUMNS.key(), "id"); + options.put(CoreOptions.CLUSTERING_INCREMENTAL.key(), "true"); + options.put(CoreOptions.CLUSTERING_INCREMENTAL_OPTIMIZE_WRITE.key(), "true"); + options.put(CoreOptions.CLUSTERING_INCREMENTAL_MODE.key(), "local-sort"); + options.put(CoreOptions.WRITE_BUFFER_FOR_APPEND.key(), "true"); + + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.options(options); + for (DataField dataField : SCHEMA.getFields()) { + schemaBuilder.column(dataField.name(), dataField.type()); + } + Schema schema = schemaBuilder.build(); + + FileIO fileIO = LocalFileIO.create(); + Path tablePath = pathFactory.newPath(UUID.randomUUID().toString()); + SchemaManager schemaManager = new SchemaManager(fileIO, tablePath); + TableSchema tableSchema = TableSchema.create(0, schema); + schemaManager.commit(tableSchema); + AppendOnlyFileStoreTable table = + (AppendOnlyFileStoreTable) + FileStoreTableFactory.create(fileIO, tablePath, tableSchema); + TableWriteImpl writer = table.newWrite("test"); + // Write unordered data + writer.write(row(3, "Name3", PART)); + writer.write(row(1, "Name1", PART)); + writer.write(row(4, "Name4", PART)); + writer.write(row(2, "Name2", PART)); + writer.write(row(5, "Name5", PART)); + + // Commit + List commitMessageList = writer.prepareCommit(); + TableCommitImpl committer = table.newCommit("test"); + committer.commit(commitMessageList); + committer.close(); + writer.close(); + + InnerTableRead tableRead = table.newRead(); + RecordReader reader = tableRead.createReader(table.newScan().plan()); + try (CloseableIterator iterator = reader.toCloseableIterator()) { + List ids = new ArrayList<>(); + while (iterator.hasNext()) { + ids.add(iterator.next().getInt(0)); + } + // Verify the data is ordered by id ascending + assertThat(ids).containsExactly(1, 2, 3, 4, 5); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 0169a6d5e3d6..837d52124930 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -295,7 +295,8 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception true, false, options.dataEvolutionEnabled(), - BlobFileContext.create(schema, options)); + BlobFileContext.create(schema, options), + options); appendOnlyWriter.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); appendOnlyWriter.write(