Skip to content

Commit 8f7e7fe

Browse files
committed
Revert "Fix Schema Duplication Errors in Self‑Referential INTERSECT/EXCEPT by Requalifying Input Sides (apache#18814)"
This reverts commit a3a020f.
1 parent 74ea766 commit 8f7e7fe

File tree

4 files changed

+25
-165
lines changed

4 files changed

+25
-165
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 15 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,15 +1352,6 @@ impl LogicalPlanBuilder {
13521352
);
13531353
}
13541354

1355-
// Requalify sides if needed to avoid duplicate qualified field names
1356-
// (e.g., when both sides reference the same table)
1357-
let left_builder = LogicalPlanBuilder::from(left_plan);
1358-
let right_builder = LogicalPlanBuilder::from(right_plan);
1359-
let (left_builder, right_builder, _requalified) =
1360-
requalify_sides_if_needed(left_builder, right_builder)?;
1361-
let left_plan = left_builder.build()?;
1362-
let right_plan = right_builder.build()?;
1363-
13641355
let join_keys = left_plan
13651356
.schema()
13661357
.fields()
@@ -1740,61 +1731,23 @@ pub fn requalify_sides_if_needed(
17401731
) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
17411732
let left_cols = left.schema().columns();
17421733
let right_cols = right.schema().columns();
1743-
1744-
// Requalify if merging the schemas would cause an error during join.
1745-
// This can happen in several cases:
1746-
// 1. Duplicate qualified fields: both sides have same relation.name
1747-
// 2. Duplicate unqualified fields: both sides have same unqualified name
1748-
// 3. Ambiguous reference: one side qualified, other unqualified, same name
1749-
//
1750-
// Implementation note: This uses a simple O(n*m) nested loop rather than
1751-
// a HashMap-based O(n+m) approach. The nested loop is preferred because:
1752-
// - Schemas are typically small (in TPCH benchmark, max is 16 columns),
1753-
// so n*m is negligible
1754-
// - Early return on first conflict makes common case very fast
1755-
// - Code is simpler and easier to reason about
1756-
// - Called only during plan construction, not in execution hot path
1757-
for l in &left_cols {
1758-
for r in &right_cols {
1759-
if l.name != r.name {
1760-
continue;
1761-
}
1762-
1763-
// Same name - check if this would cause a conflict
1764-
match (&l.relation, &r.relation) {
1765-
// Both qualified with same relation - duplicate qualified field
1766-
(Some(l_rel), Some(r_rel)) if l_rel == r_rel => {
1767-
return Ok((
1768-
left.alias(TableReference::bare("left"))?,
1769-
right.alias(TableReference::bare("right"))?,
1770-
true,
1771-
));
1772-
}
1773-
// Both unqualified - duplicate unqualified field
1774-
(None, None) => {
1775-
return Ok((
1776-
left.alias(TableReference::bare("left"))?,
1777-
right.alias(TableReference::bare("right"))?,
1778-
true,
1779-
));
1780-
}
1781-
// One qualified, one not - ambiguous reference
1782-
(Some(_), None) | (None, Some(_)) => {
1783-
return Ok((
1784-
left.alias(TableReference::bare("left"))?,
1785-
right.alias(TableReference::bare("right"))?,
1786-
true,
1787-
));
1788-
}
1789-
// Different qualifiers - OK, no conflict
1790-
_ => {}
1791-
}
1792-
}
1734+
if left_cols.iter().any(|l| {
1735+
right_cols.iter().any(|r| {
1736+
l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
1737+
})
1738+
}) {
1739+
// These names have no connection to the original plan, but they'll make the columns
1740+
// (mostly) unique.
1741+
Ok((
1742+
left.alias(TableReference::bare("left"))?,
1743+
right.alias(TableReference::bare("right"))?,
1744+
true,
1745+
))
1746+
} else {
1747+
Ok((left, right, false))
17931748
}
1794-
1795-
// No conflicts found
1796-
Ok((left, right, false))
17971749
}
1750+
17981751
/// Add additional "synthetic" group by expressions based on functional
17991752
/// dependencies.
18001753
///

datafusion/optimizer/tests/optimizer_integration.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -269,17 +269,15 @@ fn intersect() -> Result<()> {
269269

270270
assert_snapshot!(
271271
format!("{plan}"),
272-
@r"
273-
LeftSemi Join: left.col_int32 = test.col_int32, left.col_utf8 = test.col_utf8
274-
Aggregate: groupBy=[[left.col_int32, left.col_utf8]], aggr=[[]]
275-
LeftSemi Join: left.col_int32 = right.col_int32, left.col_utf8 = right.col_utf8
276-
Aggregate: groupBy=[[left.col_int32, left.col_utf8]], aggr=[[]]
277-
SubqueryAlias: left
278-
TableScan: test projection=[col_int32, col_utf8]
279-
SubqueryAlias: right
280-
TableScan: test projection=[col_int32, col_utf8]
272+
@r#"
273+
LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8
274+
Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]
275+
LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8
276+
Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]
277+
TableScan: test projection=[col_int32, col_utf8]
281278
TableScan: test projection=[col_int32, col_utf8]
282-
"
279+
TableScan: test projection=[col_int32, col_utf8]
280+
"#
283281
);
284282
Ok(())
285283
}

datafusion/substrait/src/logical_plan/consumer/rel/set_rel.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ async fn intersect_rels(
8181
rel,
8282
consumer.consume_rel(input).await?,
8383
is_all,
84-
)?;
84+
)?
8585
}
8686

8787
Ok(rel)
@@ -95,8 +95,7 @@ async fn except_rels(
9595
let mut rel = consumer.consume_rel(&rels[0]).await?;
9696

9797
for input in &rels[1..] {
98-
rel =
99-
LogicalPlanBuilder::except(rel, consumer.consume_rel(input).await?, is_all)?;
98+
rel = LogicalPlanBuilder::except(rel, consumer.consume_rel(input).await?, is_all)?
10099
}
101100

102101
Ok(rel)

datafusion/substrait/tests/cases/roundtrip_logical_plan.rs

Lines changed: 0 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,96 +1164,6 @@ async fn simple_intersect_table_reuse() -> Result<()> {
11641164
Ok(())
11651165
}
11661166

1167-
#[tokio::test]
1168-
async fn self_referential_intersect() -> Result<()> {
1169-
// Test INTERSECT with the same table on both sides
1170-
// This previously failed with "Schema contains duplicate qualified field name"
1171-
// The fix ensures requalify_sides_if_needed is called in intersect_or_except
1172-
// After roundtrip through Substrait, SubqueryAlias is lost and requalification
1173-
// produces "left" and "right" aliases
1174-
// Note: INTERSECT (without ALL) includes DISTINCT, but the outer Aggregate
1175-
// is optimized away, resulting in just the **LeftSemi** join
1176-
// (LeftSemi returns rows from left that exist in right)
1177-
assert_expected_plan(
1178-
"SELECT a FROM data WHERE a > 0 INTERSECT SELECT a FROM data WHERE a < 5",
1179-
"LeftSemi Join: left.a = right.a\
1180-
\n SubqueryAlias: left\
1181-
\n Aggregate: groupBy=[[data.a]], aggr=[[]]\
1182-
\n Filter: data.a > Int64(0)\
1183-
\n TableScan: data projection=[a], partial_filters=[data.a > Int64(0)]\
1184-
\n SubqueryAlias: right\
1185-
\n Filter: data.a < Int64(5)\
1186-
\n TableScan: data projection=[a], partial_filters=[data.a < Int64(5)]",
1187-
true,
1188-
)
1189-
.await
1190-
}
1191-
1192-
#[tokio::test]
1193-
async fn self_referential_except() -> Result<()> {
1194-
// Test EXCEPT with the same table on both sides
1195-
// This previously failed with "Schema contains duplicate qualified field name"
1196-
// The fix ensures requalify_sides_if_needed is called in intersect_or_except
1197-
// After roundtrip through Substrait, SubqueryAlias is lost and requalification
1198-
// produces "left" and "right" aliases
1199-
// Note: EXCEPT (without ALL) includes DISTINCT, but the outer Aggregate
1200-
// is optimized away, resulting in just the **LeftAnti** join
1201-
// (LeftAnti returns rows from left that don't exist in right)
1202-
assert_expected_plan(
1203-
"SELECT a FROM data WHERE a > 0 EXCEPT SELECT a FROM data WHERE a < 5",
1204-
"LeftAnti Join: left.a = right.a\
1205-
\n SubqueryAlias: left\
1206-
\n Aggregate: groupBy=[[data.a]], aggr=[[]]\
1207-
\n Filter: data.a > Int64(0)\
1208-
\n TableScan: data projection=[a], partial_filters=[data.a > Int64(0)]\
1209-
\n SubqueryAlias: right\
1210-
\n Filter: data.a < Int64(5)\
1211-
\n TableScan: data projection=[a], partial_filters=[data.a < Int64(5)]",
1212-
true,
1213-
)
1214-
.await
1215-
}
1216-
1217-
#[tokio::test]
1218-
async fn self_referential_intersect_all() -> Result<()> {
1219-
// Test INTERSECT ALL with the same table on both sides
1220-
// INTERSECT ALL preserves duplicates and does not include DISTINCT
1221-
// Uses **LeftSemi** join (returns rows from left that exist in right)
1222-
// The requalification ensures no duplicate field name errors
1223-
assert_expected_plan(
1224-
"SELECT a FROM data WHERE a > 0 INTERSECT ALL SELECT a FROM data WHERE a < 5",
1225-
"LeftSemi Join: left.a = right.a\
1226-
\n SubqueryAlias: left\
1227-
\n Filter: data.a > Int64(0)\
1228-
\n TableScan: data projection=[a], partial_filters=[data.a > Int64(0)]\
1229-
\n SubqueryAlias: right\
1230-
\n Filter: data.a < Int64(5)\
1231-
\n TableScan: data projection=[a], partial_filters=[data.a < Int64(5)]",
1232-
true,
1233-
)
1234-
.await
1235-
}
1236-
1237-
#[tokio::test]
1238-
async fn self_referential_except_all() -> Result<()> {
1239-
// Test EXCEPT ALL with the same table on both sides
1240-
// EXCEPT ALL preserves duplicates and does not include DISTINCT
1241-
// Uses **LeftAnti** join (returns rows from left that don't exist in right)
1242-
// The requalification ensures no duplicate field name errors
1243-
assert_expected_plan(
1244-
"SELECT a FROM data WHERE a > 0 EXCEPT ALL SELECT a FROM data WHERE a < 5",
1245-
"LeftAnti Join: left.a = right.a\
1246-
\n SubqueryAlias: left\
1247-
\n Filter: data.a > Int64(0)\
1248-
\n TableScan: data projection=[a], partial_filters=[data.a > Int64(0)]\
1249-
\n SubqueryAlias: right\
1250-
\n Filter: data.a < Int64(5)\
1251-
\n TableScan: data projection=[a], partial_filters=[data.a < Int64(5)]",
1252-
true,
1253-
)
1254-
.await
1255-
}
1256-
12571167
#[tokio::test]
12581168
async fn simple_window_function() -> Result<()> {
12591169
roundtrip("SELECT RANK() OVER (PARTITION BY a ORDER BY b), d, sum(b) OVER (PARTITION BY a) FROM data;").await

0 commit comments

Comments
 (0)