Commit d2551e7

Add optional hash join buffering

1 parent 5642297 commit d2551e7

19 files changed: +158 -9 lines changed

benchmarks/src/imdb/run.rs

Lines changed: 8 additions & 0 deletions

@@ -92,6 +92,10 @@ pub struct RunOpt {
     /// True by default.
     #[arg(short = 'j', long = "prefer_hash_join", default_value = "true")]
     prefer_hash_join: BoolDefaultTrue,
+
+    /// How many bytes to buffer on the probe side of hash joins.
+    #[arg(long, default_value = "0")]
+    hash_join_buffering_capacity: usize,
 }

 fn map_query_id_to_str(query_id: usize) -> &'static str {
@@ -306,6 +310,8 @@ impl RunOpt {
             .config()?
             .with_collect_statistics(!self.disable_statistics);
         config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
+        config.options_mut().execution.hash_join_buffering_capacity =
+            self.hash_join_buffering_capacity;
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);

@@ -527,6 +533,7 @@ mod tests {
             output_path: None,
             disable_statistics: false,
             prefer_hash_join: true,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(map_query_id_to_str(query))?;
@@ -563,6 +570,7 @@ mod tests {
             output_path: None,
             disable_statistics: false,
             prefer_hash_join: true,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(map_query_id_to_str(query))?;

benchmarks/src/tpcds/run.rs

Lines changed: 6 additions & 0 deletions

@@ -144,6 +144,10 @@ pub struct RunOpt {
     /// The tables should have been created with the `--sort` option for this to have any effect.
     #[arg(short = 't', long = "sorted")]
     sorted: bool,
+
+    /// How many bytes to buffer on the probe side of hash joins.
+    #[arg(long, default_value = "0")]
+    hash_join_buffering_capacity: usize,
 }

 impl RunOpt {
@@ -162,6 +166,8 @@ impl RunOpt {
         config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
         config.options_mut().optimizer.enable_piecewise_merge_join =
             self.enable_piecewise_merge_join;
+        config.options_mut().execution.hash_join_buffering_capacity =
+            self.hash_join_buffering_capacity;
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
         // register tables

benchmarks/src/tpch/run.rs

Lines changed: 8 additions & 0 deletions

@@ -105,6 +105,10 @@ pub struct RunOpt {
     /// The tables should have been created with the `--sort` option for this to have any effect.
     #[arg(short = 't', long = "sorted")]
     sorted: bool,
+
+    /// How many bytes to buffer on the probe side of hash joins.
+    #[arg(long, default_value = "0")]
+    hash_join_buffering_capacity: usize,
 }

 impl RunOpt {
@@ -123,6 +127,8 @@ impl RunOpt {
         config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
         config.options_mut().optimizer.enable_piecewise_merge_join =
             self.enable_piecewise_merge_join;
+        config.options_mut().execution.hash_join_buffering_capacity =
+            self.hash_join_buffering_capacity;
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
         // register tables
@@ -392,6 +398,7 @@ mod tests {
             prefer_hash_join: true,
             enable_piecewise_merge_join: false,
             sorted: false,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(query)?;
@@ -430,6 +437,7 @@ mod tests {
             prefer_hash_join: true,
             enable_piecewise_merge_join: false,
             sorted: false,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(query)?;

datafusion/common/src/config.rs

Lines changed: 15 additions & 0 deletions

@@ -669,6 +669,21 @@ config_namespace! {
         /// # Default
         /// `false` — ANSI SQL mode is disabled by default.
        pub enable_ansi_mode: bool, default = false
+
+        /// How many bytes to buffer on the probe side of hash joins while the build side is
+        /// concurrently being built.
+        ///
+        /// Without this, hash joins wait for the build side to be fully materialized before
+        /// polling the probe side. Buffering is useful when the query is not completely CPU
+        /// bound, since some early work can happen concurrently, reducing the latency of the
+        /// query.
+        ///
+        /// Note that when hash join buffering is enabled, the probe side starts polling data
+        /// eagerly, leaving no time for the producer side of dynamic filters to produce a
+        /// meaningful predicate. Queries with dynamic filters might see performance degradation.
+        ///
+        /// Disabled by default; set to a number greater than 0 to enable it.
+        pub hash_join_buffering_capacity: usize, default = 0
     }
 }
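Since the new option lives in the `execution` config namespace, it should also be reachable at runtime as `datafusion.execution.hash_join_buffering_capacity` (e.g. via SQL `SET`). For programmatic use, a minimal sketch of enabling it on a session follows; the 1 MiB budget is an arbitrary example value, not a recommendation:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Minimal sketch: build a SessionContext with hash join buffering enabled.
// The 1 MiB capacity below is illustrative only; 0 (the default) disables it.
fn context_with_hash_join_buffering() -> SessionContext {
    let mut config = SessionConfig::new();
    config.options_mut().execution.hash_join_buffering_capacity = 1024 * 1024;
    SessionContext::new_with_config(config)
}
```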

datafusion/physical-optimizer/src/hash_join_buffering.rs

Lines changed: 103 additions & 0 deletions

@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::JoinSide;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::buffer::BufferExec;
+use datafusion_physical_plan::joins::HashJoinExec;
+use std::sync::Arc;
+
+/// Looks for all the [HashJoinExec]s in the plan and places a [BufferExec] node with the
+/// configured capacity on the probe side:
+///
+/// ```text
+///        ┌───────────────────┐
+///        │   HashJoinExec    │
+///        └─────▲────────▲────┘
+///      ┌───────┘        └─────────┐
+///      │                          │
+/// ┌────┴───────────┐     ┌────────┴────────┐
+/// │   Build side   │  +  │   BufferExec    │
+/// └────────────────┘     └────────▲────────┘
+///                                 │
+///                        ┌────────┴────────┐
+///                        │   Probe side    │
+///                        └─────────────────┘
+/// ```
+///
+/// This allows the probe side to be pulled eagerly, even before the build side has
+/// completely finished.
+#[derive(Debug, Default)]
+pub struct HashJoinBuffering {}
+
+impl HashJoinBuffering {
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl PhysicalOptimizerRule for HashJoinBuffering {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let capacity = config.execution.hash_join_buffering_capacity;
+        if capacity == 0 {
+            return Ok(plan);
+        }
+
+        plan.transform_down(|plan| {
+            let Some(node) = plan.as_any().downcast_ref::<HashJoinExec>() else {
+                return Ok(Transformed::no(plan));
+            };
+            let plan = Arc::clone(&plan);
+            Ok(Transformed::yes(
+                if HashJoinExec::probe_side() == JoinSide::Left {
+                    // Do not stack BufferExec nodes together.
+                    if node.left.as_any().downcast_ref::<BufferExec>().is_some() {
+                        return Ok(Transformed::no(plan));
+                    }
+                    plan.with_new_children(vec![
+                        Arc::new(BufferExec::new(Arc::clone(&node.left), capacity)),
+                        Arc::clone(&node.right),
+                    ])?
+                } else {
+                    // Do not stack BufferExec nodes together.
+                    if node.right.as_any().downcast_ref::<BufferExec>().is_some() {
+                        return Ok(Transformed::no(plan));
+                    }
+                    plan.with_new_children(vec![
+                        Arc::clone(&node.left),
+                        Arc::new(BufferExec::new(Arc::clone(&node.right), capacity)),
+                    ])?
+                },
+            ))
+        })
+        .data()
+    }
+
+    fn name(&self) -> &str {
+        "HashJoinBuffering"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
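`BufferExec` itself is added elsewhere in this commit, and its implementation is not shown in this hunk. As a rough mental model only — not the actual operator, which budgets by bytes rather than item count — the technique of eagerly driving a stream into a bounded buffer looks like this (`buffered` is a hypothetical helper, not a DataFusion API):

```rust
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

/// Conceptual sketch only: eagerly drive `input` on a separate task, parking
/// up to `capacity_items` results in a bounded channel so the producer can run
/// ahead of a consumer that is still busy (here, a join building its hash table).
fn buffered<S, T>(input: S, capacity_items: usize) -> impl Stream<Item = T>
where
    S: Stream<Item = T> + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel(capacity_items);
    tokio::spawn(async move {
        futures::pin_mut!(input);
        while let Some(item) = input.next().await {
            // `send` awaits once the buffer is full (backpressure); bail out
            // if the consumer dropped the receiving end.
            if tx.send(item).await.is_err() {
                break;
            }
        }
    });
    ReceiverStream::new(rx)
}
```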

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions

@@ -39,6 +39,7 @@ pub mod optimizer;
 pub mod output_requirements;
 pub mod projection_pushdown;
 pub use datafusion_pruning as pruning;
+pub mod hash_join_buffering;
 pub mod pushdown_sort;
 pub mod sanity_checker;
 pub mod topk_aggregation;

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 5 additions & 0 deletions

@@ -35,6 +35,7 @@ use crate::sanity_checker::SanityCheckPlan;
 use crate::topk_aggregation::TopKAggregation;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;

+use crate::hash_join_buffering::HashJoinBuffering;
 use crate::limit_pushdown_past_window::LimitPushPastWindows;
 use crate::pushdown_sort::PushdownSort;
 use datafusion_common::Result;
@@ -131,6 +132,10 @@ impl PhysicalOptimizer {
             // This can possibly be combined with [LimitPushdown]
             // It needs to come after [EnforceSorting]
             Arc::new(LimitPushPastWindows::new()),
+            // The HashJoinBuffering rule adds a BufferExec node with the configured capacity
+            // on the probe side of hash joins. That way, the probe side gets eagerly polled
+            // before the build side is completely finished.
+            Arc::new(HashJoinBuffering::new()),
             // The LimitPushdown rule tries to push limits down as far as possible,
             // replacing operators with fetching variants, or adding limits
             // past operators that support limit pushdown.
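Because the rule is appended to the default pass list above, standard sessions pick it up automatically. For a hand-assembled session, something like the following should work — a sketch assuming `SessionStateBuilder`'s `with_physical_optimizer_rule` hook; verify against the current API:

```rust
use std::sync::Arc;
use datafusion::execution::session_state::{SessionState, SessionStateBuilder};
use datafusion_physical_optimizer::hash_join_buffering::HashJoinBuffering;

// Sketch: register HashJoinBuffering on a hand-built session state. With the
// default builder this is unnecessary, since the rule already runs as part of
// the standard pass list shown above.
fn state_with_buffering_rule() -> SessionState {
    SessionStateBuilder::new()
        .with_default_features()
        .with_physical_optimizer_rule(Arc::new(HashJoinBuffering::new()))
        .build()
}
```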

datafusion/sqllogictest/test_files/datetime/arith_date_time.slt

Lines changed: 0 additions & 1 deletion

@@ -113,4 +113,3 @@ SELECT '2001-09-28'::date / '03:00'::time

 query error Invalid timestamp arithmetic operation
 SELECT '2001-09-28'::date % '03:00'::time
-

datafusion/sqllogictest/test_files/datetime/arith_timestamp_duration.slt

Lines changed: 1 addition & 1 deletion

@@ -144,4 +144,4 @@ query error Invalid timestamp arithmetic operation
 SELECT '2001-09-28T01:00:00'::timestamp % arrow_cast(12345, 'Duration(Second)');

 query error Invalid timestamp arithmetic operation
-SELECT '2001-09-28T01:00:00'::timestamp / arrow_cast(12345, 'Duration(Second)');
+SELECT '2001-09-28T01:00:00'::timestamp / arrow_cast(12345, 'Duration(Second)');

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 4 additions & 0 deletions

@@ -239,6 +239,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after PushdownSort SAME TEXT AS ABOVE
@@ -319,6 +320,7 @@ physical_plan after OutputRequirements
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]]
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]]
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after PushdownSort SAME TEXT AS ABOVE
@@ -363,6 +365,7 @@ physical_plan after OutputRequirements
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after PushdownSort SAME TEXT AS ABOVE
@@ -600,6 +603,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after PushdownSort SAME TEXT AS ABOVE
