Skip to content

Commit 63ae2b9

Browse files
authored
Support wildcard select on multiple column using joins (apache#4840)
1 parent f9b72f4 commit 63ae2b9

File tree

4 files changed

+93
-8
lines changed

4 files changed

+93
-8
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,8 +1042,9 @@ pub fn project(
10421042
Expr::Wildcard => {
10431043
projected_expr.extend(expand_wildcard(input_schema, &plan)?)
10441044
}
1045-
Expr::QualifiedWildcard { ref qualifier } => projected_expr
1046-
.extend(expand_qualified_wildcard(qualifier, input_schema, &plan)?),
1045+
Expr::QualifiedWildcard { ref qualifier } => {
1046+
projected_expr.extend(expand_qualified_wildcard(qualifier, input_schema)?)
1047+
}
10471048
_ => projected_expr
10481049
.push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
10491050
}

datafusion/expr/src/utils.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,23 @@ pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result<Vec<Expr
150150
let using_columns = plan.using_columns()?;
151151
let columns_to_skip = using_columns
152152
.into_iter()
153-
// For each USING JOIN condition, only expand to one column in projection
153+
// For each USING JOIN condition, only expand to one of each join column in projection
154154
.flat_map(|cols| {
155155
let mut cols = cols.into_iter().collect::<Vec<_>>();
156156
// sort join columns to make sure we consistently keep the same
157157
// qualified column
158158
cols.sort();
159-
cols.into_iter().skip(1)
159+
let mut out_column_names: HashSet<String> = HashSet::new();
160+
cols.into_iter()
161+
.filter_map(|c| {
162+
if out_column_names.contains(&c.name) {
163+
Some(c)
164+
} else {
165+
out_column_names.insert(c.name);
166+
None
167+
}
168+
})
169+
.collect::<Vec<_>>()
160170
})
161171
.collect::<HashSet<_>>();
162172

@@ -186,7 +196,6 @@ pub fn expand_wildcard(schema: &DFSchema, plan: &LogicalPlan) -> Result<Vec<Expr
186196
pub fn expand_qualified_wildcard(
187197
qualifier: &str,
188198
schema: &DFSchema,
189-
plan: &LogicalPlan,
190199
) -> Result<Vec<Expr>> {
191200
let qualified_fields: Vec<DFField> = schema
192201
.fields_with_qualified(qualifier)
@@ -198,9 +207,14 @@ pub fn expand_qualified_wildcard(
198207
"Invalid qualifier {qualifier}"
199208
)));
200209
}
201-
let qualifier_schema =
210+
let qualified_schema =
202211
DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?;
203-
expand_wildcard(&qualifier_schema, plan)
212+
// if qualified, allow all columns in output (i.e. ignore using column check)
213+
Ok(qualified_schema
214+
.fields()
215+
.iter()
216+
.map(|f| Expr::Column(f.qualified_column()))
217+
.collect::<Vec<Expr>>())
204218
}
205219

206220
/// (expr, "is the SortExpr for window (either comes from PARTITION BY or ORDER BY columns)")

datafusion/sql/src/select.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
368368

369369
let qualifier = format!("{object_name}");
370370
// do not expand from outer schema
371-
expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), plan)
371+
expand_qualified_wildcard(&qualifier, plan.schema().as_ref())
372372
}
373373
}
374374
}

datafusion/sql/tests/integration_test.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,76 @@ fn join_with_ambiguous_column() {
395395
quick_test(sql, expected);
396396
}
397397

398+
#[test]
399+
fn using_join_multiple_keys() {
400+
let sql = "SELECT * FROM person a join person b using (id, age)";
401+
let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
402+
b.first_name, b.last_name, b.state, b.salary, b.birth_date, b.😀\
403+
\n Inner Join: Using a.id = b.id, a.age = b.age\
404+
\n SubqueryAlias: a\
405+
\n TableScan: person\
406+
\n SubqueryAlias: b\
407+
\n TableScan: person";
408+
quick_test(sql, expected);
409+
}
410+
411+
#[test]
412+
fn using_join_multiple_keys_subquery() {
413+
let sql =
414+
"SELECT age FROM (SELECT * FROM person a join person b using (id, age, state))";
415+
let expected = "Projection: a.age\
416+
\n Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
417+
b.first_name, b.last_name, b.salary, b.birth_date, b.😀\
418+
\n Inner Join: Using a.id = b.id, a.age = b.age, a.state = b.state\
419+
\n SubqueryAlias: a\
420+
\n TableScan: person\
421+
\n SubqueryAlias: b\
422+
\n TableScan: person";
423+
quick_test(sql, expected);
424+
}
425+
426+
#[test]
427+
fn using_join_multiple_keys_qualified_wildcard_select() {
428+
let sql = "SELECT a.* FROM person a join person b using (id, age)";
429+
let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀\
430+
\n Inner Join: Using a.id = b.id, a.age = b.age\
431+
\n SubqueryAlias: a\
432+
\n TableScan: person\
433+
\n SubqueryAlias: b\
434+
\n TableScan: person";
435+
quick_test(sql, expected);
436+
}
437+
438+
#[test]
439+
fn using_join_multiple_keys_select_all_columns() {
440+
let sql = "SELECT a.*, b.* FROM person a join person b using (id, age)";
441+
let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
442+
b.id, b.first_name, b.last_name, b.age, b.state, b.salary, b.birth_date, b.😀\
443+
\n Inner Join: Using a.id = b.id, a.age = b.age\
444+
\n SubqueryAlias: a\
445+
\n TableScan: person\
446+
\n SubqueryAlias: b\
447+
\n TableScan: person";
448+
quick_test(sql, expected);
449+
}
450+
451+
#[test]
452+
fn using_join_multiple_keys_multiple_joins() {
453+
let sql = "SELECT * FROM person a join person b using (id, age, state) join person c using (id, age, state)";
454+
let expected = "Projection: a.id, a.first_name, a.last_name, a.age, a.state, a.salary, a.birth_date, a.😀, \
455+
b.first_name, b.last_name, b.salary, b.birth_date, b.😀, \
456+
c.first_name, c.last_name, c.salary, c.birth_date, c.😀\
457+
\n Inner Join: Using a.id = c.id, a.age = c.age, a.state = c.state\
458+
\n Inner Join: Using a.id = b.id, a.age = b.age, a.state = b.state\
459+
\n SubqueryAlias: a\
460+
\n TableScan: person\
461+
\n SubqueryAlias: b\
462+
\n TableScan: person\
463+
\n SubqueryAlias: c\
464+
\n TableScan: person";
465+
quick_test(sql, expected);
466+
}
467+
398468
#[test]
399469
fn select_with_having() {
400470
let sql = "SELECT id, age

0 commit comments

Comments
 (0)