Skip to content

Commit 766d08b

Browse files
authored
[core] Limit max parallelism for postpone batch write fixed bucket to avoid too many buckets (#7212)
1 parent 773dcf6 commit 766d08b

File tree

4 files changed

+30
-8
lines changed

4 files changed

+30
-8
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -152,12 +152,6 @@
152152
<td>String</td>
153153
<td>Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid when changelog-producer.row-deduplicate is true.</td>
154154
</tr>
155-
<tr>
156-
<td><h5>table-read.sequence-number.enabled</h5></td>
157-
<td style="word-wrap: break-word;">false</td>
158-
<td>Boolean</td>
159-
<td>Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog system tables. This is only valid for primary key tables.</td>
160-
</tr>
161155
<tr>
162156
<td><h5>changelog.num-retained.max</h5></td>
163157
<td style="word-wrap: break-word;">(none)</td>
@@ -1031,6 +1025,12 @@
10311025
<td>Boolean</td>
10321026
<td>Whether to write the data into fixed bucket for batch writing a postpone bucket table.</td>
10331027
</tr>
1028+
<tr>
1029+
<td><h5>postpone.batch-write-fixed-bucket.max-parallelism</h5></td>
1030+
<td style="word-wrap: break-word;">2048</td>
1031+
<td>Integer</td>
1032+
<td>The maximum parallelism (maximum number of buckets) used when batch writing a postpone bucket table into fixed buckets, to avoid creating too many buckets.</td>
1033+
</tr>
10341034
<tr>
10351035
<td><h5>postpone.default-bucket-num</h5></td>
10361036
<td style="word-wrap: break-word;">1</td>
@@ -1314,6 +1314,12 @@
13141314
<td>Duration</td>
13151315
<td>The delay duration of stream read when scan incremental snapshots.</td>
13161316
</tr>
1317+
<tr>
1318+
<td><h5>table-read.sequence-number.enabled</h5></td>
1319+
<td style="word-wrap: break-word;">false</td>
1320+
<td>Boolean</td>
1321+
<td>Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog system tables. This is only valid for primary key tables.</td>
1322+
</tr>
13171323
<tr>
13181324
<td><h5>tag.automatic-completion</h5></td>
13191325
<td style="word-wrap: break-word;">false</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2164,6 +2164,12 @@ public InlineElement getDescription() {
21642164
.withDescription(
21652165
"Whether to write the data into fixed bucket for batch writing a postpone bucket table.");
21662166

2167+
public static final ConfigOption<Integer> POSTPONE_BATCH_WRITE_FIXED_BUCKET_MAX_PARALLELISM =
2168+
key("postpone.batch-write-fixed-bucket.max-parallelism")
2169+
.intType()
2170+
.defaultValue(2048)
2171+
.withDescription(
"The maximum parallelism (maximum number of buckets) used when batch writing "
+ "a postpone bucket table into fixed buckets, to avoid creating too many buckets.");
2172+
21672173
public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
21682174
key("postpone.default-bucket-num")
21692175
.intType()
@@ -3401,6 +3407,10 @@ public boolean postponeBatchWriteFixedBucket() {
34013407
return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET);
34023408
}
34033409

3410+
public int postponeBatchWriteFixedBucketMaxParallelism() {
3411+
return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET_MAX_PARALLELISM);
3412+
}
3413+
34043414
public int postponeDefaultBucketNum() {
34053415
return options.get(POSTPONE_DEFAULT_BUCKET_NUM);
34063416
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,11 @@ public void open() throws Exception {
7272
super.open();
7373

7474
int sinkParallelism = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
75-
this.defaultNumBuckets = sinkParallelism <= 0 ? 1 : sinkParallelism;
75+
sinkParallelism = sinkParallelism <= 0 ? 1 : sinkParallelism;
76+
this.defaultNumBuckets =
77+
Math.min(
78+
sinkParallelism,
79+
table.coreOptions().postponeBatchWriteFixedBucketMaxParallelism());
7680

7781
TableSchema schema = table.schema();
7882
this.partitionKeyExtractor = new RowPartitionKeyExtractor(schema);

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ case class PaimonSparkWriter(
123123
val postponePartitionBucketComputer: Option[BinaryRow => Integer] =
124124
if (postponeBatchWriteFixedBucket) {
125125
val knownNumBuckets = PostponeUtils.getKnownNumBuckets(table)
126-
val defaultPostponeNumBuckets = withInitBucketCol.rdd.getNumPartitions
126+
val defaultPostponeNumBuckets = Math.min(
127+
withInitBucketCol.rdd.getNumPartitions,
128+
table.coreOptions().postponeBatchWriteFixedBucketMaxParallelism)
127129
Some((p: BinaryRow) => knownNumBuckets.getOrDefault(p, defaultPostponeNumBuckets))
128130
} else {
129131
None

0 commit comments

Comments
 (0)