Conversation

Collaborator

@robtandy robtandy commented Aug 8, 2025

Edit:

  • Moved validation out of the benchmarks and into @jayshrivastava's TPCH validation tests
  • Incorporated improvements suggested in the review comments
  • Ignored TPCH query 22 until support for NestedLoopJoinExec is added

This PR fixes a logical bug in execution and also adds a --validate flag to the benchmarks to confirm that we calculate the correct result vs the single-node case.

To run the TPCH benchmark in a distributed fashion and validate it against single-node execution, follow the README in the benchmarks crate.

Note that the particular approach to distributed execution that this library takes requires all joins to be partitioned joins; in particular, we do not support CollectLeft. So, the following modifications to the session config before planning are required for correct operation:

// Force all hash joins to be planned as partitioned joins rather than CollectLeft
config.options_mut().optimizer.hash_join_single_partition_threshold = 0;
config.options_mut().optimizer.hash_join_single_partition_threshold_rows = 0;

// Prefer hash joins over sort-merge joins during planning
config.options_mut().optimizer.prefer_hash_join = true;

At the moment this is set in the benchmark crate, but we really need to make this easy for the user and prevent them from misconfiguring these values. I'm not sure how to do this yet; I think we can refactor in a follow-up to this PR.
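One possible shape for that follow-up, sketched here purely as an idea (the function name distributed_session_config is made up and not part of this PR), is a small constructor that hands users a SessionConfig with the required settings already baked in:

use datafusion::prelude::SessionConfig;

// Hypothetical helper, not part of this PR: returns a SessionConfig with the
// settings this library requires for correct distributed execution.
pub fn distributed_session_config() -> SessionConfig {
    let mut config = SessionConfig::new();
    // Force partitioned hash joins; CollectLeft is not supported.
    config.options_mut().optimizer.hash_join_single_partition_threshold = 0;
    config.options_mut().optimizer.hash_join_single_partition_threshold_rows = 0;
    // Prefer hash joins over sort-merge joins during planning.
    config.options_mut().optimizer.prefer_hash_join = true;
    config
}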

Regarding support for other hash join modes: I think we can add that, and then use the benchmark to compare and evaluate the options for execution speed.

Collaborator

@gabotechs gabotechs left a comment


This looks really good! Left just some minor comments, but it looks pretty much there.

Comment on lines 106 to 108

#[structopt(long = "validate")]
validate: bool,
Collaborator


What do you think about moving forward with @jayshrivastava's changes in #83 for validating TPCH correctness instead of this? It might be slightly better to ensure validation there because:

  • It would be nice to touch this code as little as possible, as this is pretty much vendored code from upstream DataFusion, and if we decide to move this project there or upstream DataFusion decides to make their benchmarks crate public, it would be difficult to port because of conflicts
  • We want to ensure TPCH correctness in the CI, so it might be more suitable to do it as a mandatory test suite using Cargo test tools rather than an optional step during the benchmarks

Collaborator Author


Yep, I thought the same thing, and I moved it out of the benchmarks and aligned it with @jayshrivastava's PR.

Comment on lines 71 to 76
/*println!(
"{} Task {:?} executing partition {}",
stage.name(),
task.partition_group,
partition
);*/
Collaborator


remove this maybe?

Comment on lines +105 to +106
let (state, stage) = once_stage
.get_or_try_init(|| async {
Collaborator

@gabotechs gabotechs Aug 11, 2025


This will hold the once_stage RefMut across the initialization, keeping a shard of the self.stages dashmap locked across an asynchronous gap, which might be too much locking.

Fortunately, it's very easy to prevent this (a fuller sketch of the call site follows the list):

  • we can make the OnceCell a shared reference:

    pub(super) stages: DashMap<StageKey, Arc<OnceCell<(SessionState, Arc<ExecutionStage>)>>>,

  • and then immediately drop the reference to the dashmap entry:

    let once_stage = {
        let entry = self.stages.entry(key).or_default();
        Arc::clone(&entry)
        // <- the dashmap RefMut gets dropped here, releasing the lock for the current shard
    };
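For illustration, a rough sketch of what the call site would then look like once the Arc<OnceCell> has been cloned out of the map (build_stage is a hypothetical helper name, not the actual function):

// The dashmap shard lock has already been released above, so awaiting inside
// get_or_try_init no longer blocks other entries in the same shard.
let (state, stage) = once_stage
    .get_or_try_init(|| async {
        // hypothetical helper that builds the SessionState and ExecutionStage
        build_stage(&key).await
    })
    .await?;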

Collaborator Author

@robtandy robtandy Aug 12, 2025


A good improvement. Added.

Collaborator


This file contained some tests that covered the behavior of sharing a single execution node across multiple callers via a dashmap. It's a shame to delete them; I would have expected them to still be valid here.

If you see no path forward in keeping those tests it's fine, we can build new ones eventually.

Collaborator Author


I've added it back, but I'm not sure of its function or necessity, as it's not referenced anywhere.

))?;
stream_from_stage_task(ticket.clone(), &url, schema.clone(), &channel_manager).await
let futs = child_stage_tasks.iter().enumerate().map(|(i, task)| {
    let i_capture = i;
Collaborator


🤔 i_capture? It should not be necessary to capture any variables that implement the Copy trait, right?

Collaborator Author


Good eye and yes, not needed!
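For reference, a minimal standalone illustration of the point (not the project's code): usize is Copy, so a move closure or async block copies i by value and no intermediate binding is needed.

fn example() {
    // `usize` is Copy, so the async move block copies `i` directly;
    // no `let i_capture = i;` binding is required.
    let _futs: Vec<_> = (0..3usize)
        .map(|i| async move { i * 2 })
        .collect();
}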

Comment on lines 49 to 50
assert_snapshot!(physical_distributed_str,
/*assert_snapshot!(physical_distributed_str,
Collaborator


Does this need to be commented out? I think it should be fine to leave this uncommented; it should work.

FYI, this is using https://github.com/mitsuhiko/insta, which means that if the test fails, you can just do:

cargo insta review

And you will be prompted to accept the changes

You can install cargo insta with:

curl -LsSf https://insta.rs/install.sh | sh
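For context, a minimal insta snapshot test looks roughly like this (illustrative only, not the test from this PR):

use insta::assert_snapshot;

#[test]
fn physical_plan_snapshot() {
    // insta records the value in a .snap file on the first run; if the output
    // changes later, `cargo insta review` prompts you to accept or reject it.
    let physical_plan_str = "ProjectionExec: expr=[a@0 as a]";
    assert_snapshot!(physical_plan_str);
}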

Collaborator Author


Wow, insta is awesome! 💯 Done.

jayshrivastava added a commit that referenced this pull request Aug 13, 2025
This change adds a DashMap-like struct which has a background task to clean up entries that have outlived a configurable TTL.

This struct is similar to https://github.com/moka-rs/moka, which also uses time wheels. Having our own module avoids introducing a large dependency, which keeps this project closer to vanilla DataFusion.

This change is meant to be useful for #89, where it's possible for `ExecutionStages` to be orphaned in `ArrowFlightEndpoint`. We need an async task to clean up old entries.

Informs: #90
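For intuition, a rough sketch of such a TTL map (illustrative only; the actual struct is DashMap-backed and uses time wheels rather than a plain sweep loop):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

// Sketch of the idea: a map whose entries expire after a TTL, swept by a
// background task. Names and structure are illustrative, not the real module.
struct TtlMap<K, V> {
    inner: Arc<Mutex<HashMap<K, (Instant, V)>>>,
}

impl<K, V> TtlMap<K, V>
where
    K: std::hash::Hash + Eq + Send + 'static,
    V: Send + 'static,
{
    fn new(ttl: Duration) -> Self {
        let inner = Arc::new(Mutex::new(HashMap::new()));
        let sweep = Arc::clone(&inner);
        // Background task that periodically drops entries older than the TTL.
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(ttl).await;
                let now = Instant::now();
                sweep.lock().unwrap().retain(|_, entry| now.duration_since(entry.0) < ttl);
            }
        });
        Self { inner }
    }

    fn insert(&self, key: K, value: V) {
        self.inner.lock().unwrap().insert(key, (Instant::now(), value));
    }
}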
Collaborator

@gabotechs gabotechs left a comment


This is awesome! 💯 In it goes.

@gabotechs gabotechs merged commit a2a8163 into main Aug 13, 2025
3 checks passed
@gabotechs gabotechs deleted the robtandy/fix_execution_bug branch August 13, 2025 09:27
jayshrivastava added a commit that referenced this pull request Aug 17, 2025