Skip to content

Commit a787f52

Browse files
committed
[core] Enable clustering before write phase for incremental clustering table
1 parent 917ec03 commit a787f52

File tree

5 files changed

+58
-8
lines changed

5 files changed

+58
-8
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@
5656
<td>Boolean</td>
5757
<td>Write blob field using blob descriptor rather than blob bytes.</td>
5858
</tr>
59+
<tr>
60+
<td><h5>blob-descriptor-field</h5></td>
61+
<td style="word-wrap: break-word;">(none)</td>
62+
<td>String</td>
63+
<td>Comma-separated BLOB field names to store as serialized BlobDescriptor bytes inline in data files.</td>
64+
</tr>
5965
<tr>
6066
<td><h5>blob-field</h5></td>
6167
<td style="word-wrap: break-word;">(none)</td>
@@ -68,12 +74,6 @@
6874
<td>Boolean</td>
6975
<td>Whether to consider blob file size as a factor when performing scan splitting.</td>
7076
</tr>
71-
<tr>
72-
<td><h5>blob-descriptor-field</h5></td>
73-
<td style="word-wrap: break-word;">(none)</td>
74-
<td>String</td>
75-
<td>Comma-separated BLOB field names to store as serialized BlobDescriptor bytes inline in data files.</td>
76-
</tr>
7777
<tr>
7878
<td><h5>blob.target-file-size</h5></td>
7979
<td style="word-wrap: break-word;">(none)</td>
@@ -200,6 +200,12 @@
200200
<td>Boolean</td>
201201
<td>Whether to enable incremental clustering.</td>
202202
</tr>
203+
<tr>
204+
<td><h5>clustering.pre-write.enabled</h5></td>
205+
<td style="word-wrap: break-word;">false</td>
206+
<td>Boolean</td>
207+
<td>Whether to perform clustering before the write phase when incremental clustering is enabled.</td>
208+
</tr>
203209
<tr>
204210
<td><h5>clustering.strategy</h5></td>
205211
<td style="word-wrap: break-word;">"auto"</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2071,6 +2071,13 @@ public InlineElement getDescription() {
20712071
"The duration after which a partition without new updates is considered a historical partition. "
20722072
+ "Historical partitions will be automatically fully clustered during the cluster operation.");
20732073

2074+
public static final ConfigOption<Boolean> CLUSTERING_PRE_WRITE_ENABLED =
2075+
key("clustering.pre-write.enabled")
2076+
.booleanType()
2077+
.defaultValue(false)
2078+
.withDescription(
2079+
"Whether to perform clustering before the write phase when incremental clustering is enabled.");
2080+
20742081
@Immutable
20752082
public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED =
20762083
key("row-tracking.enabled")
@@ -3402,6 +3409,10 @@ public boolean clusteringIncrementalEnabled() {
34023409
return options.get(CLUSTERING_INCREMENTAL);
34033410
}
34043411

3412+
public boolean preClusteringEnabled() {
3413+
return options.get(CLUSTERING_PRE_WRITE_ENABLED);
3414+
}
3415+
34053416
public boolean bucketClusterEnabled() {
34063417
return !bucketAppendOrdered()
34073418
&& !deletionVectorsEnabled()

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
4242
import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS;
4343
import static org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL;
44+
import static org.apache.paimon.CoreOptions.CLUSTERING_PRE_WRITE_ENABLED;
4445
import static org.apache.paimon.CoreOptions.CLUSTERING_STRATEGY;
4546
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
4647
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
@@ -122,7 +123,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
122123
new DataStream<>(
123124
dataStream.getExecutionEnvironment(),
124125
dataStream.getTransformation()));
125-
if (!conf.get(CLUSTERING_INCREMENTAL)) {
126+
if (!conf.get(CLUSTERING_INCREMENTAL)
127+
|| conf.get(CLUSTERING_PRE_WRITE_ENABLED)) {
126128
builder.clusteringIfPossible(
127129
conf.get(CLUSTERING_COLUMNS),
128130
conf.get(CLUSTERING_STRATEGY),

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForAppendTableITCase.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,34 @@ public void testRangePartitionAndSortWithHilbertStrategy() throws Exception {
247247
Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
248248
}
249249

250+
@Test
251+
public void testClusteringPreWriteEnabled() throws Exception {
252+
List<Row> inputRows = generateSinkRows();
253+
String id = TestValuesTableFactory.registerData(inputRows);
254+
batchSql(
255+
"CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
256+
+ "('connector'='values', 'bounded'='true', 'data-id'='%s')",
257+
id);
258+
batchSql(
259+
"INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
260+
+ "'sink.parallelism' = '10', 'sink.clustering.strategy' = 'zorder', "
261+
+ "'clustering.incremental' = 'true', 'clustering.pre-write.enabled' = 'true') */ "
262+
+ "SELECT * FROM test_source");
263+
List<Row> sinkRows = batchSql("SELECT * FROM test_table");
264+
assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
265+
FileStoreTable testStoreTable = paimonTable("test_table");
266+
PredicateBuilder predicateBuilder = new PredicateBuilder(testStoreTable.rowType());
267+
Predicate predicate = predicateBuilder.between(0, 100, 200);
268+
List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
269+
assertThat(files.size()).isEqualTo(10);
270+
List<ManifestEntry> filesFilter =
271+
((AppendOnlyFileStoreScan) testStoreTable.store().newScan())
272+
.withFilter(predicate)
273+
.plan()
274+
.files();
275+
Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
276+
}
277+
250278
private List<Row> generateSinkRows() {
251279
List<Row> sinkRows = new ArrayList<>();
252280
Random random = new Random();

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,10 @@ case class PaimonSparkWriter(
303303
}
304304
}
305305
val clusteringColumns = coreOptions.clusteringColumns()
306-
if ((!coreOptions.clusteringIncrementalEnabled()) && (!clusteringColumns.isEmpty)) {
306+
if (
307+
(!coreOptions.clusteringIncrementalEnabled() || coreOptions
308+
.preClusteringEnabled()) && (!clusteringColumns.isEmpty)
309+
) {
307310
val strategy = coreOptions.clusteringStrategy(tableSchema.fields().size())
308311
val sorter = TableSorter.getSorter(table, strategy, clusteringColumns)
309312
input = sorter.sort(data)

0 commit comments

Comments
 (0)