Skip to content

Commit 5a4f11f

Browse files
committed
Add optional hash join buffering
1 parent 5642297 commit 5a4f11f

File tree

8 files changed

+143
-0
lines changed

8 files changed

+143
-0
lines changed

benchmarks/src/imdb/run.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ pub struct RunOpt {
9292
/// True by default.
9393
#[arg(short = 'j', long = "prefer_hash_join", default_value = "true")]
9494
prefer_hash_join: BoolDefaultTrue,
95+
96+
/// How many bytes to buffer on the probe side of hash joins.
97+
#[arg(long, default_value = "0")]
98+
hash_join_buffering_capacity: usize,
9599
}
96100

97101
fn map_query_id_to_str(query_id: usize) -> &'static str {
@@ -306,6 +310,8 @@ impl RunOpt {
306310
.config()?
307311
.with_collect_statistics(!self.disable_statistics);
308312
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
313+
config.options_mut().execution.hash_join_buffering_capacity =
314+
self.hash_join_buffering_capacity;
309315
let rt_builder = self.common.runtime_env_builder()?;
310316
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
311317

benchmarks/src/tpcds/run.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ pub struct RunOpt {
144144
/// The tables should have been created with the `--sort` option for this to have any effect.
145145
#[arg(short = 't', long = "sorted")]
146146
sorted: bool,
147+
148+
/// How many bytes to buffer on the probe side of hash joins.
149+
#[arg(long, default_value = "0")]
150+
hash_join_buffering_capacity: usize,
147151
}
148152

149153
impl RunOpt {
@@ -162,6 +166,8 @@ impl RunOpt {
162166
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
163167
config.options_mut().optimizer.enable_piecewise_merge_join =
164168
self.enable_piecewise_merge_join;
169+
config.options_mut().execution.hash_join_buffering_capacity =
170+
self.hash_join_buffering_capacity;
165171
let rt_builder = self.common.runtime_env_builder()?;
166172
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
167173
// register tables

benchmarks/src/tpch/run.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ pub struct RunOpt {
105105
/// The tables should have been created with the `--sort` option for this to have any effect.
106106
#[arg(short = 't', long = "sorted")]
107107
sorted: bool,
108+
109+
/// How many bytes to buffer on the probe side of hash joins.
110+
#[arg(long, default_value = "0")]
111+
hash_join_buffering_capacity: usize,
108112
}
109113

110114
impl RunOpt {
@@ -123,6 +127,8 @@ impl RunOpt {
123127
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
124128
config.options_mut().optimizer.enable_piecewise_merge_join =
125129
self.enable_piecewise_merge_join;
130+
config.options_mut().execution.hash_join_buffering_capacity =
131+
self.hash_join_buffering_capacity;
126132
let rt_builder = self.common.runtime_env_builder()?;
127133
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
128134
// register tables

datafusion/common/src/config.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,21 @@ config_namespace! {
669669
/// # Default
670670
/// `false` — ANSI SQL mode is disabled by default.
671671
pub enable_ansi_mode: bool, default = false
672+
673+
/// How many bytes to buffer in the probe side of hash joins while the build side is
674+
/// concurrently being built.
675+
///
676+
/// Without this, hash joins will wait until the full materialization of the build side
677+
/// before polling the probe side. This is useful in scenarios where the query is not
678+
/// completely CPU bound, allowing some early work to be done concurrently and reducing the
679+
/// latency of the query.
680+
///
681+
/// Note that when hash join buffering is enabled, the probe side will start eagerly
682+
/// polling data, not giving time for the producer side of dynamic filters to produce any
683+
/// meaningful predicate. Queries with dynamic filters might see performance degradation.
684+
///
685+
/// Disabled by default; set to a number greater than 0 to enable it.
686+
pub hash_join_buffering_capacity: usize, default = 0
672687
}
673688
}
674689

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::PhysicalOptimizerRule;
19+
use datafusion_common::JoinSide;
20+
use datafusion_common::config::ConfigOptions;
21+
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
22+
use datafusion_physical_plan::ExecutionPlan;
23+
use datafusion_physical_plan::buffer::BufferExec;
24+
use datafusion_physical_plan::joins::HashJoinExec;
25+
use std::sync::Arc;
26+
27+
/// Physical optimizer rule that wraps the probe side of every [HashJoinExec]
/// in a [BufferExec] node with the configured capacity:
///
/// ```text
/// ┌───────────────────┐
/// │   HashJoinExec    │
/// └─────▲────────▲────┘
///   ┌───┘        └─────────┐
///   │                      │
/// ┌────────────────┐   ┌─────────────────┐
/// │   Build side   │ + │   BufferExec    │
/// └────────────────┘   └────────▲────────┘
///                               │
///                      ┌────────┴────────┐
///                      │   Probe side    │
///                      └─────────────────┘
/// ```
///
/// Buffering lets the probe side be pulled eagerly, even before the build side
/// has completely finished materializing.
#[derive(Debug, Default)]
pub struct HashJoinBuffering {}

impl HashJoinBuffering {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self {}
    }
}
54+
55+
impl PhysicalOptimizerRule for HashJoinBuffering {
56+
fn optimize(
57+
&self,
58+
plan: Arc<dyn ExecutionPlan>,
59+
config: &ConfigOptions,
60+
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
61+
let capacity = config.execution.hash_join_buffering_capacity;
62+
if capacity == 0 {
63+
return Ok(plan);
64+
}
65+
66+
plan.transform_down(|plan| {
67+
let Some(node) = plan.as_any().downcast_ref::<HashJoinExec>() else {
68+
return Ok(Transformed::no(plan));
69+
};
70+
let plan = Arc::clone(&plan);
71+
Ok(Transformed::yes(
72+
if HashJoinExec::probe_side() == JoinSide::Left {
73+
// Do not stack BufferExec nodes together.
74+
if node.left.as_any().downcast_ref::<BufferExec>().is_some() {
75+
return Ok(Transformed::no(plan));
76+
}
77+
plan.with_new_children(vec![
78+
Arc::new(BufferExec::new(Arc::clone(&node.left), capacity)),
79+
Arc::clone(&node.right),
80+
])?
81+
} else {
82+
// Do not stack BufferExec nodes together.
83+
if node.right.as_any().downcast_ref::<BufferExec>().is_some() {
84+
return Ok(Transformed::no(plan));
85+
}
86+
plan.with_new_children(vec![
87+
Arc::clone(&node.left),
88+
Arc::new(BufferExec::new(Arc::clone(&node.right), capacity)),
89+
])?
90+
},
91+
))
92+
})
93+
.data()
94+
}
95+
96+
fn name(&self) -> &str {
97+
"HashJoinBuffering"
98+
}
99+
100+
fn schema_check(&self) -> bool {
101+
true
102+
}
103+
}

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub mod optimizer;
3939
pub mod output_requirements;
4040
pub mod projection_pushdown;
4141
pub use datafusion_pruning as pruning;
42+
pub mod hash_join_buffering;
4243
pub mod pushdown_sort;
4344
pub mod sanity_checker;
4445
pub mod topk_aggregation;

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::sanity_checker::SanityCheckPlan;
3535
use crate::topk_aggregation::TopKAggregation;
3636
use crate::update_aggr_exprs::OptimizeAggregateOrder;
3737

38+
use crate::hash_join_buffering::HashJoinBuffering;
3839
use crate::limit_pushdown_past_window::LimitPushPastWindows;
3940
use crate::pushdown_sort::PushdownSort;
4041
use datafusion_common::Result;
@@ -131,6 +132,10 @@ impl PhysicalOptimizer {
131132
// This can possibly be combined with [LimitPushdown]
132133
// It needs to come after [EnforceSorting]
133134
Arc::new(LimitPushPastWindows::new()),
135+
// The HashJoinBuffering rule adds a BufferExec node with the configured capacity
136+
// on the probe side of hash joins. That way, the probe side gets eagerly polled before
137+
// the build side is completely finished.
138+
Arc::new(HashJoinBuffering::new()),
134139
// The LimitPushdown rule tries to push limits down as far as possible,
135140
// replacing operators with fetching variants, or adding limits
136141
// past operators that support limit pushdown.

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ The following configuration settings are available:
133133
| datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. |
134134
| datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. |
135135
| datafusion.execution.enable_ansi_mode | false | Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. |
136+
| datafusion.execution.hash_join_buffering_capacity | 0 | How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. 0 by default. Set to something greater than 0 for enabling it. |
136137
| datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
137138
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
138139
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |

0 commit comments

Comments
 (0)