Skip to content

Commit cf025ab

Browse files
committed
[flink] disable clustering in writing if incremental clustering enabled
1 parent 2e85151 commit cf025ab

File tree

2 files changed

+7
-0
lines changed

2 files changed

+7
-0
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,15 @@ public FlinkSinkBuilder parallelism(@Nullable Integer parallelism) {
137137

138138
/** Clustering the input data if possible. */
139139
public FlinkSinkBuilder clusteringIfPossible(
140+
boolean clusteringIncremental,
140141
String clusteringColumns,
141142
String clusteringStrategy,
142143
boolean sortInCluster,
143144
int sampleFactor) {
145+
// If incremental clustering is enabled, the clustering will be skipped.
146+
if (clusteringIncremental) {
147+
return this;
148+
}
144149
// The clustering will be skipped if the clustering columns are empty or the execution
145150
// mode is STREAMING or the table type is illegal.
146151
List<String> columns = CoreOptions.clusteringColumns(clusteringColumns);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444

4545
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
4646
import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS;
47+
import static org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL;
4748
import static org.apache.paimon.CoreOptions.CLUSTERING_STRATEGY;
4849
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
4950
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
@@ -136,6 +137,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
136137
dataStream.getExecutionEnvironment(),
137138
dataStream.getTransformation()))
138139
.clusteringIfPossible(
140+
conf.get(CLUSTERING_INCREMENTAL),
139141
conf.get(CLUSTERING_COLUMNS),
140142
conf.get(CLUSTERING_STRATEGY),
141143
conf.get(CLUSTERING_SORT_IN_CLUSTER),

0 commit comments

Comments
 (0)