Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3b45cc2
Add `ConfigExtensionExt`, allowing the propagation of arbitrary [Conf…
gabotechs Aug 18, 2025
ae8b6c9
Rename propagate_distributed_option_extension to retrieve_distributed…
gabotechs Aug 19, 2025
9ac83b0
Change x-datafusion-distributed- to x-datafusion-distributed-config-
gabotechs Aug 19, 2025
e15796e
Check for full format!("{}.", T::PREFIX) in gRPC keys
gabotechs Aug 19, 2025
104e3fa
Remove double clone
gabotechs Aug 19, 2025
8576c2c
Fix tests
gabotechs Aug 19, 2025
d35ddff
Fix tests
gabotechs Aug 19, 2025
210ea86
allow collect left hashjoins
robtandy Aug 20, 2025
ff9167d
allow nested_loop_joins
robtandy Aug 20, 2025
25d24d2
Rework SessionBuilder
gabotechs Aug 21, 2025
c692a6f
Fix clippy errors
gabotechs Aug 21, 2025
10dc81a
check if plan can be divided into tasks
robtandy Aug 21, 2025
8ddef97
Merge remote-tracking branch 'origin/main' into robtandy/nested_loop_…
robtandy Aug 21, 2025
8265f88
Merge remote-tracking branch 'origin/main' into robtandy/collect_left…
robtandy Aug 21, 2025
e3bbcc4
revert change
robtandy Aug 21, 2025
8a8c718
merge previous PR
robtandy Aug 21, 2025
70c99e1
Merge branch 'main' into gabrielmusat/ergonomy-improvements
gabotechs Aug 21, 2025
35f0b08
Merge branch 'main' into gabrielmusat/ergonomy-improvements
gabotechs Aug 21, 2025
cbbcb93
add support for collect left hash joins
robtandy Aug 21, 2025
f3548f3
merge origin/main
robtandy Aug 21, 2025
9aa6e16
fmt and clippy
robtandy Aug 21, 2025
a726bff
revert change in gen-tpch.sh
robtandy Aug 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,26 +119,12 @@ impl DistributedSessionBuilder for RunOpt {
) -> Result<SessionState, DataFusionError> {
let mut builder = SessionStateBuilder::new().with_default_features();

let mut config = self
let config = self
.common
.config()?
.with_collect_statistics(!self.disable_statistics)
.with_target_partitions(self.partitions());

// FIXME: these three options are critical for the correct function of the library
// but we are not enforcing that the user sets them. They are here at the moment
// but we should figure out a way to do this better.
config
.options_mut()
.optimizer
.hash_join_single_partition_threshold = 0;
config
.options_mut()
.optimizer
.hash_join_single_partition_threshold_rows = 0;

config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
// end critical options section
let rt_builder = self.common.runtime_env_builder()?;

if self.distributed {
Expand Down
2 changes: 0 additions & 2 deletions src/common/util.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::error::Result;
use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};

use std::fmt::Write;
use std::sync::Arc;

pub fn display_plan_with_partition_in_out(plan: &dyn ExecutionPlan) -> Result<String> {
let mut f = String::new();
Expand Down
10 changes: 10 additions & 0 deletions src/physical_optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::stage::ExecutionStage;
use crate::{plan::PartitionIsolatorExec, ArrowFlightReadExec};
use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::joins::PartitionMode;
use datafusion::{
common::{
internal_datafusion_err,
Expand Down Expand Up @@ -160,6 +161,7 @@ impl DistributedPhysicalOptimizerRule {
///
/// The plans we cannot split are:
/// - NestedLoopJoinExec
/// - HashJoinExec with PartitionMode != Partitioned, like CollectLeft
pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
// recursively check to see if this stages plan contains a NestedLoopJoinExec
let mut has_unsplittable_plan = false;
Expand All @@ -170,6 +172,14 @@ pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
{
has_unsplittable_plan = true;
return Ok(TreeNodeRecursion::Stop);
} else if let Some(hash_join) = f
.as_any()
.downcast_ref::<datafusion::physical_plan::joins::HashJoinExec>()
{
if hash_join.partition_mode() != &PartitionMode::Partitioned {
has_unsplittable_plan = true;
return Ok(TreeNodeRecursion::Stop);
}
}

Ok(TreeNodeRecursion::Continue)
Expand Down
17 changes: 1 addition & 16 deletions tests/tpch_validation_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,7 @@ mod tests {
async fn build_state(
ctx: DistributedSessionBuilderContext,
) -> Result<SessionState, DataFusionError> {
let mut config = SessionConfig::new().with_target_partitions(3);

// FIXME: these three options are critical for the correct function of the library
// but we are not enforcing that the user sets them. They are here at the moment
// but we should figure out a way to do this better.
config
.options_mut()
.optimizer
.hash_join_single_partition_threshold = 0;
config
.options_mut()
.optimizer
.hash_join_single_partition_threshold_rows = 0;

config.options_mut().optimizer.prefer_hash_join = true;
// end critical options section
let config = SessionConfig::new().with_target_partitions(3);

let rule = DistributedPhysicalOptimizerRule::new().with_maximum_partitions_per_task(2);
Ok(SessionStateBuilder::new()
Expand Down