[core] Support upper bound in dynamic bucket mode#4974

Merged
JingsongLi merged 5 commits into apache:master from liyubin117:max_dynamic_bucket
Feb 19, 2025

@liyubin117 liyubin117 commented Jan 21, 2025

Purpose

In dynamic bucket mode, an unlimited number of buckets leads to an unpredictable number of small files, which in turn causes stability problems. We should therefore support an upper bound on the number of buckets in dynamic bucket mode.

Linked issue: close #4942

Tests

HashBucketAssignerTest
SimpleHashBucketAssignerTest

  • With the new feature: 3 buckets are produced after inserting 12 rows (screenshot omitted).
  • Without the new feature: 6 buckets are produced after inserting 12 rows (screenshot omitted).
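
The two results above are consistent with a simple capped ceiling division. The PR does not state the settings behind the screenshots, so the following is only a rough model under assumptions: every row has a distinct key hash, a single writer task owns all buckets, the target row number per bucket is 2, and the upper bound is 3. The class and method names are hypothetical, and "a non-positive bound means unlimited" is a convention of this sketch only.

```java
final class BucketCountSketch {

    /**
     * Number of buckets created for `rows` distinct keys.
     * Assumption of this sketch: maxBuckets <= 0 means "no upper bound".
     */
    static int bucketsCreated(int rows, int targetRowsPerBucket, int maxBuckets) {
        // Ceiling division: each bucket takes targetRowsPerBucket rows before a new one is opened.
        int unbounded = (rows + targetRowsPerBucket - 1) / targetRowsPerBucket;
        return maxBuckets > 0 ? Math.min(unbounded, maxBuckets) : unbounded;
    }

    public static void main(String[] args) {
        System.out.println(bucketsCreated(12, 2, 3));  // with the upper bound
        System.out.println(bucketsCreated(12, 2, -1)); // without the upper bound
    }
}
```

Under these assumed settings the model gives 3 buckets with the bound and 6 without it, matching the counts reported above.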

API and Format

Documentation

docs/layouts/shortcodes/generated/core_configuration.html

@liyubin117
Contributor Author

@JingsongLi CI passed, PTAL, thanks!

@liyubin117 liyubin117 force-pushed the max_dynamic_bucket branch 3 times, most recently from c7b7755 to df576e2 Compare January 22, 2025 10:42
@liyubin117 liyubin117 requested a review from JingsongLi January 23, 2025 05:54

@JingsongLi JingsongLi left a comment

I'm not sure these modifications are effective, so here is my suggestion:
We only need to modify the inside of PartitionIndex.assign:

Old code:

// 3. create a new bucket
for (int i = 0; i < Short.MAX_VALUE; i++) {
    if (bucketFilter.test(i) && !totalBucket.contains(i)) {
        hash2Bucket.put(hash, (short) i);
        nonFullBucketInformation.put(i, 1L);
        totalBucket.add(i);
        return i;
    }
}

// 4. too many buckets, throw exception
@SuppressWarnings("OptionalGetWithoutIsPresent")
int maxBucket = totalBucket.stream().mapToInt(Integer::intValue).max().getAsInt();
throw new RuntimeException(
        String.format(
                "Too more bucket %s, you should increase target bucket row number %s.",
                maxBucket, targetBucketRowNumber));

New code:

// 3. create a new bucket
for (int i = 0; i < max_buckets; i++) {
    if (bucketFilter.test(i) && !totalBucket.contains(i)) {
        hash2Bucket.put(hash, (short) i);
        nonFullBucketInformation.put(i, 1L);
        totalBucket.add(i);
        return i;
    }
}

// 4. max_buckets exceeded: just pick a bucket for the record,
// e.g. the smallest bucket that belongs to this task.
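
The suggested steps 3 and 4 can be sketched as a small standalone helper. This is only an illustration of the reviewer's idea, not the code that was merged: the class name, the method name, and the use of a TreeSet are assumptions made here, and `bucketFilter` is taken to answer "does this bucket belong to this task".

```java
import java.util.TreeSet;
import java.util.function.IntPredicate;

final class MinBucketFallbackSketch {

    /**
     * Returns a new bucket id below maxBuckets if one is still free for this task;
     * otherwise returns the smallest existing bucket that passes bucketFilter.
     */
    static int newOrMinBucket(
            TreeSet<Integer> totalBucket, IntPredicate bucketFilter, int maxBuckets) {
        // 3. create a new bucket, but never at or beyond the upper bound
        for (int i = 0; i < maxBuckets; i++) {
            if (bucketFilter.test(i) && !totalBucket.contains(i)) {
                totalBucket.add(i);
                return i;
            }
        }
        // 4. bound reached: a TreeSet iterates in ascending order,
        //    so the first match is the smallest bucket owned by this task
        for (int bucket : totalBucket) {
            if (bucketFilter.test(bucket)) {
                return bucket;
            }
        }
        throw new IllegalStateException("No bucket below the upper bound belongs to this task");
    }

    public static void main(String[] args) {
        TreeSet<Integer> totalBucket = new TreeSet<>();
        IntPredicate oddBuckets = b -> b % 2 == 1; // hypothetical ownership rule: task 1 of 2
        for (int call = 0; call < 3; call++) {
            System.out.println(newOrMinBucket(totalBucket, oddBuckets, 4));
        }
    }
}
```

With the hypothetical ownership rule in `main`, the first two calls create buckets 1 and 3, and the third call falls back to bucket 1 because no free owned bucket remains below the bound.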

liyubin117 commented Jan 23, 2025

After an offline discussion with @JingsongLi, we have reached a consensus: we can't just update the PartitionIndex logic, because that does not cover SimpleHashBucketAssigner. When the buckets are full, a random bucket is selected for writing.

@liyubin117
Contributor Author

@JingsongLi The feature has been completed as discussed. Looking forward to your review!

                maxBucket, targetBucketRowNumber));
// 4. exceed buckets upper bound
int bucket =
        KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, totalBucket.size());
Contributor

You should pick a bucket that this task owns, not a random one from all buckets.

@liyubin117 liyubin117 Feb 17, 2025

Thanks for the kind reminder.

  1. A bucket is randomly selected from the task-owned buckets only when a new hash arrives and the bucket limit has been reached (the totalBucket instance tracks the buckets owned by the task itself).
  2. The hash-to-bucket pair is then put into the cache.
  3. After that, whenever this hash arrives again, it always finds its own bucket:
        // 1. is it a key that has appeared before
        if (hash2Bucket.containsKey(hash)) {
            return hash2Bucket.get(hash);
        }
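
The three steps above can be illustrated with a minimal, self-contained sketch. It is not the merged Paimon code: the class name is hypothetical, a single task is assumed to own every bucket (so there is no bucketFilter), and the fallback bucket is derived from the hash instead of a random source so that the sketch stays deterministic. The internals of the PR's own selection helper are not reproduced here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class StickyBucketSketch {
    private final Map<Integer, Integer> hash2Bucket = new HashMap<>();
    private final Map<Integer, Long> nonFullBucketInformation = new HashMap<>();
    private final List<Integer> totalBucket = new ArrayList<>(); // buckets owned by this task
    private final long targetBucketRowNumber;
    private final int maxBuckets;

    StickyBucketSketch(long targetBucketRowNumber, int maxBuckets) {
        if (maxBuckets < 1) {
            throw new IllegalArgumentException("maxBuckets must be at least 1 in this sketch");
        }
        this.targetBucketRowNumber = targetBucketRowNumber;
        this.maxBuckets = maxBuckets;
    }

    int assign(int hash) {
        // 1. a hash that has appeared before always returns its cached bucket
        Integer cached = hash2Bucket.get(hash);
        if (cached != null) {
            return cached;
        }
        int bucket = pickBucket(hash);
        hash2Bucket.put(hash, bucket); // cache the pair so the choice is sticky
        return bucket;
    }

    private int pickBucket(int hash) {
        // 2. reuse a bucket that still has room, dropping full ones from the map
        Iterator<Map.Entry<Integer, Long>> it = nonFullBucketInformation.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> entry = it.next();
            if (entry.getValue() < targetBucketRowNumber) {
                entry.setValue(entry.getValue() + 1);
                return entry.getKey();
            }
            it.remove();
        }
        // 3. create a new bucket while below the upper bound
        if (totalBucket.size() < maxBuckets) {
            int bucket = totalBucket.size();
            totalBucket.add(bucket);
            nonFullBucketInformation.put(bucket, 1L);
            return bucket;
        }
        // 4. bound reached: choose among the buckets this task already owns
        return totalBucket.get(Math.floorMod(hash, totalBucket.size()));
    }

    public static void main(String[] args) {
        StickyBucketSketch assigner = new StickyBucketSketch(2, 3);
        for (int hash = 0; hash < 12; hash++) {
            System.out.println(hash + " -> " + assigner.assign(hash));
        }
    }
}
```

Because the fallback result is written into `hash2Bucket`, a later occurrence of the same hash never reaches step 4 again, which is the point the reply makes.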

int maxBucketId =
        totalBucket.isEmpty()
                ? 0
                : totalBucket.stream().mapToInt(Integer::intValue).max().getAsInt();
Contributor

The stream should not be invoked on every call here; cache the maxBucketId instead.

Contributor Author

Wise consideration :)
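
One way to cache the value, shown here only as a sketch with hypothetical class and method names, is to update a field whenever a bucket is added, so that reading the maximum no longer scans the set. It assumes buckets are never removed from the set and bucket ids are non-negative, and it keeps the snippet's convention of returning 0 when no bucket exists.

```java
import java.util.HashSet;
import java.util.Set;

final class MaxBucketTrackerSketch {
    private final Set<Integer> totalBucket = new HashSet<>();
    private int maxBucketId = 0; // 0 when no bucket exists, as in the reviewed snippet

    /** Adds a bucket and keeps the cached maximum up to date in O(1). */
    void addBucket(int bucket) {
        if (totalBucket.add(bucket)) {
            maxBucketId = Math.max(maxBucketId, bucket);
        }
    }

    /** Cached lookup: no stream and no scan of the set. */
    int maxBucketId() {
        return maxBucketId;
    }

    /** The per-call stream computation, kept only for comparison. */
    int maxBucketIdByStream() {
        return totalBucket.isEmpty()
                ? 0
                : totalBucket.stream().mapToInt(Integer::intValue).max().getAsInt();
    }

    public static void main(String[] args) {
        MaxBucketTrackerSketch tracker = new MaxBucketTrackerSketch();
        tracker.addBucket(2);
        tracker.addBucket(5);
        tracker.addBucket(3);
        System.out.println(tracker.maxBucketId() == tracker.maxBucketIdByStream());
    }
}
```

If buckets could also be removed, the cached field would have to be recomputed on removal, which this sketch does not handle.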

@JingsongLi JingsongLi left a comment

+1

@JingsongLi JingsongLi merged commit cf22950 into apache:master Feb 19, 2025
13 checks passed
Development

Successfully merging this pull request may close these issues.

[Feature] Support upper bound in dynamic bucket mode

3 participants