
[core][flink] Introduce postpone bucket tables. #5095

Merged

JingsongLi merged 12 commits into apache:master from tsreaper:postpone on Feb 20, 2025
Conversation

@tsreaper (Contributor) commented Feb 17, 2025

Purpose

In this PR, we introduce the postpone bucket mode.

Postpone bucket mode is configured by 'bucket' = '-2'. This mode aims to solve the difficulty of determining a fixed number of buckets, and supports different bucket numbers for different partitions.

Currently, only Flink supports this mode.

When writing records into the table, all records will first be stored in the bucket-postpone directory of each partition and are not available to readers.

To move the records into the correct bucket and make them readable, you need to run a compaction job.

Finally, if the bucket number of some partition becomes too small, you can also run a rescale job.
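The workflow above can be sketched in Flink SQL. The table, columns, and argument values below are hypothetical; only the 'bucket' = '-2' option and the two procedures come from this PR, and the exact argument requirements may differ:

```sql
-- Hypothetical table; the only PR-specific piece is 'bucket' = '-2'.
CREATE TABLE t (
    k INT,
    v STRING,
    dt STRING,
    PRIMARY KEY (k, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'bucket' = '-2'  -- postpone bucket mode
);

-- Writes land in the postpone directory of each partition
-- and are not yet visible to readers.
INSERT INTO t VALUES (1, 'a', '20250217');

-- Distribute the postponed records into real buckets, making them readable.
CALL sys.compact_postpone_bucket(`table` => 'default.t', `default_bucket_num` => 4);

-- Later, grow the bucket number of a partition that became too small.
CALL sys.rescale_postpone_bucket(`table` => 'default.t', `bucket_num` => 16, `partition` => 'dt=20250217');
```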

Tests

  • org.apache.paimon.flink.PostponeBucketTableITCase

API and Format

Introduce a new storage format.

Documentation

Documentation is also added.

CALL sys.compact_postpone_bucket(`table` => 'identifier', `default_bucket_num` => bucket_num, `parallelism` => parallelism)
</td>
<td>
Compact postpone bucket tables, which distributes records in bucket--2 directory into real bucket directories. Arguments:
Contributor:

Maybe we can give this directory a meaningful name, e.g. bucket-postpone?


@Override
public void processElement(StreamRecord<InternalRow> element) throws Exception {
    write.write(element.getValue(), BucketMode.POSTPONE_BUCKET);
Contributor:

Not only buckets, but we also want to avoid scanning old files in this mode. Currently, in order to obtain the latest sequenceNumbers, the primary key table still needs to scan old files. We need to consider removing this logic.

<tr>
<td>compact_postpone_bucket</td>
<td>
CALL sys.compact_postpone_bucket(`table` => 'identifier', `default_bucket_num` => bucket_num, `parallelism` => parallelism)
Contributor:

We can merge this into the compact procedure.

<tr>
<td>compact_postpone_bucket</td>
<td>
CALL sys.compact_postpone_bucket(`table` => 'identifier', `default_bucket_num` => bucket_num, `parallelism` => parallelism)
Contributor:

default_bucket_num can be a separate table option.
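A sketch of that suggestion; the option key below is invented for illustration and is not part of this PR:

```sql
-- Hypothetical option key; in this PR, default_bucket_num is actually
-- passed as a procedure argument instead.
ALTER TABLE t SET ('postpone.default-bucket-num' = '8');

-- The procedure call could then omit the argument:
CALL sys.compact_postpone_bucket(`table` => 'default.t');
```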

<tr>
<td>rescale_postpone_bucket</td>
<td>
CALL sys.rescale_postpone_bucket(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition')
Contributor:

We can have a unified rescale procedure.


But please note that this may also cause data duplication.

## Postpone Bucket
Contributor:

What about also supporting append queue tables?

Contributor (author):

Currently there is no plan for append queue tables.

import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.Description;
import org.apache.paimon.options.description.InlineElement;
import org.apache.paimon.table.BucketMode;
Contributor:

You should modify the description of the config BUCKET in this class.

checkArgument(
        options.changelogProducer() == ChangelogProducer.NONE
                || options.changelogProducer() == ChangelogProducer.LOOKUP,
        "Currently, postpone bucket tables (bucket = -2) only supports none or lookup changelog producer");
Contributor:

Why?

sink.doCommit(written.union(sourcePair.getRight()), commitUser);
}

List<String> ret = new ArrayList<>();
Contributor:

List<String> ret = new ArrayList<>(partitions.size());

public class RescalePostponeBucketAction extends TableActionBase {

    private final int bucketNum;
    private Map<String, String> partition = new HashMap<>();
Contributor:

Why not support multiple partitions?

Contributor (author):

If this feature is needed, others can implement it later.


/**
* Action to compact postpone bucket tables, which distributes records in {@code bucket = -2}
* directory into real bucket directories.
Contributor:

I think the directory should not be named bucket = -2; it should have a meaningful name.

<li>partition: What partition to rescale. For partitioned tables this argument cannot be empty.</li>
</td>
<td>
CALL sys.rescale_postpone_bucket(`table` => 'default.T', `bucket_num` => 16, `partition` => 'dt=20250217,hh=08')
Contributor:

rescale


public boolean writeOnly() {
-    return options.get(WRITE_ONLY);
+    return options.get(WRITE_ONLY) || options.get(BUCKET) == BucketMode.POSTPONE_BUCKET;
Contributor:

revert this

this.onlyReadRealBuckets = onlyReadRealBuckets;

if (onlyReadRealBuckets) {
    super.withBucketFilter(bucket -> bucket >= 0);
@JingsongLi (Contributor) commented Feb 19, 2025:

This will overwrite the old bucket filter, or be overwritten by a new bucket filter. I see you handle withBucketFilter too, but it looks not so safe. Maybe we should add a method to skip negative buckets.

@JingsongLi (Contributor) left a comment:

+1

@JingsongLi JingsongLi merged commit 3c0fc7d into apache:master Feb 20, 2025
26 checks passed