Skip to content
Merged
7 changes: 6 additions & 1 deletion datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use arrow::compute::kernels::cast_utils::{
parse_interval_day_time, parse_interval_month_day_nano, parse_interval_year_month,
};
use arrow::datatypes::{DataType, Field};
use datafusion_common::{plan_err, Column, Result, ScalarValue, TableReference};
use datafusion_common::{plan_err, Column, Result, ScalarValue, Spans, TableReference};
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use sqlparser::ast::NullTreatment;
Expand Down Expand Up @@ -252,6 +252,7 @@ pub fn exists(subquery: Arc<LogicalPlan>) -> Expr {
subquery: Subquery {
subquery,
outer_ref_columns,
spans: Spans::new(),
},
negated: false,
})
Expand All @@ -264,6 +265,7 @@ pub fn not_exists(subquery: Arc<LogicalPlan>) -> Expr {
subquery: Subquery {
subquery,
outer_ref_columns,
spans: Spans::new(),
},
negated: true,
})
Expand All @@ -277,6 +279,7 @@ pub fn in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
Subquery {
subquery,
outer_ref_columns,
spans: Spans::new(),
},
false,
))
Expand All @@ -290,6 +293,7 @@ pub fn not_in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
Subquery {
subquery,
outer_ref_columns,
spans: Spans::new(),
},
true,
))
Expand All @@ -301,6 +305,7 @@ pub fn scalar_subquery(subquery: Arc<LogicalPlan>) -> Expr {
Expr::ScalarSubquery(Subquery {
subquery,
outer_ref_columns,
spans: Spans::new(),
})
}

Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field};
use datafusion_common::{
not_impl_err, plan_datafusion_err, plan_err, Column, DataFusionError, ExprSchema,
Result, TableReference,
Result, Spans, TableReference,
};
use datafusion_expr_common::type_coercion::binary::BinaryTypeCoercer;
use datafusion_functions_window_common::field::WindowUDFFieldArgs;
Expand Down Expand Up @@ -608,6 +608,7 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subq
Ok(Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns: subquery.outer_ref_columns,
spans: Spans::new(),
})
}

Expand Down
10 changes: 8 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use datafusion_common::tree_node::{
use datafusion_common::{
aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
FunctionalDependencies, ParamValues, Result, ScalarValue, TableReference,
FunctionalDependencies, ParamValues, Result, ScalarValue, Spans, TableReference,
UnnestOptions,
};
use indexmap::IndexSet;
Expand Down Expand Up @@ -940,14 +940,17 @@ impl LogicalPlan {
}))
}
LogicalPlan::Subquery(Subquery {
outer_ref_columns, ..
outer_ref_columns,
spans,
..
}) => {
self.assert_no_expressions(expr)?;
let input = self.only_input(inputs)?;
let subquery = LogicalPlanBuilder::from(input).build()?;
Ok(LogicalPlan::Subquery(Subquery {
subquery: Arc::new(subquery),
outer_ref_columns: outer_ref_columns.clone(),
spans: spans.clone(),
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
Expand Down Expand Up @@ -3617,6 +3620,8 @@ pub struct Subquery {
pub subquery: Arc<LogicalPlan>,
/// The outer references used in the subquery
pub outer_ref_columns: Vec<Expr>,
/// Span information for subquery projection columns
pub spans: Spans,
}

impl Normalizeable for Subquery {
Expand Down Expand Up @@ -3651,6 +3656,7 @@ impl Subquery {
Subquery {
subquery: plan,
outer_ref_columns: self.outer_ref_columns.clone(),
spans: Spans::new(),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ impl TreeNode for LogicalPlan {
LogicalPlan::Subquery(Subquery {
subquery,
outer_ref_columns,
spans,
}) => subquery.map_elements(f)?.update_data(|subquery| {
LogicalPlan::Subquery(Subquery {
subquery,
outer_ref_columns,
spans,
})
}),
LogicalPlan::SubqueryAlias(SubqueryAlias {
Expand Down
9 changes: 8 additions & 1 deletion datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,14 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
Expr::ScalarSubquery(Subquery {
subquery,
outer_ref_columns,
spans,
}) => {
let new_plan =
analyze_internal(self.schema, Arc::unwrap_or_clone(subquery))?.data;
Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns,
spans,
})))
}
Expr::Exists(Exists { subquery, negated }) => {
Expand All @@ -329,6 +331,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
subquery: Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns: subquery.outer_ref_columns,
spans: subquery.spans,
},
negated,
})))
Expand All @@ -352,6 +355,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
let new_subquery = Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns: subquery.outer_ref_columns,
spans: subquery.spans,
};
Ok(Transformed::yes(Expr::InSubquery(InSubquery::new(
Box::new(expr.cast_to(&common_type, self.schema)?),
Expand Down Expand Up @@ -1049,7 +1053,7 @@ mod test {
use crate::test::{assert_analyzed_plan_eq, assert_analyzed_plan_with_config_eq};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue};
use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue, Spans};
use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction};
use datafusion_expr::logical_plan::{EmptyRelation, Projection, Sort};
use datafusion_expr::test::function_stub::avg_udaf;
Expand Down Expand Up @@ -2089,6 +2093,7 @@ mod test {
Subquery {
subquery: empty_int32,
outer_ref_columns: vec![],
spans: Spans::new(),
},
false,
));
Expand All @@ -2114,6 +2119,7 @@ mod test {
Subquery {
subquery: empty_int64,
outer_ref_columns: vec![],
spans: Spans::new(),
},
false,
));
Expand All @@ -2138,6 +2144,7 @@ mod test {
Subquery {
subquery: empty_inside,
outer_ref_columns: vec![],
spans: Spans::new(),
},
false,
));
Expand Down
96 changes: 89 additions & 7 deletions datafusion/sql/src/expr/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{DFSchema, Result};
use datafusion_expr::expr::Exists;
use datafusion_expr::expr::InSubquery;
use datafusion_expr::{Expr, Subquery};
use datafusion_common::{plan_err, DFSchema, Diagnostic, Result, Span, Spans};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::{Expr, LogicalPlan, Subquery};
use sqlparser::ast::Expr as SQLExpr;
use sqlparser::ast::Query;
use sqlparser::ast::{Query, SelectItem, SetExpr};
use std::sync::Arc;

impl<S: ContextProvider> SqlToRel<'_, S> {
Expand All @@ -41,6 +40,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
subquery: Subquery {
subquery: Arc::new(sub_plan),
outer_ref_columns,
spans: Spans::new(),
},
negated,
}))
Expand All @@ -56,15 +56,37 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));

let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
for item in &select.projection {
if let SelectItem::UnnamedExpr(SQLExpr::Identifier(ident)) = item {
if let Some(span) = Span::try_from_sqlparser_span(ident.span) {
spans.add_span(span);
}
}
}
}

let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
let expr = Box::new(self.sql_to_expr(expr, input_schema, planner_context)?);

self.validate_single_column(
&sub_plan,
spans.clone(),
"Too many columns! The subquery should only return one column",
"Select only one column in the subquery",
)?;

let expr_obj = self.sql_to_expr(expr, input_schema, planner_context)?;

Ok(Expr::InSubquery(InSubquery::new(
expr,
Box::new(expr_obj),
Subquery {
subquery: Arc::new(sub_plan),
outer_ref_columns,
spans: spans,
},
negated,
)))
Expand All @@ -78,12 +100,72 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
for item in &select.projection {
if let SelectItem::ExprWithAlias { alias, .. } = item {
if let Some(span) = Span::try_from_sqlparser_span(alias.span) {
spans.add_span(span);
}
}
}
}
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);

self.validate_single_column(
&sub_plan,
spans.clone(),
"Too many columns! The subquery should only return one column",
"Select only one column in the subquery",
)?;

Ok(Expr::ScalarSubquery(Subquery {
subquery: Arc::new(sub_plan),
outer_ref_columns,
spans: spans,
}))
}

/// Checks that the planned subquery yields at most one output column.
///
/// Returns `Ok(())` when the schema of `sub_plan` has zero or one field.
/// Otherwise returns a plan error whose message is `error_message` followed
/// by the comma-separated field names, with a diagnostic built from `spans`,
/// `error_message` and `help_message` attached to it.
fn validate_single_column(
    &self,
    sub_plan: &LogicalPlan,
    spans: Spans,
    error_message: &str,
    help_message: &str,
) -> Result<()> {
    let schema = sub_plan.schema();

    // Guard clause: a single-column (or empty) projection is acceptable.
    if schema.fields().len() <= 1 {
        return Ok(());
    }

    let column_list = schema.field_names().join(", ");

    // The diagnostic is only constructed on this error path.
    plan_err!("{}: {}", error_message, column_list).map_err(|err| {
        let diagnostic =
            self.build_multi_column_diagnostic(spans, error_message, help_message);
        err.with_diagnostic(diagnostic)
    })
}

/// Builds the [`Diagnostic`] attached to a "too many columns" subquery error.
///
/// The error diagnostic is created from `error_message` and the result of
/// `Span::union_iter` over every collected span. Each span after the first is
/// added as a note labelled `Extra column N`, with `N` counting from 1, and
/// `help_message` is added as a help entry with no span.
fn build_multi_column_diagnostic(
    &self,
    spans: Spans,
    error_message: &str,
    help_message: &str,
) -> Diagnostic {
    // `Span` is `Copy`, so spans are copied rather than cloned.
    let full_span = Span::union_iter(spans.iter().copied());
    let mut diagnostic = Diagnostic::new_error(error_message, full_span);

    // The first span is treated as the expected column; every later span is
    // reported as an extra one.
    for (i, span) in spans.iter().skip(1).enumerate() {
        diagnostic.add_note(format!("Extra column {}", i + 1), Some(*span));
    }

    diagnostic.add_help(help_message, None);
    diagnostic
}
}
4 changes: 3 additions & 1 deletion datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel};

use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{
not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, TableReference,
not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, Spans, TableReference,
};
use datafusion_expr::builder::subquery_alias;
use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder};
Expand Down Expand Up @@ -211,13 +211,15 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
LogicalPlan::Subquery(Subquery {
subquery: input,
outer_ref_columns,
spans: Spans::new(),
}),
alias,
)
}
plan => Ok(LogicalPlan::Subquery(Subquery {
subquery: Arc::new(plan),
outer_ref_columns,
spans: Spans::new(),
})),
}
}
Expand Down
Loading
Loading