
Reduce AsyncQueue Size in SplitSources for Bucketed Tables#27181

Open
shelton408 wants to merge 1 commit into prestodb:master from shelton408:export-D93797024

Conversation

@shelton408
Contributor

@shelton408 shelton408 commented Feb 20, 2026

Summary:
Looking for insight from anyone with expertise in this area.

Heavily bucketed tables have been causing heap memory issues on the coordinator, leading to OOM crashes.
For a bucketed split source, each bucket creates an AsyncQueue backed by an ArrayDeque of capacity maxOutstandingSplits * 2 + 1, the same capacity as the single AsyncQueue of a non-bucketed split source. For heavily bucketed tables, most of this reserved space is never touched.

An example that came up in our systems recently is a query against a table with 32768 buckets. The split source for that table alone created 32768 async queues, which used 571 megabytes of memory.
(Note that while the first screenshot shows an AsyncQueue size of 232 bytes, this applies only to the 32k empty AsyncQueues; the other 32k use 17568 bytes each.)
[screenshot: heap breakdown of AsyncQueue instances]
Looking into the AsyncQueue, we can see that the memory is consumed primarily by queues that reserve the default 2001 slots but hold only 1 element. That comes to 17376 bytes of ArrayDeque per bucket, of which 16008 bytes are spent just reserving the backing array.
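The arithmetic above can be sanity-checked with a back-of-envelope sketch. The 8-byte per-slot figure is an assumption (one object reference per slot on a 64-bit JVM without compressed oops), but it matches the heap numbers quoted in this PR, and the 2001 slots imply a maxOutstandingSplits of 1000:

```java
// Back-of-envelope check of the per-bucket memory numbers quoted above.
// Assumption: 8 bytes per ArrayDeque slot (one uncompressed object reference).
public class AsyncQueueMemoryEstimate {
    public static void main(String[] args) {
        int maxOutstandingSplits = 1000;          // implied by the 2001 default slots
        int slots = maxOutstandingSplits * 2 + 1; // ArrayDeque capacity: 2001
        int bytesPerSlot = 8;

        int backingArray = slots * bytesPerSlot;      // 16008 bytes, reserved up front
        int otherState = 17376 - backingArray;        // remaining per-bucket overhead
        int cappedSlots = 8 * 2 + 1;                  // lower-bound capacity after the change: 17
        int afterChange = otherState + cappedSlots * bytesPerSlot; // 1504 bytes

        System.out.printf("before=%d after=%d saved=%.1f%%%n",
                17376, afterChange, 100.0 * (17376 - afterChange) / 17376);
        // prints: before=17376 after=1504 saved=91.3%
    }
}
```

The 91.3% figure lines up with the "over a 90% save in memory" claimed below.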
[screenshot: ArrayDeque memory breakdown per AsyncQueue]

This change reduces the default ArrayDeque capacity for bucketed sources from maxOutstandingSplits * 2 to a dynamic value based on bucket count (with a lower bound of 8 * 2 + 1). In the case above, the total cost per bucket drops from 17376 bytes to 1504 bytes (accounting for the 1 split in the array), saving over 90% of the memory.

A possible downside to this change is that if we exceed the array size, the ArrayDeque copies its contents into a larger array, which can be expensive. However, from what I've seen, tables with large bucket counts do not generate many splits per bucket, and split count generally scales linearly with bucket count, which is how I arrived at the current formula.
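The dynamic sizing described above can be sketched as follows. The method and constant names here are illustrative, not the exact identifiers in HiveSplitManager.java; the formula max(8, maxOutstandingSplits / bucketCount) and the orElse(1) fallback are taken from the reviewer's guide:

```java
import java.util.Optional;

// Illustrative sketch of the per-bucket queue sizing this PR introduces.
public class PerBucketQueueSizing {
    // Lower bound from the PR: never size a bucket's queue below 8 outstanding splits
    private static final int MIN_OUTSTANDING_SPLITS_PER_BUCKET = 8;

    static int estimatedOutstandingSplitsPerBucket(int maxOutstandingSplits,
                                                   Optional<Integer> readBucketCount) {
        // An absent bucket handle falls back to a bucket count of 1,
        // keeping the full outstanding-split budget
        int bucketCount = readBucketCount.orElse(1);
        return Math.max(MIN_OUTSTANDING_SPLITS_PER_BUCKET, maxOutstandingSplits / bucketCount);
    }

    public static void main(String[] args) {
        // 32768 buckets: 1000 / 32768 rounds to 0, so the floor of 8 applies
        System.out.println(estimatedOutstandingSplitsPerBucket(1000, Optional.of(32768))); // 8
        // Lightly bucketed table: 1000 / 4 = 250 outstanding splits per bucket
        System.out.println(estimatedOutstandingSplitsPerBucket(1000, Optional.of(4)));     // 250
    }
}
```

Note the edge case raised in review below: when bucketCount exceeds maxOutstandingSplits, the division rounds to 0 and the floor of 8 may actually be larger than the configured per-bucket budget.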

References:
/presto-trunk/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java Line 229
/presto-trunk/presto-hive/src/main/java/com/facebook/presto/hive/util/AsyncQueue.java Line 59

Differential Revision: D93797024

Summary by Sourcery

Adjust Hive split source configuration to reduce coordinator memory usage for heavily bucketed tables by limiting per-bucket outstanding splits.

Enhancements:

  • Pass bucket handle into split source computation so bucketed scheduling can account for bucket count.
  • Dynamically estimate outstanding splits per bucket based on bucket count with a lower bound, instead of using the global maxOutstandingSplits for each bucket.

@shelton408 shelton408 requested a review from a team as a code owner February 20, 2026 00:33
@sourcery-ai
Contributor

sourcery-ai bot commented Feb 20, 2026


Reviewer's Guide

Adjusts Hive bucketed split scheduling to estimate a per-bucket outstanding-split limit based on bucket count, reducing AsyncQueue/ArrayDeque memory usage for heavily bucketed tables while preserving existing behavior for unbucketed or lightly bucketed tables.

Sequence diagram for updated bucketed Hive split scheduling

sequenceDiagram
    participant Client
    participant HiveSplitManager
    participant HiveSplitLoader
    participant HiveSplitSource

    Client->>HiveSplitManager: getSplits(splitSchedulingContext, table, session, layout, functionRegistry)
    HiveSplitManager->>HiveSplitLoader: new HiveSplitLoader(...)

    HiveSplitManager->>HiveSplitManager: computeSplitSource(splitSchedulingContext, table, session, hiveSplitLoader, splitScanRatio, bucketHandle)

    alt grouped_scheduling
        HiveSplitManager->>HiveSplitManager: bucketCount = bucketHandle.map(getReadBucketCount).orElse(1)
        HiveSplitManager->>HiveSplitManager: estimatedOutstandingSplitsPerBucket = max(8, maxOutstandingSplits / bucketCount)
        HiveSplitManager->>HiveSplitSource: bucketed(session, dbName, tableName, cacheQuotaRequirement, maxInitialSplitSize, estimatedOutstandingSplitsPerBucket, maxOutstandingSplitsSize, hiveSplitLoader, executor, splitSchedulingStrategy, schedulerUsesHostAddresses, partialAggPushdown)
    else ungrouped_scheduling
        HiveSplitManager->>HiveSplitSource: nonBucketed factory method (unchanged)
    end

    HiveSplitManager->>HiveSplitLoader: start(splitSource)
    HiveSplitManager-->>Client: HiveSplitSource

Class diagram for updated HiveSplitManager.computeSplitSource behavior

classDiagram
    class HiveSplitManager {
        - CacheQuotaRequirementProvider cacheQuotaRequirementProvider
        - int maxOutstandingSplits
        - DataSize maxOutstandingSplitsSize
        + ConnectorSplitSource getSplits(SplitSchedulingContext splitSchedulingContext, ConnectorSession session, Table table, ConnectorTableLayoutHandle layout)
        - HiveSplitSource computeSplitSource(SplitSchedulingContext splitSchedulingContext, Table table, ConnectorSession session, HiveSplitLoader hiveSplitLoader, double splitScanRatio, Optional<HiveBucketHandle> bucketHandle)
    }

    class HiveSplitSource {
        + static HiveSplitSource bucketed(ConnectorSession session, String databaseName, String tableName, CacheQuotaRequirement cacheQuotaRequirement, DataSize maxInitialSplitSize, int maxOutstandingSplits, DataSize maxOutstandingSplitsSize, HiveSplitLoader hiveSplitLoader, Executor executor, SplitSchedulingStrategy splitSchedulingStrategy, boolean schedulerUsesHostAddresses, boolean partialAggregationsPushedDown)
    }

    class HiveBucketHandle {
        + int getReadBucketCount()
    }

    class SplitSchedulingContext {
        + SplitSchedulingStrategy getSplitSchedulingStrategy()
        + boolean schedulerUsesHostAddresses()
    }

    class CacheQuotaRequirementProvider {
        + CacheQuotaRequirement getCacheQuotaRequirement(String databaseName, String tableName)
    }

    class CacheQuotaRequirement
    class HiveSplitLoader {
        + void start(HiveSplitSource hiveSplitSource)
    }

    HiveSplitManager --> HiveSplitSource : uses
    HiveSplitManager --> HiveSplitLoader : creates
    HiveSplitManager --> HiveBucketHandle : optional bucketHandle
    HiveSplitManager --> CacheQuotaRequirementProvider : uses
    CacheQuotaRequirementProvider --> CacheQuotaRequirement : creates
    HiveSplitSource --> HiveSplitLoader : uses
    HiveSplitManager --> SplitSchedulingContext : uses
    HiveSplitManager --> CacheQuotaRequirement : uses

File-Level Changes

Change: Pass bucket handle information into HiveSplitSource construction so bucketed split sources can size their async queues based on bucket count.
  • Extend computeSplitSource to accept an Optional<HiveBucketHandle> parameter alongside existing arguments
  • Update getSplits to forward the table's bucket handle when calling computeSplitSource
  Files: presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java

Change: Dynamically derive the max outstanding splits per bucket for grouped (bucketed) scheduling to shrink the AsyncQueue's backing ArrayDeque capacity on heavily bucketed tables.
  • For GROUPED_SCHEDULING, compute bucketCount from bucketHandle.readBucketCount or default to 1
  • Compute estimatedOutstandingSplitsPerBucket as max(8, maxOutstandingSplits / bucketCount) to maintain a reasonable lower bound
  • Use estimatedOutstandingSplitsPerBucket instead of maxOutstandingSplits when constructing a bucketed HiveSplitSource
  Files: presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java


Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've left some high level feedback:

  • The new estimatedOutstandingSplitsPerBucket logic changes behavior when bucketCount > maxOutstandingSplits by clamping up to 8 instead of using the (small) configured max per bucket; consider whether this is intentional and, if so, documenting or guarding against unexpectedly increasing per-bucket queue sizes in that case.
  • The hard-coded lower bound of 8 in Math.max(8, maxOutstandingSplits / bucketCount) is a bit of a magic number; consider either tying it explicitly to the AsyncQueue sizing logic (e.g., via a named constant) or explaining the rationale so future maintainers understand why this value was chosen.

@shelton408 shelton408 changed the title Presto OSS Reduce AsyncQueue Size in SplitSources for Bucketed Tables [RFC] Presto OSS Reduce AsyncQueue Size in SplitSources for Bucketed Tables Feb 20, 2026
@shelton408 shelton408 changed the title [RFC] Presto OSS Reduce AsyncQueue Size in SplitSources for Bucketed Tables Reduce AsyncQueue Size in SplitSources for Bucketed Tables Feb 20, 2026
@steveburnett
Contributor

Please add a release note entry as described in the Release Notes Guidelines to pass the failing (though not required) CI check.

Please edit the title of the PR to pass the required CI check, which is currently failing.
