Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1074,14 +1074,19 @@ impl LogicalPlanBuilder {
let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;

let on = left_keys
let on: Vec<_> = left_keys
.into_iter()
.zip(right_keys)
.map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
.collect();
let join_schema =
build_join_schema(self.plan.schema(), right.schema(), &join_type)?;

// Inner type without join condition is cross join
if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
return plan_err!("join condition should not be empty");
}

Ok(Self::new(LogicalPlan::Join(Join {
left: self.plan,
right: Arc::new(right),
Expand Down
174 changes: 5 additions & 169 deletions datafusion/optimizer/src/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,13 @@ fn transformed_limit(
fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
use JoinType::*;

fn is_no_join_condition(join: &Join) -> bool {
join.on.is_empty() && join.filter.is_none()
// Cross join is the special case of inner join where there is no join condition. see [LogicalPlanBuilder::cross_join]
fn is_cross_join(join: &Join) -> bool {
join.join_type == Inner && join.on.is_empty() && join.filter.is_none()
}

let (left_limit, right_limit) = if is_no_join_condition(&join) {
match join.join_type {
Left | Right | Full | Inner => (Some(limit), Some(limit)),
LeftAnti | LeftSemi | LeftMark => (Some(limit), None),
RightAnti | RightSemi => (None, Some(limit)),
}
let (left_limit, right_limit) = if is_cross_join(&join) {
(Some(limit), Some(limit))
} else {
match join.join_type {
Left => (Some(limit), None),
Expand Down Expand Up @@ -861,167 +858,6 @@ mod test {
assert_optimized_plan_equal(outer_query, expected)
}

#[test]
fn limit_should_push_down_join_without_condition() -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case has never occurred now.

let table_scan_1 = test_table_scan()?;
let table_scan_2 = test_table_scan_with_name("test2")?;
let left_keys: Vec<&str> = Vec::new();
let right_keys: Vec<&str> = Vec::new();
let plan = LogicalPlanBuilder::from(table_scan_1.clone())
.join(
LogicalPlanBuilder::from(table_scan_2.clone()).build()?,
JoinType::Left,
(left_keys.clone(), right_keys.clone()),
None,
)?
.limit(0, Some(1000))?
.build()?;

let expected = "Limit: skip=0, fetch=1000\
\n Left Join: \
\n Limit: skip=0, fetch=1000\
\n TableScan: test, fetch=1000\
\n Limit: skip=0, fetch=1000\
\n TableScan: test2, fetch=1000";

assert_optimized_plan_equal(plan, expected)?;

let plan = LogicalPlanBuilder::from(table_scan_1.clone())
.join(
LogicalPlanBuilder::from(table_scan_2.clone()).build()?,
JoinType::Right,
(left_keys.clone(), right_keys.clone()),
None,
)?
.limit(0, Some(1000))?
.build()?;

let expected = "Limit: skip=0, fetch=1000\
\n Right Join: \
\n Limit: skip=0, fetch=1000\
\n TableScan: test, fetch=1000\
\n Limit: skip=0, fetch=1000\
\n TableScan: test2, fetch=1000";

assert_optimized_plan_equal(plan, expected)?;

let plan = LogicalPlanBuilder::from(table_scan_1.clone())
.join(
LogicalPlanBuilder::from(table_scan_2.clone()).build()?,
JoinType::Full,
(left_keys.clone(), right_keys.clone()),
None,
)?
.limit(0, Some(1000))?
.build()?;

let expected = "Limit: skip=0, fetch=1000\
\n Full Join: \
\n Limit: skip=0, fetch=1000\
\n TableScan: test, fetch=1000\
\n Limit: skip=0, fetch=1000\
\n TableScan: test2, fetch=1000";

assert_optimized_plan_equal(plan, expected)?;

let plan = LogicalPlanBuilder::from(table_scan_1.clone())
.join(
LogicalPlanBuilder::from(table_scan_2.clone()).build()?,
JoinType::LeftSemi,
(left_keys.clone(), right_keys.clone()),
None,
)?
.limit(0, Some(1000))?
.build()?;

let expected = "Limit: skip=0, fetch=1000\
\n LeftSemi Join: \
\n Limit: skip=0, fetch=1000\
\n TableScan: test, fetch=1000\
\n TableScan: test2";

assert_optimized_plan_equal(plan, expected)?;

let plan = LogicalPlanBuilder::from(table_scan_1.clone())
.join(
LogicalPlanBuilder::from(table_scan_2.clone()).build()?,
JoinType::LeftAnti,
(left_keys.clone(), right_keys.clone()),
None,
)?
.limit(0, Some(1000))?
.build()?;

let expected = "Limit: skip=0, fetch=1000\
\n LeftAnti Join: \
\n Limit: skip=0, fetch=1000\
\n TableScan: test, fetch=1000\
\n TableScan: test2";

assert_optimized_plan_equal(plan, expected)?;

let plan = LogicalPlanBuilder::from(table_scan_1.clone())
.join(
LogicalPlanBuilder::from(table_scan_2.clone()).build()?,
JoinType::RightSemi,
(left_keys.clone(), right_keys.clone()),
None,
)?
.limit(0, Some(1000))?
.build()?;

let expected = "Limit: skip=0, fetch=1000\
\n RightSemi Join: \
\n TableScan: test\
\n Limit: skip=0, fetch=1000\
\n TableScan: test2, fetch=1000";

assert_optimized_plan_equal(plan, expected)?;

let plan = LogicalPlanBuilder::from(table_scan_1)
.join(
LogicalPlanBuilder::from(table_scan_2).build()?,
JoinType::RightAnti,
(left_keys, right_keys),
None,
)?
.limit(0, Some(1000))?
.build()?;

let expected = "Limit: skip=0, fetch=1000\
\n RightAnti Join: \
\n TableScan: test\
\n Limit: skip=0, fetch=1000\
\n TableScan: test2, fetch=1000";

assert_optimized_plan_equal(plan, expected)
}

#[test]
fn limit_should_push_down_left_outer_join() -> Result<()> {
let table_scan_1 = test_table_scan()?;
let table_scan_2 = test_table_scan_with_name("test2")?;

let plan = LogicalPlanBuilder::from(table_scan_1)
.join(
LogicalPlanBuilder::from(table_scan_2).build()?,
JoinType::Left,
(vec!["a"], vec!["a"]),
None,
)?
.limit(0, Some(1000))?
.build()?;

// Limit pushdown Not supported in Join
let expected = "Limit: skip=0, fetch=1000\
\n Left Join: test.a = test2.a\
\n Limit: skip=0, fetch=1000\
\n TableScan: test, fetch=1000\
\n TableScan: test2";

assert_optimized_plan_equal(plan, expected)
}

#[test]
fn limit_should_push_down_left_outer_join_with_offset() -> Result<()> {
let table_scan_1 = test_table_scan()?;
Expand Down
18 changes: 11 additions & 7 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,11 @@ fn build_join(
_ => {
// if not correlated, group down to 1 row and left join on that (preserving row count)
LogicalPlanBuilder::from(filter_input.clone())
.join_on(sub_query_alias, JoinType::Left, None)?
.join_on(
sub_query_alias,
JoinType::Left,
vec![Expr::Literal(ScalarValue::Boolean(Some(true)))],
)?
.build()?
}
}
Expand Down Expand Up @@ -557,7 +561,7 @@ mod tests {
// it will optimize, but fail for the same reason the unoptimized query would
let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
\n Filter: customer.c_custkey = __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\
\n Left Join: [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\
\n Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
\n SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N]\
\n Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N]\
Expand Down Expand Up @@ -589,7 +593,7 @@ mod tests {

let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
\n Filter: customer.c_custkey = __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\
\n Left Join: [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\
\n Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
\n SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N]\
\n Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N]\
Expand Down Expand Up @@ -965,7 +969,7 @@ mod tests {

let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
\n Filter: customer.c_custkey < __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\
\n Left Join: [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\
\n Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
\n SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N]\
\n Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N]\
Expand Down Expand Up @@ -996,7 +1000,7 @@ mod tests {

let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
\n Filter: customer.c_custkey = __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\
\n Left Join: [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\
\n Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
\n SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N]\
\n Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N]\
Expand Down Expand Up @@ -1097,8 +1101,8 @@ mod tests {

let expected = "Projection: customer.c_custkey [c_custkey:Int64]\
\n Filter: customer.c_custkey BETWEEN __scalar_sq_1.min(orders.o_custkey) AND __scalar_sq_2.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, min(orders.o_custkey):Int64;N, max(orders.o_custkey):Int64;N]\
\n Left Join: [c_custkey:Int64, c_name:Utf8, min(orders.o_custkey):Int64;N, max(orders.o_custkey):Int64;N]\
\n Left Join: [c_custkey:Int64, c_name:Utf8, min(orders.o_custkey):Int64;N]\
\n Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, min(orders.o_custkey):Int64;N, max(orders.o_custkey):Int64;N]\
\n Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, min(orders.o_custkey):Int64;N]\
\n TableScan: customer [c_custkey:Int64, c_name:Utf8]\
\n SubqueryAlias: __scalar_sq_1 [min(orders.o_custkey):Int64;N]\
\n Projection: min(orders.o_custkey) [min(orders.o_custkey):Int64;N]\
Expand Down
5 changes: 0 additions & 5 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1452,11 +1452,6 @@ fn test_unnest_to_sql() {

#[test]
fn test_join_with_no_conditions() {
sql_round_trip(
GenericDialect {},
"SELECT j1.j1_id, j1.j1_string FROM j1 JOIN j2",
"SELECT j1.j1_id, j1.j1_string FROM j1 CROSS JOIN j2",
);
sql_round_trip(
GenericDialect {},
"SELECT j1.j1_id, j1.j1_string FROM j1 CROSS JOIN j2",
Expand Down
22 changes: 22 additions & 0 deletions datafusion/sqllogictest/test_files/join.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,28 @@ FROM t1
----
11 11 11

# join condition is required
# TODO: query error join condition should not be empty
# related to: https://github.com/apache/datafusion/issues/13486
statement ok
SELECT * FROM t1 JOIN t2

# join condition is required
query error join condition should not be empty
SELECT * FROM t1 LEFT JOIN t2

# join condition is required
query error join condition should not be empty
SELECT * FROM t1 RIGHT JOIN t2

# join condition is required
query error join condition should not be empty
SELECT * FROM t1 FULL JOIN t2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please include also a cross join without filters like

SELECT * FROM t1 CROSS JOIN t2


# cross join no need for join condition
statement ok
SELECT * FROM t1 CROSS JOIN t2

# multiple inner joins with mixed ON clause and filter
query III rowsort
SELECT t1.t1_id, t2.t2_id, t3.t3_id
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/plans/q11.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ logical_plan
06)----------Inner Join: supplier.s_nationkey = nation.n_nationkey
07)------------Projection: partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey
08)--------------Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
09)----------------TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
09)----------------TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost], partial_filters=[Boolean(true)]
10)----------------TableScan: supplier projection=[s_suppkey, s_nationkey]
11)------------Projection: nation.n_nationkey
12)--------------Filter: nation.n_name = Utf8("GERMANY")
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ logical_plan
03)----Inner Join: revenue0.total_revenue = __scalar_sq_1.max(revenue0.total_revenue)
04)------Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue
05)--------Inner Join: supplier.s_suppkey = revenue0.supplier_no
06)----------TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone]
06)----------TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone], partial_filters=[Boolean(true)]
07)----------SubqueryAlias: revenue0
08)------------Projection: lineitem.l_suppkey AS supplier_no, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
09)--------------Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[sum(lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount)) AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ logical_plan
07)------------Projection: customer.c_phone, customer.c_acctbal
08)--------------LeftAnti Join: customer.c_custkey = __correlated_sq_1.o_custkey
09)----------------Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("13"), Utf8View("31"), Utf8View("23"), Utf8View("29"), Utf8View("30"), Utf8View("18"), Utf8View("17")])
10)------------------TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("13"), Utf8View("31"), Utf8View("23"), Utf8View("29"), Utf8View("30"), Utf8View("18"), Utf8View("17")])]
10)------------------TableScan: customer projection=[c_custkey, c_phone, c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8View("13"), Utf8View("31"), Utf8View("23"), Utf8View("29"), Utf8View("30"), Utf8View("18"), Utf8View("17")]), Boolean(true)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I benchmarked to ensure the plan change won't impact the performance.

--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃    main ┃ fix_join-check ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │ 68.97ms │        70.14ms │    no change │
│ QQuery 2     │ 18.72ms │        18.55ms │    no change │
│ QQuery 3     │ 30.47ms │        29.46ms │    no change │
│ QQuery 4     │ 21.45ms │        21.77ms │    no change │
│ QQuery 5     │ 43.53ms │        41.56ms │    no change │
│ QQuery 6     │ 14.39ms │        14.88ms │    no change │
│ QQuery 7     │ 55.03ms │        55.30ms │    no change │
│ QQuery 8     │ 41.24ms │        39.45ms │    no change │
│ QQuery 9     │ 50.84ms │        53.19ms │    no change │
│ QQuery 10    │ 44.85ms │        44.83ms │    no change │
│ QQuery 11    │ 13.42ms │        13.22ms │    no change │
│ QQuery 12    │ 28.49ms │        28.20ms │    no change │
│ QQuery 13    │ 29.56ms │        29.38ms │    no change │
│ QQuery 14    │ 23.03ms │        24.40ms │ 1.06x slower │
│ QQuery 15    │ 33.59ms │        34.98ms │    no change │
│ QQuery 16    │ 13.07ms │        13.08ms │    no change │
│ QQuery 17    │ 58.00ms │        58.67ms │    no change │
│ QQuery 18    │ 76.00ms │        75.06ms │    no change │
│ QQuery 19    │ 39.99ms │        38.40ms │    no change │
│ QQuery 20    │ 29.04ms │        29.79ms │    no change │
│ QQuery 21    │ 62.14ms │        64.14ms │    no change │
│ QQuery 22    │ 13.82ms │        14.01ms │    no change │
└──────────────┴─────────┴────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary             ┃          ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (main)             │ 809.65ms │
│ Total Time (fix_join-check)   │ 812.45ms │
│ Average Time (main)           │  36.80ms │
│ Average Time (fix_join-check) │  36.93ms │
│ Queries Faster                │        0 │
│ Queries Slower                │        1 │
│ Queries with No Change        │       21 │
└───────────────────────────────┴──────────┘

11)----------------SubqueryAlias: __correlated_sq_1
12)------------------TableScan: orders projection=[o_custkey]
13)------------SubqueryAlias: __scalar_sq_2
Expand Down