diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index f3cd2bb..b2ac282 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -119,26 +119,12 @@ impl DistributedSessionBuilder for RunOpt { ) -> Result { let mut builder = SessionStateBuilder::new().with_default_features(); - let mut config = self + let config = self .common .config()? .with_collect_statistics(!self.disable_statistics) .with_target_partitions(self.partitions()); - // FIXME: these three options are critical for the correct function of the library - // but we are not enforcing that the user sets them. They are here at the moment - // but we should figure out a way to do this better. - config - .options_mut() - .optimizer - .hash_join_single_partition_threshold = 0; - config - .options_mut() - .optimizer - .hash_join_single_partition_threshold_rows = 0; - - config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; - // end critical options section let rt_builder = self.common.runtime_env_builder()?; if self.distributed { diff --git a/src/common/util.rs b/src/common/util.rs index e1508c3..085c5c2 100644 --- a/src/common/util.rs +++ b/src/common/util.rs @@ -1,9 +1,7 @@ -use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion::error::Result; use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties}; use std::fmt::Write; -use std::sync::Arc; pub fn display_plan_with_partition_in_out(plan: &dyn ExecutionPlan) -> Result { let mut f = String::new(); diff --git a/src/physical_optimizer.rs b/src/physical_optimizer.rs index 4027d38..2b3489b 100644 --- a/src/physical_optimizer.rs +++ b/src/physical_optimizer.rs @@ -4,6 +4,7 @@ use super::stage::ExecutionStage; use crate::{plan::PartitionIsolatorExec, ArrowFlightReadExec}; use datafusion::common::tree_node::TreeNodeRecursion; use datafusion::error::DataFusionError; +use datafusion::physical_plan::joins::PartitionMode; use datafusion::{ common::{ internal_datafusion_err, @@ -160,6 +161,7 @@ impl DistributedPhysicalOptimizerRule { /// /// The plans we cannot split are: /// - NestedLoopJoinExec +/// - HashJoinExec with PartitionMode != Partitioned, like CollectLeft pub fn can_be_divided(plan: &Arc) -> Result { // recursively check to see if this stages plan contains a NestedLoopJoinExec let mut has_unsplittable_plan = false; @@ -170,6 +172,14 @@ pub fn can_be_divided(plan: &Arc) -> Result { { has_unsplittable_plan = true; return Ok(TreeNodeRecursion::Stop); + } else if let Some(hash_join) = f + .as_any() + .downcast_ref::() + { + if hash_join.partition_mode() != &PartitionMode::Partitioned { + has_unsplittable_plan = true; + return Ok(TreeNodeRecursion::Stop); + } } Ok(TreeNodeRecursion::Continue) diff --git a/tests/tpch_validation_test.rs b/tests/tpch_validation_test.rs index 6c4bb0f..8e73dc4 100644 --- a/tests/tpch_validation_test.rs +++ b/tests/tpch_validation_test.rs @@ -132,22 +132,7 @@ mod tests { async fn build_state( ctx: DistributedSessionBuilderContext, ) -> Result { - let mut config = SessionConfig::new().with_target_partitions(3); - - // FIXME: these three options are critical for the correct function of the library - // but we are not enforcing that the user sets them. They are here at the moment - // but we should figure out a way to do this better. - config - .options_mut() - .optimizer - .hash_join_single_partition_threshold = 0; - config - .options_mut() - .optimizer - .hash_join_single_partition_threshold_rows = 0; - - config.options_mut().optimizer.prefer_hash_join = true; - // end critical options section + let config = SessionConfig::new().with_target_partitions(3); let rule = DistributedPhysicalOptimizerRule::new().with_maximum_partitions_per_task(2); Ok(SessionStateBuilder::new()