Skip to content

Commit 6aa7a03

Browse files
committed
[core] Enables buffer spill when targetFileSize is greater than the write buffer size.
1 parent b00016d commit 6aa7a03

File tree

4 files changed

+11
-4
lines changed

4 files changed

+11
-4
lines changed

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1947,9 +1947,14 @@ public long writeBufferSize() {
19471947
return options.get(WRITE_BUFFER_SIZE).getBytes();
19481948
}
19491949

1950-
public boolean writeBufferSpillable(boolean usingObjectStore, boolean isStreaming) {
1950+
public boolean writeBufferSpillable(
1951+
boolean usingObjectStore, boolean isStreaming, boolean hasPrimaryKey) {
19511952
// if not streaming mode, we turn spillable on by default.
1952-
return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || !isStreaming);
1953+
return options.getOptional(WRITE_BUFFER_SPILLABLE)
1954+
.orElse(
1955+
usingObjectStore
1956+
|| !isStreaming
1957+
|| targetFileSize(hasPrimaryKey) > writeBufferSize());
19531958
}
19541959

19551960
public MemorySize writeBufferSpillDiskSize() {

paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ protected RecordWriter<InternalRow> createWriter(
125125
pathFactory.createDataFilePathFactory(partition, bucket),
126126
restoreIncrement,
127127
options.useWriteBufferForAppend() || forceBufferSpill,
128-
options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode)
128+
options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, false)
129129
|| forceBufferSpill,
130130
options.fileCompression(),
131131
options.spillCompressOptions(),

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ protected MergeTreeWriter createWriter(
229229

230230
@VisibleForTesting
231231
public boolean bufferSpillable() {
232-
return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode);
232+
return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, true);
233233
}
234234

235235
private CompactManager createCompactManager(

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public void testPrimaryKeyTableMetrics() throws Exception {
8787
Options options = new Options();
8888
options.set("bucket", "1");
8989
options.set("write-buffer-size", "256 b");
90+
options.set("write-buffer-spillable", "false");
9091
options.set("page-size", "32 b");
9192

9293
FileStoreTable table =
@@ -332,6 +333,7 @@ public void testNumWritersMetric() throws Exception {
332333
Options options = new Options();
333334
options.set("bucket", "1");
334335
options.set("write-buffer-size", "256 b");
336+
options.set("write-buffer-spillable", "false");
335337
options.set("page-size", "32 b");
336338

337339
FileStoreTable fileStoreTable =

0 commit comments

Comments
 (0)