Skip to content
Merged
5 changes: 5 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ pub fn exists(subquery: Arc<LogicalPlan>) -> Expr {
subquery: Subquery {
subquery,
outer_ref_columns,
spans: None,
},
negated: false,
})
Expand All @@ -264,6 +265,7 @@ pub fn not_exists(subquery: Arc<LogicalPlan>) -> Expr {
subquery: Subquery {
subquery,
outer_ref_columns,
spans: None,
},
negated: true,
})
Expand All @@ -277,6 +279,7 @@ pub fn in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
Subquery {
subquery,
outer_ref_columns,
spans: None,
},
false,
))
Expand All @@ -290,6 +293,7 @@ pub fn not_in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
Subquery {
subquery,
outer_ref_columns,
spans: None,
},
true,
))
Expand All @@ -301,6 +305,7 @@ pub fn scalar_subquery(subquery: Arc<LogicalPlan>) -> Expr {
Expr::ScalarSubquery(Subquery {
subquery,
outer_ref_columns,
spans: None,
})
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subq
Ok(Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns: subquery.outer_ref_columns,
spans: None,
})
}

Expand Down
10 changes: 8 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use datafusion_common::tree_node::{
use datafusion_common::{
aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
FunctionalDependencies, ParamValues, Result, ScalarValue, TableReference,
FunctionalDependencies, ParamValues, Result, ScalarValue, Spans, TableReference,
UnnestOptions,
};
use indexmap::IndexSet;
Expand Down Expand Up @@ -940,14 +940,17 @@ impl LogicalPlan {
}))
}
LogicalPlan::Subquery(Subquery {
outer_ref_columns, ..
outer_ref_columns,
spans,
..
}) => {
self.assert_no_expressions(expr)?;
let input = self.only_input(inputs)?;
let subquery = LogicalPlanBuilder::from(input).build()?;
Ok(LogicalPlan::Subquery(Subquery {
subquery: Arc::new(subquery),
outer_ref_columns: outer_ref_columns.clone(),
spans: spans.clone(),
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
Expand Down Expand Up @@ -3617,6 +3620,8 @@ pub struct Subquery {
pub subquery: Arc<LogicalPlan>,
/// The outer references used in the subquery
pub outer_ref_columns: Vec<Expr>,
/// Span information for subquery projection columns
pub spans: Option<Spans>,
}

impl Normalizeable for Subquery {
Expand Down Expand Up @@ -3651,6 +3656,7 @@ impl Subquery {
Subquery {
subquery: plan,
outer_ref_columns: self.outer_ref_columns.clone(),
spans: None,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ impl TreeNode for LogicalPlan {
LogicalPlan::Subquery(Subquery {
subquery,
outer_ref_columns,
spans,
}) => subquery.map_elements(f)?.update_data(|subquery| {
LogicalPlan::Subquery(Subquery {
subquery,
outer_ref_columns,
spans,
})
}),
LogicalPlan::SubqueryAlias(SubqueryAlias {
Expand Down
7 changes: 7 additions & 0 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,14 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
Expr::ScalarSubquery(Subquery {
subquery,
outer_ref_columns,
spans,
}) => {
let new_plan =
analyze_internal(self.schema, Arc::unwrap_or_clone(subquery))?.data;
Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns,
spans,
})))
}
Expr::Exists(Exists { subquery, negated }) => {
Expand All @@ -329,6 +331,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
subquery: Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns: subquery.outer_ref_columns,
spans: subquery.spans,
},
negated,
})))
Expand All @@ -352,6 +355,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
let new_subquery = Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns: subquery.outer_ref_columns,
spans: subquery.spans,
};
Ok(Transformed::yes(Expr::InSubquery(InSubquery::new(
Box::new(expr.cast_to(&common_type, self.schema)?),
Expand Down Expand Up @@ -2089,6 +2093,7 @@ mod test {
Subquery {
subquery: empty_int32,
outer_ref_columns: vec![],
spans: None,
},
false,
));
Expand All @@ -2114,6 +2119,7 @@ mod test {
Subquery {
subquery: empty_int64,
outer_ref_columns: vec![],
spans: None,
},
false,
));
Expand All @@ -2138,6 +2144,7 @@ mod test {
Subquery {
subquery: empty_inside,
outer_ref_columns: vec![],
spans: None,
},
false,
));
Expand Down
112 changes: 105 additions & 7 deletions datafusion/sql/src/expr/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{DFSchema, Result};
use datafusion_expr::expr::Exists;
use datafusion_expr::expr::InSubquery;
use datafusion_expr::{Expr, Subquery};
use datafusion_common::{plan_err, DFSchema, Diagnostic, Result, Span, Spans};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::{Expr, LogicalPlan, Subquery};
use sqlparser::ast::Expr as SQLExpr;
use sqlparser::ast::Query;
use sqlparser::ast::{Query, SelectItem, SetExpr};
use std::sync::Arc;

impl<S: ContextProvider> SqlToRel<'_, S> {
Expand All @@ -41,6 +40,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
subquery: Subquery {
subquery: Arc::new(sub_plan),
outer_ref_columns,
spans: None,
},
negated,
}))
Expand All @@ -56,15 +56,37 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));

let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
for item in &select.projection {
if let SelectItem::UnnamedExpr(SQLExpr::Identifier(ident)) = item {
if let Some(span) = Span::try_from_sqlparser_span(ident.span) {
spans.add_span(span);
}
}
}
}

let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
let expr = Box::new(self.sql_to_expr(expr, input_schema, planner_context)?);

self.validate_single_column(
&sub_plan,
spans.clone(),
"IN subquery should only return one column",
"Select only one column in the IN subquery",
)?;

let expr_obj = self.sql_to_expr(expr, input_schema, planner_context)?;

Ok(Expr::InSubquery(InSubquery::new(
expr,
Box::new(expr_obj),
Subquery {
subquery: Arc::new(sub_plan),
outer_ref_columns,
spans: Some(spans),
},
negated,
)))
Expand All @@ -78,12 +100,88 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
for item in &select.projection {
if let SelectItem::ExprWithAlias { alias, .. } = item {
if let Some(span) = Span::try_from_sqlparser_span(alias.span) {
spans.add_span(span);
}
}
}
}
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);

self.validate_single_column(
&sub_plan,
spans.clone(),
"Scalar subquery should only return one column",
"Select only one column in the subquery",
)?;

Ok(Expr::ScalarSubquery(Subquery {
subquery: Arc::new(sub_plan),
outer_ref_columns,
spans: Some(spans),
}))
}

fn validate_single_column(
&self,
sub_plan: &LogicalPlan,
spans: Spans,
error_message: &str,
help_message: &str,
) -> Result<()> {
if sub_plan.schema().fields().len() > 1 {
let sub_schema = sub_plan.schema();
let fields = sub_schema.fields();
let field_names = sub_schema.field_names();

plan_err!("{}: {}", error_message, field_names.join(", ")).map_err(|err| {
let diagnostic = self.build_multi_column_diagnostic(
fields.len(),
&field_names,
spans,
error_message,
help_message,
);
err.with_diagnostic(diagnostic)
})
} else {
Ok(())
}
}

fn build_multi_column_diagnostic(
&self,
column_count: usize,
column_names: &[String],
spans: Spans,
error_message: &str,
help_message: &str,
) -> Diagnostic {
let primary_span = spans.first().clone();
let columns_info = format!(
"Found {} columns: {}",
column_count,
column_names.join(", ")
);

let mut diagnostic = Diagnostic::new_error(error_message, primary_span);
if spans.0.len() > 1 {
diagnostic.add_note(columns_info, primary_span);
for (i, span) in spans.iter().skip(1).enumerate() {
diagnostic
.add_note(format!("Extra column {}", i + 1), Some(span.clone()));
}
} else {
diagnostic.add_note(columns_info, primary_span);
}

diagnostic.add_help(help_message, None);
diagnostic
}
}
2 changes: 2 additions & 0 deletions datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,15 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
LogicalPlan::Subquery(Subquery {
subquery: input,
outer_ref_columns,
spans: None,
}),
alias,
)
}
plan => Ok(LogicalPlan::Subquery(Subquery {
subquery: Arc::new(plan),
outer_ref_columns,
spans: None,
})),
}
}
Expand Down
Loading