File tree Expand file tree Collapse file tree 2 files changed +7
-0
lines changed
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink Expand file tree Collapse file tree 2 files changed +7
-0
lines changed Original file line number Diff line number Diff line change @@ -137,10 +137,15 @@ public FlinkSinkBuilder parallelism(@Nullable Integer parallelism) {
137137
138138 /** Clustering the input data if possible. */
139139 public FlinkSinkBuilder clusteringIfPossible (
140+ boolean clusteringIncremental ,
140141 String clusteringColumns ,
141142 String clusteringStrategy ,
142143 boolean sortInCluster ,
143144 int sampleFactor ) {
145+ // If incremental clustering is enabled, the clustering will be skipped.
146+ if (clusteringIncremental ) {
147+ return this ;
148+ }
144149 // The clustering will be skipped if the clustering columns are empty or the execution
145150 // mode is STREAMING or the table type is illegal.
146151 List <String > columns = CoreOptions .clusteringColumns (clusteringColumns );
Original file line number Diff line number Diff line change 4444
4545import static org .apache .paimon .CoreOptions .CHANGELOG_PRODUCER ;
4646import static org .apache .paimon .CoreOptions .CLUSTERING_COLUMNS ;
47+ import static org .apache .paimon .CoreOptions .CLUSTERING_INCREMENTAL ;
4748import static org .apache .paimon .CoreOptions .CLUSTERING_STRATEGY ;
4849import static org .apache .paimon .CoreOptions .LOG_CHANGELOG_MODE ;
4950import static org .apache .paimon .CoreOptions .MERGE_ENGINE ;
@@ -136,6 +137,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
136137 dataStream .getExecutionEnvironment (),
137138 dataStream .getTransformation ()))
138139 .clusteringIfPossible (
140+ conf .get (CLUSTERING_INCREMENTAL ),
139141 conf .get (CLUSTERING_COLUMNS ),
140142 conf .get (CLUSTERING_STRATEGY ),
141143 conf .get (CLUSTERING_SORT_IN_CLUSTER ),
You can’t perform that action at this time.
0 commit comments