From e0fb0c2f4a6843941bc3512cb5e446965114735f Mon Sep 17 00:00:00 2001 From: Mike Dias Date: Mon, 2 Mar 2026 21:06:57 +1100 Subject: [PATCH 1/5] Support per-partition buckets --- docs/docs/flink/procedures.md | 2 +- docs/docs/maintenance/rescale-bucket.md | 26 +- .../primary-key-table/data-distribution.md | 21 +- .../operation/AbstractFileStoreWrite.java | 36 ++- .../privilege/PrivilegedFileStoreTable.java | 8 + .../paimon/table/AbstractFileStoreTable.java | 8 +- .../table/AppendOnlyFileStoreTable.java | 9 +- .../paimon/table/DelegatedFileStoreTable.java | 6 + .../apache/paimon/table/FileStoreTable.java | 7 + .../table/PrimaryKeyFileStoreTable.java | 10 +- .../paimon/table/RescaleFileStoreTable.java | 92 +++++++ .../sink/FixedBucketRowKeyExtractor.java | 37 ++- .../table/sink/FixedBucketWriteSelector.java | 7 +- .../table/sink/PartitionBucketMapping.java | 130 ++++++++++ .../paimon/table/sink/RowKeyExtractor.java | 22 +- .../paimon/operation/FileStoreCommitTest.java | 42 +++ .../sink/FixedBucketRowKeyExtractorTest.java | 47 +++- .../sink/PartitionBucketMappingTest.java | 66 +++++ .../CdcRecordKeyAndBucketExtractorTest.java | 21 +- .../paimon/flink/action/CompactAction.java | 15 +- .../paimon/flink/action/RescaleAction.java | 6 +- .../paimon/flink/sink/FlinkSinkBuilder.java | 5 +- .../flink/sink/RowDataChannelComputer.java | 11 +- .../sink/RowDataKeyAndBucketExtractor.java | 57 ---- .../paimon/flink/ReadWriteTableITCase.java | 34 ++- .../flink/action/RescaleActionITCase.java | 244 ++++++++++++++++++ .../sink/RowDataChannelComputerTest.java | 8 +- 27 files changed, 821 insertions(+), 156 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/RescaleFileStoreTable.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/sink/PartitionBucketMapping.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/sink/PartitionBucketMappingTest.java delete mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataKeyAndBucketExtractor.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RescaleActionITCase.java diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md index 1e34926ae385..766cb5d105a9 100644 --- a/docs/docs/flink/procedures.md +++ b/docs/docs/flink/procedures.md @@ -847,7 +847,7 @@ All available procedures are listed below. CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition', `scan_parallelism` => scan_parallelism, `sink_parallelism` => sink_parallelism) - Rescale one partition of a table. Arguments: + Rescale one partition of a table. For partitioned tables, different partitions can have different bucket counts after rescaling. Arguments:
  • table: The target table identifier. Cannot be empty.
  • bucket_num: Resulting bucket number after rescale. The default value of argument bucket_num is the current bucket number of the table. Cannot be empty for postpone bucket tables.
  • partition: What partition to rescale. For partitioned table this argument cannot be empty.
  • diff --git a/docs/docs/maintenance/rescale-bucket.md b/docs/docs/maintenance/rescale-bucket.md index 1a304525345e..74889941f7ff 100644 --- a/docs/docs/maintenance/rescale-bucket.md +++ b/docs/docs/maintenance/rescale-bucket.md @@ -45,14 +45,17 @@ Please note that - `ALTER TABLE` only modifies the table's metadata and will **NOT** reorganize or reformat existing data. Reorganize existing data must be achieved by `INSERT OVERWRITE`. - Rescale bucket number does not influence the read and running write jobs. -- Once the bucket number is changed, any newly scheduled `INSERT INTO` jobs which write to without-reorganized - existing table/partition will throw a `TableException` with message like +- **Partitioned tables** support per-partition bucket counts. Each partition retains its own bucket + count from its data files, and the new bucket count only applies to newly created partitions or partitions that + have been reorganized with `INSERT OVERWRITE`. +- **Unpartitioned tables** require a full rescale before writing. If you change the bucket number and attempt + to write without reorganizing the data first, a `RuntimeException` will be thrown: ```text - Try to write table/partition ... with a new bucket num ..., + Try to write table with a new bucket num ..., but the previous bucket num is ... Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first. ``` -- For partitioned table, it is possible to have different bucket number for different partitions. *E.g.* +- For partitioned tables, it is possible to have different bucket numbers for different partitions. *E.g.* ```sql ALTER TABLE my_table SET ('bucket' = '4'); INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01') @@ -62,6 +65,8 @@ Please note that INSERT OVERWRITE my_table PARTITION (dt = '2022-01-02') SELECT * FROM ...; ``` + After these operations, partition `dt=2022-01-01` uses 4 buckets, `dt=2022-01-02` uses 8 buckets, and any + new partitions will use the latest table-level default (8 buckets in this case). - During overwrite period, make sure there are no other jobs writing the same table/partition. ## Use Case @@ -121,8 +126,12 @@ and the job's latency keeps increasing. To improve the data freshness, users can -- scaling out ALTER TABLE verified_orders SET ('bucket' = '32'); ``` -- Switch to the batch mode and overwrite the current partition(s) to which the streaming job is writing +- Use the `rescale` procedure or switch to batch mode and overwrite the partition(s) that need rescaling ```sql + -- Option 1: Use the rescale procedure (recommended) + CALL sys.rescale(`table` => 'default.verified_orders', `bucket_num` => 32, `partition` => 'dt=2022-06-22'); + + -- Option 2: Manual batch overwrite SET 'execution.runtime-mode' = 'batch'; -- suppose today is 2022-06-22 -- case 1: there is no late event which updates the historical partitions, thus overwrite today's partition is enough @@ -142,8 +151,11 @@ and the job's latency keeps increasing. To improve the data freshness, users can FROM verified_orders WHERE dt IN ('2022-06-20', '2022-06-21', '2022-06-22'); ``` -- After overwrite job has finished, switch back to streaming mode. And now, the parallelism can be increased alongside with bucket number to restore the streaming job from the savepoint -( see [Start a SQL Job from a savepoint](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint) ) +- After the overwrite job has finished, switch back to streaming mode. The parallelism can be increased alongside + the bucket number to restore the streaming job from the savepoint + ( see [Start a SQL Job from a savepoint](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint) ). + Note that for partitioned tables, each partition retains its own bucket count, so only the rescaled partitions + are affected. ```sql SET 'execution.runtime-mode' = 'streaming'; SET 'execution.savepoint.path' = ; diff --git a/docs/docs/primary-key-table/data-distribution.md b/docs/docs/primary-key-table/data-distribution.md index 8b74724ff0e4..30ef3a1ddef9 100644 --- a/docs/docs/primary-key-table/data-distribution.md +++ b/docs/docs/primary-key-table/data-distribution.md @@ -1,8 +1,10 @@ --- title: "Data Distribution" -sidebar_position: 2 +weight: 2 +type: docs +aliases: +- /primary-key-table/data-distribution.html --- -