Skip to content

Commit 8a8c718

Browse files
committed
merge previous PR
2 parents e3bbcc4 + 8ddef97 commit 8a8c718

File tree

2 files changed

+254
-0
lines changed

2 files changed

+254
-0
lines changed

src/common/util.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@ pub fn display_plan_with_partition_in_out(plan: &dyn ExecutionPlan) -> Result<St
4646
///
4747
/// The plans we cannot split are:
4848
/// - NestedLoopJoinExec
49+
<<<<<<< HEAD
4950
/// - HashJoinExec when not in Partitioned mode
51+
=======
52+
>>>>>>> robtandy/nested_loop_joins
5053
pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
5154
// recursively check to see if this stages plan contains a NestedLoopJoinExec
5255
let mut has_unsplittable_plan = false;
@@ -57,6 +60,7 @@ pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
5760
{
5861
has_unsplittable_plan = true;
5962
return Ok(TreeNodeRecursion::Stop);
63+
<<<<<<< HEAD
6064
} else if let Some(hash_join) = f
6165
.as_any()
6266
.downcast_ref::<datafusion::physical_plan::joins::HashJoinExec>()
@@ -65,6 +69,8 @@ pub fn can_be_divided(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
6569
has_unsplittable_plan = true;
6670
return Ok(TreeNodeRecursion::Stop);
6771
}
72+
=======
73+
>>>>>>> robtandy/nested_loop_joins
6874
}
6975

7076
Ok(TreeNodeRecursion::Continue)

tests/tpch_validation_test.rs

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
mod common;
2+
3+
#[cfg(all(feature = "integration", test))]
mod tests {
    use crate::common::{ensure_tpch_data, get_test_data_dir, get_test_tpch_query};
    use arrow::record_batch::RecordBatch;
    use async_trait::async_trait;
    use datafusion::error::DataFusionError;
    use datafusion::execution::SessionStateBuilder;

    use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
    use datafusion_distributed::test_utils::localhost::start_localhost_context;
    use datafusion_distributed::{DistributedPhysicalOptimizerRule, SessionBuilder};
    use futures::TryStreamExt;
    use std::error::Error;
    use std::sync::Arc;

    /// Target partition count shared by the baseline and the distributed session so that
    /// both plans are produced under the same partitioning setting.
    const TARGET_PARTITIONS: usize = 3;

    /// Tables registered (as `<name>.parquet` files from the test data directory) in both
    /// contexts before a query is run.
    const TPCH_TABLES: [&str; 8] = [
        "lineitem", "orders", "part", "partsupp", "customer", "nation", "region", "supplier",
    ];

    // One test per TPC-H query so that every query is reported, filtered and run
    // independently by the test harness.

    #[tokio::test]
    async fn test_tpch_1() -> Result<(), Box<dyn Error>> {
        test_tpch_query(1).await
    }

    #[tokio::test]
    async fn test_tpch_2() -> Result<(), Box<dyn Error>> {
        test_tpch_query(2).await
    }

    #[tokio::test]
    async fn test_tpch_3() -> Result<(), Box<dyn Error>> {
        test_tpch_query(3).await
    }

    #[tokio::test]
    async fn test_tpch_4() -> Result<(), Box<dyn Error>> {
        test_tpch_query(4).await
    }

    #[tokio::test]
    async fn test_tpch_5() -> Result<(), Box<dyn Error>> {
        test_tpch_query(5).await
    }

    #[tokio::test]
    async fn test_tpch_6() -> Result<(), Box<dyn Error>> {
        test_tpch_query(6).await
    }

    #[tokio::test]
    async fn test_tpch_7() -> Result<(), Box<dyn Error>> {
        test_tpch_query(7).await
    }

    #[tokio::test]
    async fn test_tpch_8() -> Result<(), Box<dyn Error>> {
        test_tpch_query(8).await
    }

    #[tokio::test]
    async fn test_tpch_9() -> Result<(), Box<dyn Error>> {
        test_tpch_query(9).await
    }

    #[tokio::test]
    async fn test_tpch_10() -> Result<(), Box<dyn Error>> {
        test_tpch_query(10).await
    }

    #[tokio::test]
    async fn test_tpch_11() -> Result<(), Box<dyn Error>> {
        test_tpch_query(11).await
    }

    #[tokio::test]
    async fn test_tpch_12() -> Result<(), Box<dyn Error>> {
        test_tpch_query(12).await
    }

    #[tokio::test]
    async fn test_tpch_13() -> Result<(), Box<dyn Error>> {
        test_tpch_query(13).await
    }

    #[tokio::test]
    async fn test_tpch_14() -> Result<(), Box<dyn Error>> {
        test_tpch_query(14).await
    }

    #[tokio::test]
    async fn test_tpch_15() -> Result<(), Box<dyn Error>> {
        test_tpch_query(15).await
    }

    #[tokio::test]
    async fn test_tpch_16() -> Result<(), Box<dyn Error>> {
        test_tpch_query(16).await
    }

    #[tokio::test]
    async fn test_tpch_17() -> Result<(), Box<dyn Error>> {
        test_tpch_query(17).await
    }

    #[tokio::test]
    async fn test_tpch_18() -> Result<(), Box<dyn Error>> {
        test_tpch_query(18).await
    }

    #[tokio::test]
    async fn test_tpch_19() -> Result<(), Box<dyn Error>> {
        test_tpch_query(19).await
    }

    #[tokio::test]
    async fn test_tpch_20() -> Result<(), Box<dyn Error>> {
        test_tpch_query(20).await
    }

    #[tokio::test]
    async fn test_tpch_21() -> Result<(), Box<dyn Error>> {
        test_tpch_query(21).await
    }

    #[tokio::test]
    async fn test_tpch_22() -> Result<(), Box<dyn Error>> {
        test_tpch_query(22).await
    }

    /// Builds a localhost distributed context and validates TPC-H query `query_id` with it.
    ///
    /// # Errors
    /// Propagates any error returned by [`run_tpch_query`].
    async fn test_tpch_query(query_id: u8) -> Result<(), Box<dyn Error>> {
        // NOTE(review): `2` is presumably the number of localhost workers and `_guard`
        // presumably keeps them alive; it is bound (not `_`) so it is only dropped once the
        // query has finished. Confirm against `start_localhost_context`.
        let (ctx, _guard) = start_localhost_context(2, TestSessionBuilder).await;
        run_tpch_query(ctx, query_id).await
    }

    /// Session builder handed to `start_localhost_context`; it installs the distributed
    /// physical optimizer rule together with the configuration it relies on.
    #[derive(Clone)]
    struct TestSessionBuilder;

    #[async_trait]
    impl SessionBuilder for TestSessionBuilder {
        /// Adds the test configuration and the distributed optimizer rule to `builder`.
        fn session_state_builder(
            &self,
            builder: SessionStateBuilder,
        ) -> Result<SessionStateBuilder, DataFusionError> {
            let mut config = SessionConfig::new().with_target_partitions(TARGET_PARTITIONS);

            // FIXME: these three options are critical for the correct function of the library
            // but we are not enforcing that the user sets them. They are here at the moment
            // but we should figure out a way to do this better.
            let optimizer = &mut config.options_mut().optimizer;
            optimizer.hash_join_single_partition_threshold = 0;
            optimizer.hash_join_single_partition_threshold_rows = 0;
            optimizer.prefer_hash_join = true;
            // end critical options section

            let rule = DistributedPhysicalOptimizerRule::new().with_maximum_partitions_per_task(2);
            Ok(builder
                .with_config(config)
                .with_physical_optimizer_rule(Arc::new(rule)))
        }

        /// Returns the context unchanged; no per-context customization is needed here.
        async fn session_context(
            &self,
            ctx: SessionContext,
        ) -> Result<SessionContext, DataFusionError> {
            Ok(ctx)
        }
    }

    /// Executes `sql` on both contexts and materializes the results.
    ///
    /// Both streams are created before either one is drained, and `ctx1` is always handled
    /// before `ctx2`. Returns `(batches from ctx1, batches from ctx2)`.
    async fn collect_from_both(
        ctx1: &SessionContext,
        ctx2: &SessionContext,
        sql: &str,
    ) -> Result<(Vec<RecordBatch>, Vec<RecordBatch>), Box<dyn Error>> {
        let stream1 = ctx1.sql(sql).await?.execute_stream().await?;
        let stream2 = ctx2.sql(sql).await?.execute_stream().await?;

        let batches1 = stream1.try_collect::<Vec<_>>().await?;
        let batches2 = stream2.try_collect::<Vec<_>>().await?;
        Ok((batches1, batches2))
    }

    /// Runs TPC-H query `query_id` twice - once on a freshly built, non-distributed
    /// context and once on `ctx2` (the distributed context supplied by the caller) - and
    /// asserts that the pretty-printed results are identical.
    ///
    /// # Errors
    /// Returns an error if table registration, planning, execution or result formatting
    /// fails, or if the query 15 file does not contain its three statements.
    ///
    /// # Panics
    /// Panics (via `assert_eq!`) when the two executions produce different output.
    async fn run_tpch_query(ctx2: SessionContext, query_id: u8) -> Result<(), Box<dyn Error>> {
        ensure_tpch_data().await;
        let sql = get_test_tpch_query(query_id);

        // Context 1: Non-distributed execution, used as the reference result.
        let config1 = SessionConfig::new().with_target_partitions(TARGET_PARTITIONS);
        let state1 = SessionStateBuilder::new()
            .with_default_features()
            .with_config(config1)
            .build();
        let ctx1 = SessionContext::new_with_state(state1);

        // Register every table in both contexts (ctx1 first, then ctx2).
        for table_name in TPCH_TABLES {
            let table_path = get_test_data_dir().join(format!("{}.parquet", table_name));
            for ctx in [&ctx1, &ctx2] {
                ctx.register_parquet(
                    table_name,
                    table_path.to_string_lossy().as_ref(),
                    ParquetReadOptions::default(),
                )
                .await?;
            }
        }

        // Query 15 has three queries in it, one creating the view, the second
        // executing, which we want to capture the output of, and the third
        // tearing down the view
        let (batches1, batches2) = if query_id == 15 {
            let statements: Vec<&str> = sql
                .split(';')
                .map(str::trim)
                .filter(|s| !s.is_empty())
                .collect();

            // Fail with a descriptive error instead of an index-out-of-bounds panic when
            // the query file is malformed.
            let (create_view, select, drop_view) = match statements.as_slice() {
                [first, second, third, ..] => (*first, *second, *third),
                _ => {
                    return Err(format!(
                        "TPC-H query 15 must contain 3 statements \
                         (create view, select, drop view) but {} were found",
                        statements.len()
                    )
                    .into());
                }
            };

            ctx1.sql(create_view).await?.collect().await?;
            ctx2.sql(create_view).await?.collect().await?;

            let batches = collect_from_both(&ctx1, &ctx2, select).await?;

            // Tear the view down only after the results are fully materialized, so the
            // running streams never depend on a view that has already been dropped.
            ctx1.sql(drop_view).await?.collect().await?;
            ctx2.sql(drop_view).await?.collect().await?;

            batches
        } else {
            collect_from_both(&ctx1, &ctx2, &sql).await?
        };

        let formatted1 = arrow::util::pretty::pretty_format_batches(&batches1)?;
        let formatted2 = arrow::util::pretty::pretty_format_batches(&batches2)?;

        assert_eq!(
            formatted1.to_string(),
            formatted2.to_string(),
            "Query {} results differ between executions",
            query_id
        );

        Ok(())
    }
}

0 commit comments

Comments
 (0)