Skip to content

Commit 5c92110

Browse files
robtandy and gabotechs authored
Collect Left Hash Joins (#105)
* Add `ConfigExtensionExt`, allowing the propagation of arbitrary [ConfigExtension]s across network boundaries
* Rename propagate_distributed_option_extension to retrieve_distributed_option_extension
* Change x-datafusion-distributed- to x-datafusion-distributed-config-
* Check for full format!("{}.", T::PREFIX) in gRPC keys
* Remove double clone
* Fix tests
* Fix tests
* allow collect left hashjoins
* allow nested_loop_joins
* Rework SessionBuilder
* Fix clippy errors
* check if plan can be divided into tasks
* revert change
* fmt and clippy
* revert change in gen-tpch.sh

---------

Co-authored-by: Gabriel Musat Mestre <[email protected]>
1 parent f8bb759 commit 5c92110

File tree

4 files changed

+12
-33
lines changed

4 files changed

+12
-33
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -119,26 +119,12 @@ impl DistributedSessionBuilder for RunOpt {
119119
) -> Result<SessionState, DataFusionError> {
120120
let mut builder = SessionStateBuilder::new().with_default_features();
121121

122-
let mut config = self
122+
let config = self
123123
.common
124124
.config()?
125125
.with_collect_statistics(!self.disable_statistics)
126126
.with_target_partitions(self.partitions());
127127

128-
// FIXME: these three options are critical for the correct function of the library
129-
// but we are not enforcing that the user sets them. They are here at the moment
130-
// but we should figure out a way to do this better.
131-
config
132-
.options_mut()
133-
.optimizer
134-
.hash_join_single_partition_threshold = 0;
135-
config
136-
.options_mut()
137-
.optimizer
138-
.hash_join_single_partition_threshold_rows = 0;
139-
140-
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
141-
// end critical options section
142128
let rt_builder = self.common.runtime_env_builder()?;
143129

144130
if self.distributed {

src/common/util.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
21
use datafusion::error::Result;
32
use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
43

54
use std::fmt::Write;
6-
use std::sync::Arc;
75

86
pub fn display_plan_with_partition_in_out(plan: &dyn ExecutionPlan) -> Result<String> {
97
let mut f = String::new();

src/physical_optimizer.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use super::stage::ExecutionStage;
44
use crate::{plan::PartitionIsolatorExec, ArrowFlightReadExec};
55
use datafusion::common::tree_node::TreeNodeRecursion;
66
use datafusion::error::DataFusionError;
7+
use datafusion::physical_plan::joins::PartitionMode;
78
use datafusion::{
89
common::{
910
internal_datafusion_err,
@@ -160,6 +161,7 @@ impl DistributedPhysicalOptimizerRule {
160161
///
161162
/// The plans we cannot split are:
162163
/// - NestedLoopJoinExec
164+
/// - HashJoinExec with PartitionMode != Partitioned, like CollectLeft
163165
pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
164166
// recursively check to see if this stages plan contains a NestedLoopJoinExec
165167
let mut has_unsplittable_plan = false;
@@ -170,6 +172,14 @@ pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
170172
{
171173
has_unsplittable_plan = true;
172174
return Ok(TreeNodeRecursion::Stop);
175+
} else if let Some(hash_join) = f
176+
.as_any()
177+
.downcast_ref::<datafusion::physical_plan::joins::HashJoinExec>()
178+
{
179+
if hash_join.partition_mode() != &PartitionMode::Partitioned {
180+
has_unsplittable_plan = true;
181+
return Ok(TreeNodeRecursion::Stop);
182+
}
173183
}
174184

175185
Ok(TreeNodeRecursion::Continue)

tests/tpch_validation_test.rs

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -132,22 +132,7 @@ mod tests {
132132
async fn build_state(
133133
ctx: DistributedSessionBuilderContext,
134134
) -> Result<SessionState, DataFusionError> {
135-
let mut config = SessionConfig::new().with_target_partitions(3);
136-
137-
// FIXME: these three options are critical for the correct function of the library
138-
// but we are not enforcing that the user sets them. They are here at the moment
139-
// but we should figure out a way to do this better.
140-
config
141-
.options_mut()
142-
.optimizer
143-
.hash_join_single_partition_threshold = 0;
144-
config
145-
.options_mut()
146-
.optimizer
147-
.hash_join_single_partition_threshold_rows = 0;
148-
149-
config.options_mut().optimizer.prefer_hash_join = true;
150-
// end critical options section
135+
let config = SessionConfig::new().with_target_partitions(3);
151136

152137
let rule = DistributedPhysicalOptimizerRule::new().with_maximum_partitions_per_task(2);
153138
Ok(SessionStateBuilder::new()

0 commit comments

Comments
 (0)