Skip to content

Commit 1046fe0

Browse files
committed
[flink] Add default parallelism for source and sink in rescale procedure, also make them configurable
1 parent a968660 commit 1046fe0

File tree

4 files changed

+73
-7
lines changed

4 files changed

+73
-7
lines changed

docs/content/flink/procedures.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -757,13 +757,15 @@ All available procedures are listed below.
757757
<tr>
758758
<td>rescale</td>
759759
<td>
760-
CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition')
760+
CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition', `source_parallelism` => source_parallelism, `sink_parallelism` => sink_parallelism)
761761
</td>
762762
<td>
763763
Rescale one partition of a table. Arguments:
764764
<li>identifier: The target table identifier. Cannot be empty.</li>
765765
<li>bucket_num: Resulting bucket number after rescale. The default value of argument bucket_num is the current bucket number of the table. Cannot be empty for postpone bucket tables.</li>
766766
<li>partition: What partition to rescale. For partitioned table this argument cannot be empty.</li>
767+
<li>source_parallelism: Parallelism of source operator. The default value is the current bucket number of the partition.</li>
768+
<li>sink_parallelism: Parallelism of sink operator. The default value is equal to bucket_num.</li>
767769
</td>
768770
<td>
769771
CALL sys.rescale(`table` => 'default.T', `bucket_num` => 16, `partition` => 'dt=20250217,hh=08')

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.CoreOptions;
2222
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
2323
import org.apache.paimon.flink.source.FlinkSourceBuilder;
24+
import org.apache.paimon.manifest.ManifestEntry;
2425
import org.apache.paimon.partition.PartitionPredicate;
2526
import org.apache.paimon.predicate.Predicate;
2627
import org.apache.paimon.table.BucketMode;
@@ -38,13 +39,16 @@
3839
import javax.annotation.Nullable;
3940

4041
import java.util.HashMap;
42+
import java.util.Iterator;
4143
import java.util.Map;
4244

4345
/** Action to rescale one partition of a table. */
4446
public class RescaleAction extends TableActionBase {
4547

4648
private @Nullable Integer bucketNum;
4749
private Map<String, String> partition = new HashMap<>();
50+
private @Nullable Integer sourceParallelism;
51+
private @Nullable Integer sinkParallelism;
4852

4953
public RescaleAction(String databaseName, String tableName, Map<String, String> catalogConfig) {
5054
super(databaseName, tableName, catalogConfig);
@@ -60,6 +64,16 @@ public RescaleAction withPartition(Map<String, String> partition) {
6064
return this;
6165
}
6266

67+
public RescaleAction withSourceParallelism(int sourceParallelism) {
68+
this.sourceParallelism = sourceParallelism;
69+
return this;
70+
}
71+
72+
public RescaleAction withSinkParallelism(int sinkParallelism) {
73+
this.sinkParallelism = sinkParallelism;
74+
return this;
75+
}
76+
6377
@Override
6478
public void build() throws Exception {
6579
Configuration flinkConf = new Configuration();
@@ -79,6 +93,8 @@ public void build() throws Exception {
7993
new FlinkSourceBuilder(fileStoreTable)
8094
.env(env)
8195
.sourceBounded(true)
96+
.sourceParallelism(
97+
sourceParallelism == null ? currentBucketNum() : sourceParallelism)
8298
.predicate(partitionPredicate)
8399
.build();
84100

@@ -92,12 +108,29 @@ public void build() throws Exception {
92108
}
93109
FileStoreTable rescaledTable =
94110
fileStoreTable.copy(fileStoreTable.schema().copy(bucketOptions));
95-
new FlinkSinkBuilder(rescaledTable).overwrite(partition).forRowData(source).build();
111+
new FlinkSinkBuilder(rescaledTable)
112+
.overwrite(partition)
113+
.parallelism(sinkParallelism == null ? bucketNum : sinkParallelism)
114+
.forRowData(source)
115+
.build();
96116
}
97117

98118
@Override
99119
public void run() throws Exception {
100120
build();
101121
env.execute("Rescale Postpone Bucket : " + table.fullName());
102122
}
123+
124+
private int currentBucketNum() {
125+
FileStoreTable fileStoreTable = (FileStoreTable) table;
126+
Iterator<ManifestEntry> it =
127+
fileStoreTable
128+
.newSnapshotReader()
129+
.withPartitionFilter(partition)
130+
.readFileIterator();
131+
Preconditions.checkArgument(
132+
it.hasNext(),
133+
"The specified partition does not have any data files. No need to rescale.");
134+
return it.next().totalBuckets();
135+
}
103136
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public class RescaleActionFactory implements ActionFactory {
2626
public static final String IDENTIFIER = "rescale";
2727
private static final String BUCKET_NUM = "bucket_num";
2828
private static final String PARTITION = "partition";
29+
private static final String SOURCE_PARALLELISM = "source_parallelism";
30+
private static final String SINK_PARALLELISM = "sink_parallelism";
2931

3032
@Override
3133
public String identifier() {
@@ -43,10 +45,15 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
4345
if (params.has(BUCKET_NUM)) {
4446
action.withBucketNum(Integer.parseInt(params.get(BUCKET_NUM)));
4547
}
46-
4748
if (params.has(PARTITION)) {
4849
action.withPartition(getPartitions(params).get(0));
4950
}
51+
if (params.has(SOURCE_PARALLELISM)) {
52+
action.withSourceParallelism(Integer.parseInt(params.get(SOURCE_PARALLELISM)));
53+
}
54+
if (params.has(SINK_PARALLELISM)) {
55+
action.withSinkParallelism(Integer.parseInt(params.get(SINK_PARALLELISM)));
56+
}
5057

5158
return Optional.of(action);
5259
}
@@ -60,11 +67,16 @@ public void printHelp() {
6067
System.out.println(
6168
" rescale --warehouse <warehouse_path> --database <database_name> "
6269
+ "--table <table_name> [--bucket_num <bucket_num>] "
63-
+ "[--partition <partition>]");
70+
+ "[--partition <partition>] "
71+
+ "[--source_parallelism <source_parallelism>] [--sink_parallelism <sink_parallelism>]");
6472
System.out.println(
65-
"The default value of argument bucket_num is the current bucket number of the table. "
73+
"The default value of argument bucket_num is the value of 'bucket' option of the table. "
6674
+ "For postpone bucket tables, this argument must be specified.");
6775
System.out.println(
6876
"Argument partition must be specified if the table is a partitioned table.");
77+
System.out.println(
78+
"The default value of argument source_parallelism is the current bucket number of the partition.");
79+
System.out.println(
80+
"The default value of argument sink_parallelism is equal to bucket_num.");
6981
}
7082
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,26 @@ public class RescaleProcedure extends ProcedureBase {
3838
argument = {
3939
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
4040
@ArgumentHint(name = "bucket_num", type = @DataTypeHint("INT"), isOptional = true),
41-
@ArgumentHint(name = "partition", type = @DataTypeHint("STRING"), isOptional = true)
41+
@ArgumentHint(
42+
name = "partition",
43+
type = @DataTypeHint("STRING"),
44+
isOptional = true),
45+
@ArgumentHint(
46+
name = "source_parallelism",
47+
type = @DataTypeHint("INT"),
48+
isOptional = true),
49+
@ArgumentHint(
50+
name = "sink_parallelism",
51+
type = @DataTypeHint("INT"),
52+
isOptional = true)
4253
})
4354
public String[] call(
4455
ProcedureContext procedureContext,
4556
String tableId,
4657
@Nullable Integer bucketNum,
47-
@Nullable String partition)
58+
@Nullable String partition,
59+
@Nullable Integer sourceParallelism,
60+
@Nullable Integer sinkParallelism)
4861
throws Exception {
4962
Identifier identifier = Identifier.fromString(tableId);
5063
String databaseName = identifier.getDatabaseName();
@@ -57,6 +70,12 @@ public String[] call(
5770
if (partition != null) {
5871
action.withPartition(ParameterUtils.getPartitions(partition).get(0));
5972
}
73+
if (sourceParallelism != null) {
74+
action.withSourceParallelism(sourceParallelism);
75+
}
76+
if (sinkParallelism != null) {
77+
action.withSinkParallelism(sinkParallelism);
78+
}
6079

6180
return execute(
6281
procedureContext, action, "Rescale Postpone Bucket : " + identifier.getFullName());

0 commit comments

Comments
 (0)