Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.append;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.compact.CompactManager;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.apache.paimon.utils.SinkWriter;
import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter;
import org.apache.paimon.utils.SinkWriter.DirectSinkWriter;
import org.apache.paimon.utils.SinkWriter.SortedBufferedSinkWriter;
import org.apache.paimon.utils.StatsCollectorFactories;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -97,6 +99,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
@Nullable private final IOManager ioManager;
private final FileIndexOptions fileIndexOptions;
private final MemorySize maxDiskSize;
private final CoreOptions coreOptions;
private final boolean sortEnabled;

@Nullable private CompactDeletionFile compactDeletionFile;
private SinkWriter<InternalRow> sinkWriter;
Expand Down Expand Up @@ -129,7 +133,8 @@ public AppendOnlyWriter(
boolean asyncFileWrite,
boolean statsDenseStore,
boolean dataEvolutionEnabled,
@Nullable BlobFileContext blobContext) {
@Nullable BlobFileContext blobContext,
CoreOptions coreOptions) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
Expand Down Expand Up @@ -159,11 +164,18 @@ public AppendOnlyWriter(
this.statsCollectorFactories = statsCollectorFactories;
this.maxDiskSize = maxDiskSize;
this.fileIndexOptions = fileIndexOptions;
this.coreOptions = coreOptions;

this.sinkWriter =
useWriteBuffer
? createBufferedSinkWriter(spillable)
: new DirectSinkWriter<>(this::createRollingRowWriter);
// Determine if we need to enable sorting based on clustering configuration
List<String> clusteringColumns = coreOptions.clusteringColumns();
this.sortEnabled =
coreOptions.clusteringIncrementalEnabled()
&& coreOptions.clusteringIncrementalOptimizeWrite()
&& coreOptions.clusteringIncrementalMode()
== CoreOptions.ClusteringIncrementalMode.LOCAL_SORT
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LOCAL_SORT is described as Task-Level sorting, but what we have actually implemented is File-Level sorting.

Do we need to introduce a mode similar to "file_local" to represent this specific granularity of File-Level sorting functionality?

 /**
         * Sort rows only within each compaction task (no global shuffle). Every output file is
         * internally ordered by the clustering columns, which is sufficient for per-file Parquet
         * lookup optimizations.
         */
        LOCAL_SORT(
                "local-sort",
                "Sort rows only within each compaction task without global shuffle. Every output file is internally ordered.");

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JingsongLi What do you think of LOCAL_SORT? In our previous discussion, this was meant for local sorting in a single file. However, judging from the current situation, it is used for data sorting at the task level.

&& !clusteringColumns.isEmpty();

this.sinkWriter = createSinkWriter(useWriteBuffer, spillable);

if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
Expand All @@ -174,16 +186,31 @@ public AppendOnlyWriter(
}
}

private BufferedSinkWriter<InternalRow> createBufferedSinkWriter(boolean spillable) {
return new BufferedSinkWriter<>(
this::createRollingRowWriter,
t -> t,
t -> t,
ioManager,
writeSchema,
spillable,
maxDiskSize,
spillCompression);
private SinkWriter<InternalRow> createSinkWriter(boolean useWriteBuffer, boolean spillable) {
if (useWriteBuffer) {
if (sortEnabled) {
return new SortedBufferedSinkWriter<>(
this::createRollingRowWriter,
t -> t,
t -> t,
ioManager,
writeSchema,
coreOptions,
spillable);
} else {
return new BufferedSinkWriter<>(
this::createRollingRowWriter,
t -> t,
t -> t,
ioManager,
writeSchema,
spillable,
maxDiskSize,
spillCompression);
}
} else {
return new DirectSinkWriter<>(this::createRollingRowWriter);
}
}

@Override
Expand All @@ -209,9 +236,9 @@ public void write(InternalRow rowData) throws Exception {
@Override
public void writeBundle(BundleRecords bundle) throws Exception {
if (sinkWriter instanceof BufferedSinkWriter) {
for (InternalRow row : bundle) {
write(row);
}
((BufferedSinkWriter<InternalRow>) sinkWriter).writeBundle(bundle);
} else if (sinkWriter instanceof SortedBufferedSinkWriter) {
((SortedBufferedSinkWriter<InternalRow>) sinkWriter).writeBundle(bundle);
} else {
((DirectSinkWriter<?>) sinkWriter).writeBundle(bundle);
}
Expand Down Expand Up @@ -292,7 +319,7 @@ public void toBufferedWriter() throws Exception {
List<DataFileMeta> files = sinkWriter.flush();

sinkWriter.close();
sinkWriter = createBufferedSinkWriter(true);
sinkWriter = createSinkWriter(true, true);
sinkWriter.setMemoryPool(memorySegmentPool);

// rewrite small files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ protected RecordWriter<InternalRow> createWriter(
options.asyncFileWrite(),
options.statsDenseStore(),
options.dataEvolutionEnabled(),
blobContext);
blobContext,
options);
}

@Override
Expand Down
175 changes: 161 additions & 14 deletions paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@

package org.apache.paimon.utils;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
Expand All @@ -28,6 +33,9 @@
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.sort.BinaryInMemorySortBuffer;
import org.apache.paimon.sort.SortBuffer;
import org.apache.paimon.types.RowType;

import java.io.IOException;
Expand Down Expand Up @@ -123,17 +131,49 @@ public boolean bufferSpillableWriter() {
}
}

/**
* Base class for buffered sink writers, contains common logic for flushing, writing bundles,
* and managing lifecycle.
*/
abstract class BaseBufferedSinkWriter<T> implements SinkWriter<T> {

protected final Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier;
protected final Function<T, InternalRow> toRow;
protected final Function<InternalRow, T> fromRow;
protected final IOManager ioManager;
protected final RowType rowType;

protected BaseBufferedSinkWriter(
Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier,
Function<T, InternalRow> toRow,
Function<InternalRow, T> fromRow,
IOManager ioManager,
RowType rowType) {
this.writerSupplier = writerSupplier;
this.toRow = toRow;
this.fromRow = fromRow;
this.ioManager = ioManager;
this.rowType = rowType;
}

public void writeBundle(BundleRecords bundle) throws IOException {
for (InternalRow row : bundle) {
write(fromRow.apply(row));
}
}

protected void writeToWriter(RollingFileWriter<T, DataFileMeta> writer, InternalRow row)
throws IOException {
writer.write(fromRow.apply(row));
}
}

/**
* Use buffered writer, segment pooled from segment pool. When spillable, may delay checkpoint
* acknowledge time. When non-spillable, may cause too many small files.
*/
class BufferedSinkWriter<T> implements SinkWriter<T> {
class BufferedSinkWriter<T> extends BaseBufferedSinkWriter<T> {

private final Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier;
private final Function<T, InternalRow> toRow;
private final Function<InternalRow, T> fromRow;
private final IOManager ioManager;
private final RowType rowType;
private final boolean spillable;
private final MemorySize maxDiskSize;
private final CompressOptions compression;
Expand All @@ -149,11 +189,7 @@ public BufferedSinkWriter(
boolean spillable,
MemorySize maxDiskSize,
CompressOptions compression) {
this.writerSupplier = writerSupplier;
this.toRow = toRow;
this.fromRow = fromRow;
this.ioManager = ioManager;
this.rowType = rowType;
super(writerSupplier, toRow, fromRow, ioManager, rowType);
this.spillable = spillable;
this.maxDiskSize = maxDiskSize;
this.compression = compression;
Expand All @@ -176,20 +212,18 @@ public List<DataFileMeta> flush() throws IOException {
IOException exception = null;
try (RowBuffer.RowBufferIterator iterator = writeBuffer.newIterator()) {
while (iterator.advanceNext()) {
writer.write(fromRow.apply(iterator.getRow()));
writeToWriter(writer, iterator.getRow());
}
} catch (IOException e) {
exception = e;
} finally {
if (exception != null) {
IOUtils.closeQuietly(writer);
// cleanup code that might throw another exception
throw exception;
}
writer.close();
}
flushedFiles.addAll(writer.result());
// reuse writeBuffer
writeBuffer.reset();
}
return flushedFiles;
Expand Down Expand Up @@ -230,4 +264,117 @@ public boolean flushMemory() throws IOException {
return writeBuffer.flushMemory();
}
}

/** Sink writer that sorts data within each file according to clustering configuration. */
class SortedBufferedSinkWriter<T> extends BaseBufferedSinkWriter<T> {

private final CoreOptions options;
private final boolean spillable;

private SortBuffer sortBuffer;
private boolean useSpillSortBuffer;

public SortedBufferedSinkWriter(
Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier,
Function<T, InternalRow> toRow,
Function<InternalRow, T> fromRow,
IOManager ioManager,
RowType rowType,
CoreOptions options,
boolean spillable) {
super(writerSupplier, toRow, fromRow, ioManager, rowType);
this.options = options;
this.spillable = spillable;
}

public SortBuffer sortBuffer() {
return sortBuffer;
}

@Override
public boolean write(T data) throws IOException {
return sortBuffer.write(toRow.apply(data));
}

@Override
public List<DataFileMeta> flush() throws IOException {
List<DataFileMeta> flushedFiles = new ArrayList<>();
if (sortBuffer.size() > 0) {
RollingFileWriter<T, DataFileMeta> writer = writerSupplier.get();
IOException exception = null;
try {
MutableObjectIterator<BinaryRow> sorted = sortBuffer.sortedIterator();
BinaryRow row = new BinaryRow(rowType.getFieldCount());
BinaryRow reuse;
while ((reuse = sorted.next(row)) != null) {
writeToWriter(writer, reuse);
}
} catch (IOException e) {
exception = e;
} finally {
if (exception != null) {
IOUtils.closeQuietly(writer);
throw exception;
}
writer.close();
}
flushedFiles.addAll(writer.result());
sortBuffer.clear();
}
return flushedFiles;
}

@Override
public long memoryOccupancy() {
return sortBuffer.getOccupancy();
}

@Override
public void close() {
if (sortBuffer != null) {
sortBuffer.clear();
sortBuffer = null;
}
}

@Override
public void setMemoryPool(MemorySegmentPool memoryPool) {
List<String> clusteringColumns = options.clusteringColumns();
int[] keyFields =
clusteringColumns.stream().mapToInt(rowType.getFieldNames()::indexOf).toArray();
RecordComparator comparator =
CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), keyFields);
BinaryInMemorySortBuffer inMemorySortBuffer =
BinaryInMemorySortBuffer.createBuffer(
CodeGenUtils.newNormalizedKeyComputer(
rowType.getFieldTypes(), keyFields),
new InternalRowSerializer(rowType),
comparator,
memoryPool);

this.useSpillSortBuffer = ioManager != null && spillable;
this.sortBuffer =
useSpillSortBuffer
? new BinaryExternalSortBuffer(
new BinaryRowSerializer(rowType.getFieldCount()),
comparator,
memoryPool.pageSize(),
inMemorySortBuffer,
ioManager,
options.localSortMaxNumFileHandles(),
options.spillCompressOptions(),
options.writeBufferSpillDiskSize())
: inMemorySortBuffer;
}

@Override
public boolean bufferSpillableWriter() {
return useSpillSortBuffer;
}

@Override
public boolean flushMemory() throws IOException {
return sortBuffer.flushMemory();
}
}
}
Loading