Skip to content

Commit 3bd48fc

Browse files
committed
Add optional hash join buffering
1 parent 69221c6 commit 3bd48fc

File tree

8 files changed

+144
-1
lines changed

8 files changed

+144
-1
lines changed

benchmarks/src/imdb/run.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ pub struct RunOpt {
9292
/// True by default.
9393
#[arg(short = 'j', long = "prefer_hash_join", default_value = "true")]
9494
prefer_hash_join: BoolDefaultTrue,
95+
96+
/// How many bytes to buffer on the probe side of hash joins.
97+
#[arg(long, default_value = "0")]
98+
hash_join_buffering_capacity: usize,
9599
}
96100

97101
fn map_query_id_to_str(query_id: usize) -> &'static str {
@@ -306,6 +310,8 @@ impl RunOpt {
306310
.config()?
307311
.with_collect_statistics(!self.disable_statistics);
308312
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
313+
config.options_mut().execution.hash_join_buffering_capacity =
314+
self.hash_join_buffering_capacity;
309315
let rt_builder = self.common.runtime_env_builder()?;
310316
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
311317

benchmarks/src/tpcds/run.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ pub struct RunOpt {
144144
/// The tables should have been created with the `--sort` option for this to have any effect.
145145
#[arg(short = 't', long = "sorted")]
146146
sorted: bool,
147+
148+
/// How many bytes to buffer on the probe side of hash joins.
149+
#[arg(long, default_value = "0")]
150+
hash_join_buffering_capacity: usize,
147151
}
148152

149153
impl RunOpt {
@@ -162,6 +166,8 @@ impl RunOpt {
162166
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
163167
config.options_mut().optimizer.enable_piecewise_merge_join =
164168
self.enable_piecewise_merge_join;
169+
config.options_mut().execution.hash_join_buffering_capacity =
170+
self.hash_join_buffering_capacity;
165171
let rt_builder = self.common.runtime_env_builder()?;
166172
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
167173
// register tables

benchmarks/src/tpch/run.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ pub struct RunOpt {
105105
/// The tables should have been created with the `--sort` option for this to have any effect.
106106
#[arg(short = 't', long = "sorted")]
107107
sorted: bool,
108+
109+
/// How many bytes to buffer on the probe side of hash joins.
110+
#[arg(long, default_value = "0")]
111+
hash_join_buffering_capacity: usize,
108112
}
109113

110114
impl RunOpt {
@@ -123,6 +127,8 @@ impl RunOpt {
123127
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
124128
config.options_mut().optimizer.enable_piecewise_merge_join =
125129
self.enable_piecewise_merge_join;
130+
config.options_mut().execution.hash_join_buffering_capacity =
131+
self.hash_join_buffering_capacity;
126132
let rt_builder = self.common.runtime_env_builder()?;
127133
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
128134
// register tables

datafusion/common/src/config.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,21 @@ config_namespace! {
669669
/// # Default
670670
/// `false` — ANSI SQL mode is disabled by default.
671671
pub enable_ansi_mode: bool, default = false
672+
673+
/// How many bytes to buffer on the probe side of hash joins while the build side is
674+
/// concurrently being built.
675+
///
676+
/// Without this, hash joins will wait until the full materialization of the build side
677+
/// before polling the probe side. This is useful in scenarios where the query is not
678+
/// completely CPU bounded, allowing some early work to be done concurrently and reducing the
679+
/// latency of the query.
680+
///
681+
/// Note that when hash join buffering is enabled, the probe side will start eagerly
682+
/// polling data, not giving time for the producer side of dynamic filters to produce any
683+
/// meaningful predicate. Queries with dynamic filters might see performance degradation.
684+
///
685+
/// Disabled by default, set to a number greater than 0 for enabling it.
686+
pub hash_join_buffering_capacity: usize, default = 0
672687
}
673688
}
674689

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::PhysicalOptimizerRule;
19+
use datafusion_common::JoinSide;
20+
use datafusion_common::config::ConfigOptions;
21+
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
22+
use datafusion_physical_plan::ExecutionPlan;
23+
use datafusion_physical_plan::buffer::BufferExec;
24+
use datafusion_physical_plan::joins::HashJoinExec;
25+
use std::sync::Arc;
26+
27+
/// Physical optimizer rule that finds every [HashJoinExec] in the plan and wraps its
/// probe side in a [BufferExec] node with the configured capacity:
///
/// ```text
/// ┌───────────────────┐
/// │   HashJoinExec    │
/// └─────▲────────▲────┘
///   ┌───────┘        └─────────┐
///   │                          │
/// ┌────────────────┐  ┌─────────────────┐
/// │   Build side   │ +│   BufferExec    │
/// └────────────────┘  └────────▲────────┘
///                              │
///                     ┌────────┴────────┐
///                     │   Probe side    │
///                     └─────────────────┘
/// ```
///
/// This allows the probe side to be polled eagerly, even before the build side has
/// completely finished.
#[derive(Debug, Default)]
pub struct HashJoinBuffering {}

impl HashJoinBuffering {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self {}
    }
}
54+
55+
impl PhysicalOptimizerRule for HashJoinBuffering {
56+
fn optimize(
57+
&self,
58+
plan: Arc<dyn ExecutionPlan>,
59+
config: &ConfigOptions,
60+
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
61+
let capacity = config.execution.hash_join_buffering_capacity;
62+
if capacity == 0 {
63+
return Ok(plan);
64+
}
65+
66+
plan.transform_down(|plan| {
67+
let Some(node) = plan.as_any().downcast_ref::<HashJoinExec>() else {
68+
return Ok(Transformed::no(plan));
69+
};
70+
let plan = Arc::clone(&plan);
71+
Ok(Transformed::yes(
72+
if HashJoinExec::probe_side() == JoinSide::Left {
73+
// Do not stack BufferExec nodes together.
74+
if node.left.as_any().downcast_ref::<BufferExec>().is_some() {
75+
return Ok(Transformed::no(plan));
76+
}
77+
plan.with_new_children(vec![
78+
Arc::new(BufferExec::new(Arc::clone(&node.left), capacity)),
79+
Arc::clone(&node.right),
80+
])?
81+
} else {
82+
// Do not stack BufferExec nodes together.
83+
if node.right.as_any().downcast_ref::<BufferExec>().is_some() {
84+
return Ok(Transformed::no(plan));
85+
}
86+
plan.with_new_children(vec![
87+
Arc::clone(&node.left),
88+
Arc::new(BufferExec::new(Arc::clone(&node.right), capacity)),
89+
])?
90+
},
91+
))
92+
})
93+
.data()
94+
}
95+
96+
fn name(&self) -> &str {
97+
"HashJoinBuffering"
98+
}
99+
100+
fn schema_check(&self) -> bool {
101+
true
102+
}
103+
}

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub mod optimizer;
3939
pub mod output_requirements;
4040
pub mod projection_pushdown;
4141
pub use datafusion_pruning as pruning;
42+
pub mod hash_join_buffering;
4243
pub mod pushdown_sort;
4344
pub mod sanity_checker;
4445
pub mod topk_aggregation;

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::sanity_checker::SanityCheckPlan;
3535
use crate::topk_aggregation::TopKAggregation;
3636
use crate::update_aggr_exprs::OptimizeAggregateOrder;
3737

38+
use crate::hash_join_buffering::HashJoinBuffering;
3839
use crate::limit_pushdown_past_window::LimitPushPastWindows;
3940
use crate::pushdown_sort::PushdownSort;
4041
use datafusion_common::Result;
@@ -131,6 +132,10 @@ impl PhysicalOptimizer {
131132
// This can possibly be combined with [LimitPushdown]
132133
// It needs to come after [EnforceSorting]
133134
Arc::new(LimitPushPastWindows::new()),
135+
// The HashJoinBuffering rule adds a BufferExec node with the configured capacity
136+
// in the probe side of hash joins. That way, the probe side gets eagerly polled before
137+
// the build side is completely finished.
138+
Arc::new(HashJoinBuffering::new()),
134139
// The LimitPushdown rule tries to push limits down as far as possible,
135140
// replacing operators with fetching variants, or adding limits
136141
// past operators that support limit pushdown.

0 commit comments

Comments
 (0)