Skip to content

Commit cbbcb93

Browse files
committed
add support for collect left hash joins
2 parents 8a8c718 + 35f0b08 commit cbbcb93

18 files changed

+266
-357
lines changed

benchmarks/src/tpch/run.rs

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,16 @@ use datafusion::datasource::listing::{
3636
};
3737
use datafusion::datasource::{MemTable, TableProvider};
3838
use datafusion::error::{DataFusionError, Result};
39-
use datafusion::execution::SessionStateBuilder;
39+
use datafusion::execution::{SessionState, SessionStateBuilder};
4040
use datafusion::physical_plan::display::DisplayableExecutionPlan;
4141
use datafusion::physical_plan::{collect, displayable};
4242
use datafusion::prelude::*;
4343

4444
use crate::util::{print_memory_stats, BenchmarkRun, CommonOpt, QueryResult};
4545
use datafusion_distributed::test_utils::localhost::start_localhost_context;
46-
use datafusion_distributed::{DistributedPhysicalOptimizerRule, SessionBuilder};
46+
use datafusion_distributed::{
47+
DistributedPhysicalOptimizerRule, DistributedSessionBuilder, DistributedSessionBuilderContext,
48+
};
4749
use log::info;
4850
use structopt::StructOpt;
4951

@@ -110,31 +112,19 @@ pub struct RunOpt {
110112
}
111113

112114
#[async_trait]
113-
impl SessionBuilder for RunOpt {
114-
fn session_state_builder(
115+
impl DistributedSessionBuilder for RunOpt {
116+
async fn build_session_state(
115117
&self,
116-
mut builder: SessionStateBuilder,
117-
) -> Result<SessionStateBuilder, DataFusionError> {
118-
let mut config = self
118+
_ctx: DistributedSessionBuilderContext,
119+
) -> Result<SessionState, DataFusionError> {
120+
let mut builder = SessionStateBuilder::new().with_default_features();
121+
122+
let config = self
119123
.common
120124
.config()?
121125
.with_collect_statistics(!self.disable_statistics)
122126
.with_target_partitions(self.partitions());
123127

124-
// // FIXME: these three options are critical for the correct function of the library
125-
// // but we are not enforcing that the user sets them. They are here at the moment
126-
// // but we should figure out a way to do this better.
127-
// config
128-
// .options_mut()
129-
// .optimizer
130-
// .hash_join_single_partition_threshold = 0;
131-
// config
132-
// .options_mut()
133-
// .optimizer
134-
// .hash_join_single_partition_threshold_rows = 0;
135-
//
136-
// config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
137-
// end critical options section
138128
let rt_builder = self.common.runtime_env_builder()?;
139129

140130
if self.distributed {
@@ -145,17 +135,14 @@ impl SessionBuilder for RunOpt {
145135
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
146136
}
147137

148-
Ok(builder
138+
let state = builder
149139
.with_config(config)
150-
.with_runtime_env(rt_builder.build_arc()?))
151-
}
140+
.with_runtime_env(rt_builder.build_arc()?)
141+
.build();
152142

153-
async fn session_context(
154-
&self,
155-
ctx: SessionContext,
156-
) -> std::result::Result<SessionContext, DataFusionError> {
143+
let ctx = SessionContext::from(state);
157144
self.register_tables(&ctx).await?;
158-
Ok(ctx)
145+
Ok(ctx.state())
159146
}
160147
}
161148

src/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1+
#[allow(unused)]
12
pub mod ttl_map;
23
pub mod util;

src/common/ttl_map.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use dashmap::{DashMap, Entry};
2727
use datafusion::error::DataFusionError;
2828
use std::collections::HashSet;
2929
use std::hash::Hash;
30-
use std::mem;
3130
use std::sync::atomic::AtomicU64;
3231
#[cfg(test)]
3332
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
@@ -94,7 +93,7 @@ where
9493
shard.insert(key);
9594
}
9695
BucketOp::Clear => {
97-
let keys_to_delete = mem::replace(&mut shard, HashSet::new());
96+
let keys_to_delete = std::mem::take(&mut shard);
9897
for key in keys_to_delete {
9998
data.remove(&key);
10099
}
@@ -253,14 +252,14 @@ where
253252

254253
/// run_gc_loop will continuously clear expired entries from the map, checking every `period`. The
255254
/// function terminates if `shutdown` is signalled.
256-
async fn run_gc_loop(time: Arc<AtomicU64>, period: Duration, buckets: &Vec<Bucket<K>>) {
255+
async fn run_gc_loop(time: Arc<AtomicU64>, period: Duration, buckets: &[Bucket<K>]) {
257256
loop {
258257
tokio::time::sleep(period).await;
259258
Self::gc(time.clone(), buckets);
260259
}
261260
}
262261

263-
fn gc(time: Arc<AtomicU64>, buckets: &Vec<Bucket<K>>) {
262+
fn gc(time: Arc<AtomicU64>, buckets: &[Bucket<K>]) {
264263
let index = time.load(std::sync::atomic::Ordering::SeqCst) % buckets.len() as u64;
265264
buckets[index as usize].clear();
266265
time.fetch_add(1, std::sync::atomic::Ordering::SeqCst);

src/common/util.rs

Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -37,44 +37,3 @@ pub fn display_plan_with_partition_in_out(plan: &dyn ExecutionPlan) -> Result<St
3737
visit(plan, 0, &mut f)?;
3838
Ok(f)
3939
}
40-
41-
/// Returns a boolean indicating if this stage can be divided into more than one task.
42-
///
43-
/// Some Plan nodes need to materialize all partitions inorder to execute such as
44-
/// NestedLoopJoinExec. Rewriting the plan to accommodate dividing it into tasks
45-
/// would result in redundant work.
46-
///
47-
/// The plans we cannot split are:
48-
/// - NestedLoopJoinExec
49-
<<<<<<< HEAD
50-
/// - HashJoinExec when not in Partitioned mode
51-
=======
52-
>>>>>>> robtandy/nested_loop_joins
53-
pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
54-
// recursively check to see if this stage's plan contains a NestedLoopJoinExec
55-
let mut has_unsplittable_plan = false;
56-
let search = |f: &Arc<dyn ExecutionPlan>| {
57-
if f.as_any()
58-
.downcast_ref::<datafusion::physical_plan::joins::NestedLoopJoinExec>()
59-
.is_some()
60-
{
61-
has_unsplittable_plan = true;
62-
return Ok(TreeNodeRecursion::Stop);
63-
<<<<<<< HEAD
64-
} else if let Some(hash_join) = f
65-
.as_any()
66-
.downcast_ref::<datafusion::physical_plan::joins::HashJoinExec>()
67-
{
68-
if hash_join.partition_mode() != &PartitionMode::Partitioned {
69-
has_unsplittable_plan = true;
70-
return Ok(TreeNodeRecursion::Stop);
71-
}
72-
=======
73-
>>>>>>> robtandy/nested_loop_joins
74-
}
75-
76-
Ok(TreeNodeRecursion::Continue)
77-
};
78-
plan.apply(search)?;
79-
Ok(!has_unsplittable_plan)
80-
}

0 commit comments

Comments
 (0)