Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@
<td><h5>write-buffer-spillable</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Boolean</td>
<td>Whether the write buffer can be spillable. Enabled by default when using object storage.</td>
<td>Whether the write buffer can be spillable. Enabled by default when using object storage or when 'target-file-size' is greater than 'write-buffer-size'.</td>
</tr>
<tr>
<td><h5>write-manifest-cache</h5></td>
Expand Down
11 changes: 8 additions & 3 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public class CoreOptions implements Serializable {
.booleanType()
.noDefaultValue()
.withDescription(
"Whether the write buffer can be spillable. Enabled by default when using object storage.");
"Whether the write buffer can be spillable. Enabled by default when using object storage or when 'target-file-size' is greater than 'write-buffer-size'.");

public static final ConfigOption<Boolean> WRITE_BUFFER_FOR_APPEND =
key("write-buffer-for-append")
Expand Down Expand Up @@ -1957,9 +1957,14 @@ public long writeBufferSize() {
return options.get(WRITE_BUFFER_SIZE).getBytes();
}

public boolean writeBufferSpillable(boolean usingObjectStore, boolean isStreaming) {
public boolean writeBufferSpillable(
boolean usingObjectStore, boolean isStreaming, boolean hasPrimaryKey) {
// if not streaming mode, we turn spillable on by default.
return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || !isStreaming);
return options.getOptional(WRITE_BUFFER_SPILLABLE)
.orElse(
usingObjectStore
|| !isStreaming
|| targetFileSize(hasPrimaryKey) > writeBufferSize());
}

public MemorySize writeBufferSpillDiskSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ protected RecordWriter<InternalRow> createWriter(
pathFactory.createDataFilePathFactory(partition, bucket),
restoreIncrement,
options.useWriteBufferForAppend() || forceBufferSpill,
options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode)
options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, false)
|| forceBufferSpill,
options.fileCompression(),
options.spillCompressOptions(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ protected MergeTreeWriter createWriter(

@VisibleForTesting
public boolean bufferSpillable() {
return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode);
return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode, true);
}

private CompactManager createCompactManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,54 @@ public void testNoBuffer() throws Exception {
writer.close();
}

@Test
public void tesWriteBufferSpillAutoEnabled() {
HashMap<String, String> map = new HashMap<>();
// This is the default behavior,no object store and streaming mode.
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
.isFalse();

// Using object store.
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(true, true, false))
.isTrue();

// Batch mode.
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, false, false))
.isTrue();

// Append only table.
map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "200 MB");
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, false, false))
.isTrue();

// Primary key table.
map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "100 MB");
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, false, true))
.isTrue();

// targetFileSize is greater than write buffer size.
map.clear();
map.put(CoreOptions.TARGET_FILE_SIZE.key(), "2 b");
map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "1 b");
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
.isTrue();

// target-file-size is smaller than write-buffer-size.
map.clear();
map.put(CoreOptions.TARGET_FILE_SIZE.key(), "1 b");
map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "2 b");
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
.isFalse();

// Set to false manually.
map.clear();
map.put(CoreOptions.TARGET_FILE_SIZE.key(), "2 b");
map.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "1 b");
map.put(CoreOptions.WRITE_BUFFER_SPILLABLE.key(), "false");
Assertions.assertThat(CoreOptions.fromMap(map).writeBufferSpillable(false, true, false))
.isFalse();
}

@Test
public void testMultipleFlush() throws Exception {
AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void testPrimaryKeyTableMetrics() throws Exception {
Options options = new Options();
options.set("bucket", "1");
options.set("write-buffer-size", "256 b");
options.set("write-buffer-spillable", "false");
options.set("page-size", "32 b");

FileStoreTable table =
Expand Down Expand Up @@ -332,6 +333,7 @@ public void testNumWritersMetric() throws Exception {
Options options = new Options();
options.set("bucket", "1");
options.set("write-buffer-size", "256 b");
options.set("write-buffer-spillable", "false");
options.set("page-size", "32 b");

FileStoreTable fileStoreTable =
Expand Down
Loading