Conversation

@gabotechs (Contributor) commented Jan 12, 2026

Which issue does this PR close?

  • Closes #.

Rationale for this change

This is one of a batch of PRs that attempt to improve performance in hash joins.

It adds a building block that allows eagerly collecting data on the probe side of a hash join before the build side is finished.

Although the intended use case is hash joins, the new execution node is generic and is designed to work anywhere in the plan.

What changes are included in this PR?

Note

The new BufferExec node introduced in this PR is not yet wired up automatically.

Adds a new BufferExec node that buffers up to a configurable number of bytes per partition, eagerly performing work that would otherwise be delayed.

Schematically, it looks like this:

             ┌───────────────────────────┐
             │        BufferExec         │
             │                           │
             │┌────── Partition 0 ──────┐│
             ││            ┌────┐ ┌────┐││       ┌────┐
 ──background poll────────▶│    │ │    ├┼┼───────▶    │
             ││            └────┘ └────┘││       └────┘
             │└─────────────────────────┘│
             │┌────── Partition 1 ──────┐│
             ││     ┌────┐ ┌────┐ ┌────┐││       ┌────┐
 ──background poll─▶│    │ │    │ │    ├┼┼───────▶    │
             ││     └────┘ └────┘ └────┘││       └────┘
             │└─────────────────────────┘│
             │                           │
             │           ...             │
             │                           │
             │┌────── Partition N ──────┐│
             ││                   ┌────┐││       ┌────┐
 ──background poll───────────────▶│    ├┼┼───────▶    │
             ││                   └────┘││       └────┘
             │└─────────────────────────┘│
             └───────────────────────────┘

Are these changes tested?

Yes, by new unit tests.

Are there any user-facing changes?

Users can import the new BufferExec execution plan into their codebases, but this PR does not yet ship any internal usage.

@github-actions github-actions bot added core Core DataFusion crate execution Related to the execution crate proto Related to proto crate datasource Changes to the datasource crate physical-plan Changes to the physical-plan crate labels Jan 12, 2026
@gabotechs gabotechs force-pushed the buffer-exec branch 3 times, most recently from 6d6f99e to e501fac Compare January 16, 2026 10:29
github-merge-queue bot pushed a commit that referenced this pull request Jan 26, 2026
## Which issue does this PR close?


- Closes #.

## Rationale for this change


Prerequisite for the following PRs:
- #19760
- #19761

Even though the API on the `MemoryPool` does not require `&mut self` for
growing/shrinking the reserved size, the API in `MemoryReservation`
does, making simple implementations impossible to express without
synchronization primitives. For example, the following would require a
`Mutex` for concurrent access to the `MemoryReservation` from different
threads, even though the `MemoryPool` doesn't:

```rust
let mut stream: SendableRecordBatchStream = /* ... some upstream stream ... */;
let mem: Arc<MemoryReservation> = /* ... registered against a MemoryPool ... */;

let mut builder = ReceiverStreamBuilder::new(10);
let tx = builder.tx();
{
    let mem = mem.clone();
    builder.spawn(async move {
        while let Some(msg) = stream.next().await {
            mem.try_grow(msg.unwrap().get_array_memory_size()); // ❌ `mem` is not mutable
            tx.send(msg).unwrap();
        }
    });
}
builder
    .build()
    .inspect_ok(|msg| mem.shrink(msg.get_array_memory_size()));  // ❌ `mem` is not mutable
```


## What changes are included in this PR?


Make the methods in `MemoryReservation` take `&self` instead of
`&mut self`, allowing concurrent shrinks/grows of the same reservation
from different tasks.

## Are these changes tested?


Yes, by existing tests.

## Are there any user-facing changes?

Users can now safely call methods of `MemoryReservation` from different
tasks without synchronization primitives.

This is a backwards-compatible API change: it will work out of the box
for current users. However, depending on their Clippy configuration,
they may see new warnings about unused `mut`s in their codebases.

@github-actions github-actions bot removed core Core DataFusion crate execution Related to the execution crate datasource Changes to the datasource crate labels Feb 1, 2026
@gabotechs gabotechs marked this pull request as ready for review February 1, 2026 11:32
@adriangb (Contributor) left a comment

This seems in many ways quite similar to what RepartitionExec w/ spilling does. Have you had a chance to poke at that code? I wonder if we could share the code and give this operator spilling capabilities at the same time (would help with large batches? buffering on disk if there are no waiters on the other side?).

I'm curious to get @alamb's thoughts on this vs. his work on buffering / prefetching at the Parquet level. The advantage I see of buffering at the Parquet level is that the reader can do fancy things like planning to fetch a larger contiguous chunk of data from object storage. Maybe both are needed though? As in: you want buffering and prefetching.

@gabotechs (Contributor, Author)

> This seems in many ways quite similar to what RepartitionExec w/ spilling does. Have you had a chance to poke at that code?

Yes, in fact a small chunk of the code there still shows my name in the git blame. It is indeed similar in the sense that there is some per-partition buffering, but that code is in a more difficult situation: it needs to be able to buffer potentially indefinitely due to the unbounded nature of RepartitionExec (correct me if I'm wrong, it's been a while since I looked at that code), whereas the code in this PR can afford bounded channels.

At first sight I do not see a lot of opportunities for reusing code in both places due to the different requirements, but happy to listen to ideas.

> Maybe both are needed though? As in: you want buffering and prefetching.

Another difference from RepartitionExec is that BufferExec eagerly polls its children regardless of whether its own stream has been polled, while RepartitionExec waits for the first poll before starting any work. This means that RepartitionExec does not prefetch, but BufferExec does.

> The advantage I see of buffering at the Parquet level is that the reader can do fancy things like planning to fetch a larger contiguous chunk of data from object storage

👍 I can see this being beneficial. My intention was to first use this in #19761, but the BufferExec node is designed so you can place it wherever you want. In fact, we use it in more scenarios at DataDog.
