Skip to content

Add coordinator results spooling#27211

Draft
tdcmeehan wants to merge 1 commit intoprestodb:masterfrom
tdcmeehan:spool
Draft

Add coordinator results spooling#27211
tdcmeehan wants to merge 1 commit intoprestodb:masterfrom
tdcmeehan:spool

Conversation

@tdcmeehan
Copy link
Contributor

@tdcmeehan tdcmeehan commented Feb 26, 2026

Description

Motivation and Context

Impact

Test Plan

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.
  • If adding new dependencies, verified they have an OpenSSF Scorecard score of 5.0 or higher (or obtained explicit TSC approval for lower scores).

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* ... 
* ... 

Hive Connector Changes
* ... 
* ... 

If release note is NOT required, use:

== NO RELEASE NOTE ==

Summary by Sourcery

Introduce coordinator-side result buffering that spools query output to a storage-backed buffer before delivering results to clients and integrates it into the query protocol.

New Features:

  • Add an optional CoordinatorResultBuffer that drains exchange results into a spooling output buffer and serves pages to clients after query completion.
  • Introduce a session/config property to enable coordinator output buffering per query.

Enhancements:

  • Integrate coordinator result buffering with query lifecycle, ensuring buffered data is released on success and discarded on retries or failures.
  • Track runtime metrics for pages and bytes drained into the coordinator buffer for observability.

Tests:

  • Add unit tests for CoordinatorResultBuffer covering draining, release, discard, and completion semantics.
  • Add distributed tests validating coordinator output buffering behavior, data correctness, cancellation, and metric presence, and extend test utilities to construct StatementClient with custom sessions.

@prestodb-ci prestodb-ci added the from:IBM PR from IBM label Feb 26, 2026
@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Feb 26, 2026

Reviewer's Guide

Adds an optional coordinator-side result spooling layer that buffers query output to a spooling output buffer on the coordinator, gates delivery to clients until query completion, and wires this into query lifecycle, configuration, metrics, and tests.

Class diagram for coordinator result spooling components

classDiagram
    class Query {
        - QueryManager queryManager
        - ExchangeClient exchangeClient
        - Optional~CoordinatorResultBuffer~ coordinatorResultBuffer
        - RetryCircuitBreaker retryCircuitBreaker
        - RetryConfig retryConfig
        + static Query create(Session session, String slug, QueryManager queryManager, TransactionManager transactionManager, AccessControl accessControl, SessionPropertyManager sessionPropertyManager, ExchangeClientSupplier exchangeClientSupplier, ExecutorService executor, BoundedExecutor responseExecutor, ScheduledExecutorService timeoutExecutor, BlockEncodingSerde blockEncodingSerde, RetryCircuitBreaker retryCircuitBreaker, RetryConfig retryConfig, Optional~URI~ retryUrl, OptionalLong retryExpirationEpochTime, boolean isRetryQuery, Optional~CoordinatorResultBuffer~ coordinatorResultBuffer)
        - Query(Session session, String slug, QueryManager queryManager, TransactionManager transactionManager, AccessControl accessControl, SessionPropertyManager sessionPropertyManager, ExchangeClient exchangeClient, ExecutorService executor, BoundedExecutor responseExecutor, ScheduledExecutorService timeoutExecutor, BlockEncodingSerde blockEncodingSerde, RetryCircuitBreaker retryCircuitBreaker, RetryConfig retryConfig, Optional~CoordinatorResultBuffer~ coordinatorResultBuffer)
        + void cancel()
        + synchronized void dispose()
        - ListenableFuture~?~ getFutureStateChange()
        - synchronized QueryResults getNextResult(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize, boolean binaryResults)
        - SerializedPage pollNextPage()
    }

    class CoordinatorResultBuffer {
        - ExchangeClient exchangeClient
        - SpoolingOutputBuffer storageBuffer
        - RuntimeStats runtimeStats
        - OutputBufferId outputBufferId
        - Deque~SerializedPage~ readAheadBuffer
        - long readSequenceId
        - boolean released
        - boolean discarded
        - boolean exchangeClientDrained
        - boolean storageBufferComplete
        - ListenableFuture~BufferResult~ pendingStorageRead
        + CoordinatorResultBuffer(ExchangeClient exchangeClient, SpoolingOutputBuffer storageBuffer, RuntimeStats runtimeStats)
        + synchronized void drainExchangeClient()
        + synchronized SerializedPage pollPage()
        - void processStorageReadResult(BufferResult result)
        + synchronized void release()
        + synchronized void discardForRetry()
        + synchronized boolean isFinished()
        + synchronized boolean hasRemainingData()
    }

    class LocalQueryProvider {
        - QueryManager queryManager
        - TransactionManager transactionManager
        - AccessControl accessControl
        - SessionPropertyManager sessionPropertyManager
        - ExchangeClientSupplier exchangeClientSupplier
        - ExecutorService executor
        - BoundedExecutor responseExecutor
        - ScheduledExecutorService timeoutExecutor
        - RetryCircuitBreaker retryCircuitBreaker
        - RetryConfig retryConfig
        - SpoolingOutputBufferFactory spoolingOutputBufferFactory
        - ConcurrentMap~QueryId, Query~ queries
        + LocalQueryProvider(QueryManager queryManager, TransactionManager transactionManager, AccessControl accessControl, SessionPropertyManager sessionPropertyManager, ExchangeClientSupplier exchangeClientSupplier, ExecutorService executor, BoundedExecutor responseExecutor, ScheduledExecutorService timeoutExecutor, RetryCircuitBreaker retryCircuitBreaker, RetryConfig retryConfig, SpoolingOutputBufferFactory spoolingOutputBufferFactory)
        + Query getQuery(QueryId queryId, String slug, Optional~URI~ retryUrl, OptionalLong retryExpirationEpochTime, boolean isRetryQuery)
    }

    class FeaturesConfig {
        - DataSize spoolingOutputBufferThreshold
        - String spoolingOutputBufferTempStorage
        - boolean coordinatorOutputBufferingEnabled
        + boolean isCoordinatorOutputBufferingEnabled()
        + FeaturesConfig setCoordinatorOutputBufferingEnabled(boolean coordinatorOutputBufferingEnabled)
    }

    class SystemSessionProperties {
        <<final>>
        + static String COORDINATOR_OUTPUT_BUFFERING_ENABLED
        + static boolean isCoordinatorOutputBufferingEnabled(Session session)
    }

    class RuntimeStats {
        + void addMetricValue(String name, RuntimeUnit unit, long value)
    }

    class SpoolingOutputBuffer {
        + void enqueue(Lifespan lifespan, List~SerializedPage~ pages)
        + ListenableFuture~BufferResult~ get(OutputBufferId bufferId, long token, long maxSize)
        + void setNoMorePages()
        + void destroy()
    }

    class ExchangeClient {
        + SerializedPage pollPage()
        + boolean isClosed()
    }

    class BufferResult {
        + List~SerializedPage~ getSerializedPages()
        + long getNextToken()
        + boolean isBufferComplete()
    }

    class SpoolingOutputBufferFactory {
        + SpoolingOutputBuffer createSpoolingOutputBuffer(TaskId taskId, String bufferInstanceId, OutputBuffers outputBuffers, StateMachine~BufferState~ bufferState)
    }

    Query --> "0..1" CoordinatorResultBuffer : uses
    LocalQueryProvider --> Query : creates
    LocalQueryProvider --> SpoolingOutputBufferFactory : uses
    LocalQueryProvider --> CoordinatorResultBuffer : constructs
    CoordinatorResultBuffer --> ExchangeClient : wraps
    CoordinatorResultBuffer --> SpoolingOutputBuffer : delegates
    CoordinatorResultBuffer --> RuntimeStats : recordsMetrics
    CoordinatorResultBuffer --> BufferResult : readsFrom
    FeaturesConfig --> SystemSessionProperties : configDefaults
    SystemSessionProperties --> FeaturesConfig : readsDefaults
Loading

File-Level Changes

Change Details Files
Introduce coordinator-side result buffer that drains ExchangeClient into a SpoolingOutputBuffer and gates page delivery to clients
  • Add CoordinatorResultBuffer class that asynchronously drains pages from ExchangeClient into a SpoolingOutputBuffer, tracks read sequence, and exposes pollPage/release/discardForRetry/hasRemainingData/isFinished APIs
  • Track runtime metrics for pages and bytes drained through the coordinator buffer using RuntimeStats
  • Cover CoordinatorResultBuffer behaviors with unit tests for draining, release semantics, discard-on-retry, finish state, and empty/exchange failure paths
presto-main/src/main/java/com/facebook/presto/server/protocol/CoordinatorResultBuffer.java
presto-common/src/main/java/com/facebook/presto/common/RuntimeMetricName.java
presto-main/src/test/java/com/facebook/presto/server/protocol/TestCoordinatorResultBuffer.java
Wire coordinator result buffering into Query lifecycle and LocalQueryProvider with optional use based on session property
  • Extend Query to hold an Optional, drain it when waiting for data, use it as the source of pages when enabled, and ensure it is released on FINISHED and discarded on failure/dispose/retry
  • Update next-token calculation to respect remaining buffered data from the coordinator buffer even after exchange close
  • Update LocalQueryProvider to construct a SpoolingOutputBuffer via SpoolingOutputBufferFactory and wrap it in CoordinatorResultBuffer when coordinator output buffering is enabled for the session
presto-main/src/main/java/com/facebook/presto/server/protocol/Query.java
presto-main/src/main/java/com/facebook/presto/server/protocol/LocalQueryProvider.java
Add configuration and session property to enable coordinator output buffering
  • Extend FeaturesConfig with coordinatorOutputBufferingEnabled flag and corresponding @config key
  • Expose COORDINATOR_OUTPUT_BUFFERING_ENABLED system property in SystemSessionProperties, including getter and wiring to FeaturesConfig default
  • Adjust FeaturesConfig tests to cover defaults and explicit property mappings for the new flag
presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java
presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java
presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java
Add integration tests validating coordinator output buffering behavior, metrics, and correctness, and a helper for creating StatementClient in tests
  • Add TestCoordinatorOutputBuffering integration test to validate metrics population, disabled behavior, data gating until FINISHED, correctness against baseline, explain, cancellation, and failure cleanup when buffering is enabled
  • Expose a createStatementClient helper in AbstractTestingPrestoClient to support fine-grained client-side polling patterns in tests and refactor execute(Session, sql) to reuse it
presto-tests/src/test/java/com/facebook/presto/execution/TestCoordinatorOutputBuffering.java
presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestingPrestoClient.java

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

from:IBM PR from IBM

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants