2 changes: 1 addition & 1 deletion docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ All available procedures are listed below.
CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition', `scan_parallelism` => scan_parallelism, `sink_parallelism` => sink_parallelism)
</td>
<td>
Rescale one partition of a table. Arguments:
Rescale one partition of a table. For partitioned tables, different partitions can have different bucket counts after rescaling. Arguments:
<li>table: The target table identifier. Cannot be empty.</li>
<li>bucket_num: Resulting bucket number after rescale. The default value of argument bucket_num is the current bucket number of the table. Cannot be empty for postpone bucket tables.</li>
<li>partition: The partition to rescale. For partitioned tables, this argument cannot be empty.</li>
Expand Down
26 changes: 19 additions & 7 deletions docs/content/maintenance/rescale-bucket.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,17 @@ Please note that
- `ALTER TABLE` only modifies the table's metadata and will **NOT** reorganize or reformat existing data.
Reorganizing existing data must be done with `INSERT OVERWRITE`.
- Rescaling the bucket number does not affect read jobs or running write jobs.
- Once the bucket number is changed, any newly scheduled `INSERT INTO` jobs which write to without-reorganized
existing table/partition will throw a `TableException` with message like
- **Partitioned tables** support per-partition bucket counts. Each partition retains its own bucket
count from its data files, and the new bucket count only applies to newly created partitions or partitions that
have been reorganized with `INSERT OVERWRITE`.
- **Unpartitioned tables** require a full rescale before writing. If you change the bucket number and attempt
to write without reorganizing the data first, a `RuntimeException` will be thrown:
```text
Try to write table/partition ... with a new bucket num ...,
Try to write table with a new bucket num ...,
but the previous bucket num is ... Please switch to batch mode,
and perform INSERT OVERWRITE to rescale current data layout first.
```
- For partitioned table, it is possible to have different bucket number for different partitions. *E.g.*
- For partitioned tables, it is possible to have different bucket numbers for different partitions. *E.g.*
```sql
ALTER TABLE my_table SET ('bucket' = '4');
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01')
Expand All @@ -64,6 +67,8 @@ Please note that
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-02')
SELECT * FROM ...;
```
After these operations, partition `dt=2022-01-01` uses 4 buckets, `dt=2022-01-02` uses 8 buckets, and any
new partitions will use the latest table-level default (8 buckets in this case).
- During the overwrite period, make sure no other jobs are writing to the same table/partition.

## Use Case
Expand Down Expand Up @@ -123,8 +128,12 @@ and the job's latency keeps increasing. To improve the data freshness, users can
-- scaling out
ALTER TABLE verified_orders SET ('bucket' = '32');
```
- Switch to the batch mode and overwrite the current partition(s) to which the streaming job is writing
- Use the `rescale` procedure or switch to batch mode and overwrite the partition(s) that need rescaling
```sql
-- Option 1: Use the rescale procedure (recommended)
CALL sys.rescale(`table` => 'default.verified_orders', `bucket_num` => 32, `partition` => 'dt=2022-06-22');

-- Option 2: Manual batch overwrite
SET 'execution.runtime-mode' = 'batch';
-- suppose today is 2022-06-22
-- case 1: no late events update the historical partitions, so overwriting today's partition is enough
Expand All @@ -144,8 +153,11 @@ and the job's latency keeps increasing. To improve the data freshness, users can
FROM verified_orders
WHERE dt IN ('2022-06-20', '2022-06-21', '2022-06-22');
```
- After overwrite job has finished, switch back to streaming mode. And now, the parallelism can be increased alongside with bucket number to restore the streaming job from the savepoint
( see [Start a SQL Job from a savepoint](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint) )
- After the overwrite job has finished, switch back to streaming mode. The parallelism can be increased alongside
the bucket number to restore the streaming job from the savepoint
( see [Start a SQL Job from a savepoint](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint) ).
Note that for partitioned tables, each partition retains its own bucket count, so only the rescaled partitions
are affected.
```sql
SET 'execution.runtime-mode' = 'streaming';
SET 'execution.savepoint.path' = <savepointPath>;
Expand Down
5 changes: 4 additions & 1 deletion docs/content/primary-key-table/data-distribution.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ A bucket is the smallest storage unit for reads and writes, each bucket director
Configure a bucket number greater than 0 to use Fixed Bucket mode, which computes the bucket of a record as
`Math.abs(key_hashcode % numBuckets)`.
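
As a rough illustration of the formula above, the following sketch computes a bucket from a key hash code. The class and method names are hypothetical and are not part of Paimon. Only the expression `Math.abs(key_hashcode % numBuckets)` comes from the text.

```java
public class FixedBucketSketch {

    // Hypothetical helper mirroring the documented formula:
    // Math.abs(key_hashcode % numBuckets).
    static int bucketOf(int keyHashcode, int numBuckets) {
        if (numBuckets <= 0) {
            throw new IllegalArgumentException("numBuckets must be greater than 0");
        }
        // Taking the remainder first keeps the magnitude below numBuckets,
        // so Math.abs cannot overflow, even for Integer.MIN_VALUE.
        return Math.abs(keyHashcode % numBuckets);
    }

    public static void main(String[] args) {
        System.out.println(bucketOf(10, 4));
        System.out.println(bucketOf(-7, 4));
        System.out.println(bucketOf(Integer.MIN_VALUE, 3));
    }
}
```

Because the result depends on `numBuckets`, the same key can land in a different bucket once the bucket number changes, which is why existing data has to be reorganized before it is written with a new bucket number.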

Rescaling buckets can only be done through offline processes, see [Rescale Bucket]({{< ref "/maintenance/rescale-bucket" >}}).
For partitioned tables, each partition can have its own bucket count. After a rescale operation, existing
partitions retain their original bucket count while newly created partitions use the updated table-level default.

Rescaling buckets requires an offline process. See [Rescale Bucket]({{< ref "/maintenance/rescale-bucket" >}}).
Too many buckets lead to too many small files, and too few buckets lead to poor write performance.

## Dynamic Bucket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,21 @@ public int totalBuckets() {
}

public PartitionEntry merge(PartitionEntry entry) {
// Use the totalBuckets from the most recently created file. This correctly handles
// the case where a partition has been overwritten with a different bucket count: the
// newer files carry the new totalBuckets, and their creation time is always later.
long newLastCreationTime = Math.max(lastFileCreationTime, entry.lastFileCreationTime);
int newTotalBuckets =
lastFileCreationTime >= entry.lastFileCreationTime
? totalBuckets
: entry.totalBuckets;
return new PartitionEntry(
partition,
recordCount + entry.recordCount,
fileSizeInBytes + entry.fileSizeInBytes,
fileCount + entry.fileCount,
Math.max(lastFileCreationTime, entry.lastFileCreationTime),
entry.totalBuckets);
newLastCreationTime,
newTotalBuckets);
}

public Partition toPartition(InternalRowPartitionComputer computer) {
Expand Down
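
The merge rule in the hunk above can be exercised in isolation. The sketch below uses a hypothetical, trimmed-down `Entry` class (only three fields, not the real `PartitionEntry`) to show that the bucket count of the most recently created file wins regardless of merge order.

```java
public class PartitionMergeSketch {

    // Hypothetical stand-in for a partition entry; the real class carries more fields.
    static final class Entry {
        final long fileCount;
        final long lastFileCreationTime;
        final int totalBuckets;

        Entry(long fileCount, long lastFileCreationTime, int totalBuckets) {
            this.fileCount = fileCount;
            this.lastFileCreationTime = lastFileCreationTime;
            this.totalBuckets = totalBuckets;
        }

        Entry merge(Entry other) {
            long newest = Math.max(lastFileCreationTime, other.lastFileCreationTime);
            // Keep the bucket count carried by whichever side has the newer file.
            // On a tie, this side's value is kept.
            int buckets =
                    lastFileCreationTime >= other.lastFileCreationTime
                            ? totalBuckets
                            : other.totalBuckets;
            return new Entry(fileCount + other.fileCount, newest, buckets);
        }
    }

    public static void main(String[] args) {
        Entry beforeOverwrite = new Entry(3, 100L, 4);
        Entry afterOverwrite = new Entry(2, 200L, 8);
        System.out.println(beforeOverwrite.merge(afterOverwrite).totalBuckets);
        System.out.println(afterOverwrite.merge(beforeOverwrite).totalBuckets);
    }
}
```

With the previous behavior (always taking `entry.totalBuckets`), the result would depend on the order in which entries were merged.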
Original file line number Diff line number Diff line change
Expand Up @@ -508,20 +508,28 @@ private RestoreFiles scanExistingFileMetas(BinaryRow partition, int bucket) {
totalBuckets = restoredTotalBuckets;
}
if (!ignoreNumBucketCheck && totalBuckets != numBuckets) {
String partInfo =
partitionType.getFieldCount() > 0
? "partition "
+ getPartitionComputer(
partitionType,
PARTITION_DEFAULT_NAME.defaultValue(),
legacyPartitionName)
.generatePartValues(partition)
: "table";
throw new RuntimeException(
String.format(
"Try to write %s with a new bucket num %d, but the previous bucket num is %d. "
+ "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
partInfo, numBuckets, totalBuckets));
if (partitionType.getFieldCount() > 0) {
// For partitioned tables, allow per-partition bucket counts.
// The partition's existing bucket count takes precedence over the
// table-level default. This supports rescale operations where different
// partitions may have different bucket counts.
LOG.info(
"Partition {} uses {} buckets (table default: {}). "
+ "Accepting per-partition bucket count.",
getPartitionComputer(
partitionType,
PARTITION_DEFAULT_NAME.defaultValue(),
legacyPartitionName)
.generatePartValues(partition),
totalBuckets,
numBuckets);
} else {
throw new RuntimeException(
String.format(
"Try to write table with a new bucket num %d, but the previous bucket num is %d. "
+ "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
numBuckets, totalBuckets));
}
}
return restored;
}
Expand Down
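
The changed check can be summarized as a small decision function. This is a simplified sketch and not the actual write path: `effectiveBuckets` and its parameters are hypothetical names, and only the branch structure and the error message follow the hunk above.

```java
public class BucketCheckSketch {

    // Hypothetical helper: returns the bucket count a writer would use for a
    // partition whose existing files carry `existingBuckets`, given the
    // table-level default `tableDefault`.
    static int effectiveBuckets(boolean partitioned, int existingBuckets, int tableDefault) {
        if (existingBuckets == tableDefault) {
            return tableDefault;
        }
        if (partitioned) {
            // Partitioned table: the partition's existing bucket count takes precedence.
            return existingBuckets;
        }
        // Unpartitioned table: a mismatch is still rejected.
        throw new RuntimeException(
                String.format(
                        "Try to write table with a new bucket num %d, but the previous bucket num is %d. "
                                + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
                        tableDefault, existingBuckets));
    }

    public static void main(String[] args) {
        System.out.println(effectiveBuckets(true, 4, 8));
        try {
            effectiveBuckets(false, 4, 8);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```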
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.sink.PartitionBucketMapping;
import org.apache.paimon.utils.SnapshotManager;

import java.util.ArrayList;
import java.util.List;

import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
Expand All @@ -38,6 +38,7 @@ public class FileSystemWriteRestore implements WriteRestore {
private final SnapshotManager snapshotManager;
private final FileStoreScan scan;
private final IndexFileHandler indexFileHandler;
private final PartitionBucketMapping partitionBucketMapping;

public FileSystemWriteRestore(
CoreOptions options,
Expand All @@ -52,6 +53,7 @@ public FileSystemWriteRestore(
this.scan.dropStats();
}
}
this.partitionBucketMapping = PartitionBucketMapping.loadFromScan(scan, options.bucket());
}

@Override
Expand All @@ -75,10 +77,12 @@ public RestoreFiles restoreFiles(
return RestoreFiles.empty();
}

List<DataFileMeta> restoreFiles = new ArrayList<>();
List<ManifestEntry> entries =
scan.withSnapshot(snapshot).withPartitionBucket(partition, bucket).plan().files();
Integer totalBuckets = WriteRestore.extractDataFiles(entries, restoreFiles);
List<DataFileMeta> restoreFiles = WriteRestore.extractDataFiles(entries);

Integer totalBuckets =
WriteRestore.extractTotalBuckets(entries, partition, partitionBucketMapping);

IndexFileMeta dynamicBucketIndex = null;
if (scanDynamicBucketIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.sink.PartitionBucketMapping;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;

/** Restore for write to restore data files by partition and bucket from file system. */
Expand All @@ -37,9 +39,44 @@ RestoreFiles restoreFiles(
boolean scanDynamicBucketIndex,
boolean scanDeleteVectorsIndex);

/**
* Resolves the {@code totalBuckets} for a (partition, bucket) pair given the manifest entries
* for that bucket and the table's partition-bucket mapping.
*
* <ul>
* <li>Non-empty bucket: use the value stamped on the existing data files so that
* committer-side bucket-count mismatch detection (e.g. rescale-without-overwrite) still
* fires.
* <li>Empty bucket on a partitioned table: look up the per-partition override in {@code
* mapping}; returns {@code null} if the partition uses the table default.
* <li>Empty bucket on an unpartitioned table: returns {@code null} so the write path falls
* back to {@code numBuckets} and the committer-side check still fires.
* </ul>
*/
@Nullable
static Integer extractDataFiles(List<ManifestEntry> entries, List<DataFileMeta> dataFiles) {
static Integer extractTotalBuckets(
List<ManifestEntry> entries, BinaryRow partition, PartitionBucketMapping mapping) {
if (!entries.isEmpty()) {
return entries.get(0).totalBuckets();
}
if (partition.getFieldCount() > 0) {
return mapping.resolveNumBuckets(partition);
}
return null;
}

/**
* Extracts the {@link DataFileMeta} list from the given manifest entries, validating that all
* entries agree on {@code totalBuckets}.
*
* @param entries manifest entries for a single (partition, bucket) pair
* @return the list of data files; empty if {@code entries} is empty
* @throws RuntimeException if entries carry inconsistent {@code totalBuckets} values, which
* indicates a corrupted manifest
*/
static List<DataFileMeta> extractDataFiles(List<ManifestEntry> entries) {
Integer totalBuckets = null;
List<DataFileMeta> dataFiles = new ArrayList<>();
for (ManifestEntry entry : entries) {
if (totalBuckets != null && totalBuckets != entry.totalBuckets()) {
throw new RuntimeException(
Expand All @@ -50,6 +87,6 @@ static Integer extractDataFiles(List<ManifestEntry> entries, List<DataFileMeta>
totalBuckets = entry.totalBuckets();
dataFiles.add(entry.file());
}
return totalBuckets;
return dataFiles;
}
}
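
The three cases listed in the `extractTotalBuckets` Javadoc can be tried out with plain collections. In this sketch a `Map<String, Integer>` stands in for `PartitionBucketMapping`, a `List<Integer>` stands in for the `totalBuckets` values stamped on manifest entries, and a `null` partition name stands in for an unpartitioned table. All of these are assumptions made for illustration, not the real types.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TotalBucketsSketch {

    // Hypothetical resolver following the documented order:
    // 1. non-empty bucket: value stamped on the existing files
    // 2. empty bucket, partitioned table: per-partition override, or null for the table default
    // 3. empty bucket, unpartitioned table: null
    static Integer resolve(
            List<Integer> stampedBuckets, String partition, Map<String, Integer> overrides) {
        if (!stampedBuckets.isEmpty()) {
            return stampedBuckets.get(0);
        }
        if (partition != null) {
            return overrides.get(partition);
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Integer> overrides = new HashMap<>();
        overrides.put("dt=2022-01-01", 4);

        System.out.println(resolve(Arrays.asList(8, 8), "dt=2022-01-02", overrides));
        System.out.println(resolve(Collections.<Integer>emptyList(), "dt=2022-01-01", overrides));
        System.out.println(resolve(Collections.<Integer>emptyList(), "dt=2022-01-03", overrides));
        System.out.println(resolve(Collections.<Integer>emptyList(), null, overrides));
    }
}
```

A `null` result means the caller falls back to the table-level bucket number.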
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.sink.WriteSelector;
Expand Down Expand Up @@ -252,6 +253,13 @@ public TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer writeId)
return wrapped.newWrite(commitUser, writeId);
}

@Override
public TableWriteImpl<?> newWrite(
String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
privilegeChecker.assertCanInsert(identifier);
return wrapped.newWrite(commitUser, writeId, rowKeyExtractor);
}

@Override
public TableCommitImpl newCommit(String commitUser) {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import org.apache.paimon.table.sink.FixedBucketWriteSelector;
import org.apache.paimon.table.sink.PartitionBucketMapping;
import org.apache.paimon.table.sink.PostponeBucketRowKeyExtractor;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.RowKindGenerator;
Expand Down Expand Up @@ -229,7 +230,9 @@ public Optional<Statistics> statistics() {
public Optional<WriteSelector> newWriteSelector() {
switch (bucketMode()) {
case HASH_FIXED:
return Optional.of(new FixedBucketWriteSelector(schema()));
return Optional.of(
new FixedBucketWriteSelector(
schema(), PartitionBucketMapping.loadFromTable(this)));
case BUCKET_UNAWARE:
case POSTPONE_MODE:
return Optional.empty();
Expand Down Expand Up @@ -257,7 +260,8 @@ protected CatalogEnvironment newCatalogEnvironment(String branch) {
public RowKeyExtractor createRowKeyExtractor() {
switch (bucketMode()) {
case HASH_FIXED:
return new FixedBucketRowKeyExtractor(schema());
return new FixedBucketRowKeyExtractor(
schema(), PartitionBucketMapping.loadFromTable(this));
case HASH_DYNAMIC:
case KEY_DYNAMIC:
return new DynamicBucketRowKeyExtractor(schema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.AppendOnlySplitGenerator;
import org.apache.paimon.table.source.AppendTableRead;
Expand Down Expand Up @@ -140,11 +141,17 @@ public TableWriteImpl<InternalRow> newWrite(String commitUser) {

@Override
public TableWriteImpl<InternalRow> newWrite(String commitUser, @Nullable Integer writeId) {
return newWrite(commitUser, writeId, createRowKeyExtractor());
}

@Override
public TableWriteImpl<InternalRow> newWrite(
String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
BaseAppendFileStoreWrite writer = store().newWrite(commitUser, writeId);
return new TableWriteImpl<>(
rowType(),
writer,
createRowKeyExtractor(),
rowKeyExtractor,
(record, rowKind) -> {
Preconditions.checkState(
rowKind.isAdd(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,12 @@ public TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer writeId)
return wrapped.newWrite(commitUser, writeId);
}

@Override
public TableWriteImpl<?> newWrite(
String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {
return wrapped.newWrite(commitUser, writeId, rowKeyExtractor);
}

@Override
public TableCommitImpl newCommit(String commitUser) {
return wrapped.newCommit(commitUser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ default Optional<String> comment() {

TableWriteImpl<?> newWrite(String commitUser, @Nullable Integer writeId);

/**
* Create a new write with a custom {@link RowKeyExtractor}. This is useful for scenarios like
* rescaling where the bucket assignment logic needs to be overridden.
*/
TableWriteImpl<?> newWrite(
String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor);

@Override
TableCommitImpl newCommit(String commitUser);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.KeyValueTableRead;
Expand Down Expand Up @@ -157,11 +158,18 @@ public TableWriteImpl<KeyValue> newWrite(String commitUser) {

@Override
public TableWriteImpl<KeyValue> newWrite(String commitUser, @Nullable Integer writeId) {
return newWrite(commitUser, writeId, createRowKeyExtractor());
}

@Override
public TableWriteImpl<KeyValue> newWrite(
String commitUser, @Nullable Integer writeId, RowKeyExtractor rowKeyExtractor) {

KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
rowType(),
store().newWrite(commitUser, writeId),
createRowKeyExtractor(),
rowKeyExtractor,
(record, rowKind) ->
kv.replace(
record.primaryKey(),
Expand Down