Skip to content

Commit ff9167d

Browse files
committed
allow nested_loop_joins
1 parent b247b82 commit ff9167d

File tree

3 files changed

+50
-14
lines changed

3 files changed

+50
-14
lines changed

src/common/util.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
12
use datafusion::error::Result;
3+
use datafusion::physical_plan::joins::PartitionMode;
24
use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
35

46
use std::fmt::Write;
7+
use std::sync::Arc;
58

69
pub fn display_plan_with_partition_in_out(plan: &dyn ExecutionPlan) -> Result<String> {
710
let mut f = String::new();
@@ -34,3 +37,29 @@ pub fn display_plan_with_partition_in_out(plan: &dyn ExecutionPlan) -> Result<St
3437
visit(plan, 0, &mut f)?;
3538
Ok(f)
3639
}
40+
41+
/// Returns a boolean indicating if this stage can be divided into more than one task.
42+
///
43+
/// Some Plan nodes need to materialize all partitions inorder to execute such as
44+
/// NestedLoopJoinExec. Rewriting the plan to accommodate dividing it into tasks
45+
/// would result in redundant work.
46+
///
47+
/// The plans we cannot split are:
48+
/// - NestedLoopJoinExec
49+
pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
50+
// recursively check to see if this stages plan contains a NestedLoopJoinExec
51+
let mut has_unsplittable_plan = false;
52+
let search = |f: &Arc<dyn ExecutionPlan>| {
53+
if f.as_any()
54+
.downcast_ref::<datafusion::physical_plan::joins::NestedLoopJoinExec>()
55+
.is_some()
56+
{
57+
has_unsplittable_plan = true;
58+
return Ok(TreeNodeRecursion::Stop);
59+
}
60+
61+
Ok(TreeNodeRecursion::Continue)
62+
};
63+
plan.apply(search)?;
64+
Ok(!has_unsplittable_plan)
65+
}

src/test_utils/tpch.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,13 +160,18 @@ where
160160

161161
/// Generates a TPC-H table as a directory of parquet files, one file per
/// partition, under `$data_dir/<name>.parquet/`.
///
/// Panics (via `expect`) if the directory cannot be created or any partition
/// fails to generate, since test data is a hard prerequisite.
macro_rules! must_generate_tpch_table {
    ($generator:ident, $arrow:ident, $name:literal, $data_dir:expr) => {
        let data_dir = $data_dir.join(format!("{}.parquet", $name));
        // Borrow instead of cloning: create_dir_all only needs AsRef<Path>.
        fs::create_dir_all(&data_dir).expect("Failed to create data directory");
        // Create three partitions for the table.
        (1..=3).for_each(|part| {
            generate_table(
                // TODO: Consider adjusting the partitions and batch sizes.
                $arrow::new($generator::new(SCALE_FACTOR, part, 3)).with_batch_size(1000),
                &format!("{}.parquet", part),
                // as_path() avoids the per-iteration clone + into_boxed_path()
                // allocation of the previous version; &Path coerces the same way.
                data_dir.as_path(),
            )
            .expect(concat!("Failed to generate ", $name, " table"));
        });
    };
}
172177

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ mod tests {
66
use async_trait::async_trait;
77
use datafusion::error::DataFusionError;
88
use datafusion::execution::SessionStateBuilder;
9+
use datafusion::physical_plan::displayable;
910
use datafusion::prelude::{SessionConfig, SessionContext};
1011
use datafusion_distributed::test_utils::localhost::start_localhost_context;
11-
use datafusion_distributed::{DistributedPhysicalOptimizerRule, SessionBuilder};
12+
use datafusion_distributed::{
13+
display_stage_graphviz, DistributedPhysicalOptimizerRule, ExecutionStage, SessionBuilder,
14+
};
1215
use futures::TryStreamExt;
1316
use std::error::Error;
1417
use std::sync::Arc;
@@ -119,8 +122,6 @@ mod tests {
119122
}
120123

121124
#[tokio::test]
122-
// TODO: Add support for NestedLoopJoinExec to support query 22.
123-
#[ignore]
124125
async fn test_tpch_22() -> Result<(), Box<dyn Error>> {
125126
test_tpch_query(22).await
126127
}
@@ -155,7 +156,6 @@ mod tests {
155156

156157
config.options_mut().optimizer.prefer_hash_join = true;
157158
// end critical options section
158-
159159
let rule = DistributedPhysicalOptimizerRule::new().with_maximum_partitions_per_task(2);
160160
Ok(builder
161161
.with_config(config)
@@ -174,7 +174,6 @@ mod tests {
174174
// and once in a non-distributed manner. For each query, it asserts that the results are identical.
175175
async fn run_tpch_query(ctx2: SessionContext, query_id: u8) -> Result<(), Box<dyn Error>> {
176176
ensure_tpch_data().await;
177-
178177
let sql = get_test_tpch_query(query_id);
179178

180179
// Context 1: Non-distributed execution.
@@ -205,19 +204,21 @@ mod tests {
205204
.await?;
206205
}
207206

207+
// Query 15 has three queries in it, one creating the view, the second
208+
// executing, which we want to capture the output of, and the third
209+
// tearing down the view
208210
let (stream1, stream2) = if query_id == 15 {
209211
let queries: Vec<&str> = sql
210212
.split(';')
211213
.map(str::trim)
212214
.filter(|s| !s.is_empty())
213215
.collect();
214216

215-
println!("queryies: {:?}", queries);
216-
217217
ctx1.sql(queries[0]).await?.collect().await?;
218218
ctx2.sql(queries[0]).await?.collect().await?;
219219
let df1 = ctx1.sql(queries[1]).await?;
220220
let df2 = ctx2.sql(queries[1]).await?;
221+
221222
let stream1 = df1.execute_stream().await?;
222223
let stream2 = df2.execute_stream().await?;
223224

@@ -227,6 +228,7 @@ mod tests {
227228
} else {
228229
let stream1 = ctx1.sql(&sql).await?.execute_stream().await?;
229230
let stream2 = ctx2.sql(&sql).await?.execute_stream().await?;
231+
230232
(stream1, stream2)
231233
};
232234

0 commit comments

Comments
 (0)