diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index a8e7fd76d037..c08738c183e4 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -37,7 +37,7 @@ use arrow::compute::kernels::cast_utils::{ parse_interval_day_time, parse_interval_month_day_nano, parse_interval_year_month, }; use arrow::datatypes::{DataType, Field}; -use datafusion_common::{plan_err, Column, Result, ScalarValue, TableReference}; +use datafusion_common::{plan_err, Column, Result, ScalarValue, Spans, TableReference}; use datafusion_functions_window_common::field::WindowUDFFieldArgs; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use sqlparser::ast::NullTreatment; @@ -252,6 +252,7 @@ pub fn exists(subquery: Arc) -> Expr { subquery: Subquery { subquery, outer_ref_columns, + spans: Spans::new(), }, negated: false, }) @@ -264,6 +265,7 @@ pub fn not_exists(subquery: Arc) -> Expr { subquery: Subquery { subquery, outer_ref_columns, + spans: Spans::new(), }, negated: true, }) @@ -277,6 +279,7 @@ pub fn in_subquery(expr: Expr, subquery: Arc) -> Expr { Subquery { subquery, outer_ref_columns, + spans: Spans::new(), }, false, )) @@ -290,6 +293,7 @@ pub fn not_in_subquery(expr: Expr, subquery: Arc) -> Expr { Subquery { subquery, outer_ref_columns, + spans: Spans::new(), }, true, )) @@ -301,6 +305,7 @@ pub fn scalar_subquery(subquery: Arc) -> Expr { Expr::ScalarSubquery(Subquery { subquery, outer_ref_columns, + spans: Spans::new(), }) } diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 318640282fe1..a349c83a4934 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -30,7 +30,7 @@ use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Field}; use datafusion_common::{ not_impl_err, plan_datafusion_err, plan_err, Column, DataFusionError, ExprSchema, - Result, TableReference, + Result, Spans, TableReference, }; use datafusion_expr_common::type_coercion::binary::BinaryTypeCoercer; use datafusion_functions_window_common::field::WindowUDFFieldArgs; @@ -617,6 +617,7 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result { self.assert_no_expressions(expr)?; let input = self.only_input(inputs)?; @@ -947,6 +949,7 @@ impl LogicalPlan { Ok(LogicalPlan::Subquery(Subquery { subquery: Arc::new(subquery), outer_ref_columns: outer_ref_columns.clone(), + spans: spans.clone(), })) } LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { @@ -3615,6 +3618,8 @@ pub struct Subquery { pub subquery: Arc, /// The outer references used in the subquery pub outer_ref_columns: Vec, + /// Span information for subquery projection columns + pub spans: Spans, } impl Normalizeable for Subquery { @@ -3649,6 +3654,7 @@ impl Subquery { Subquery { subquery: plan, outer_ref_columns: self.outer_ref_columns.clone(), + spans: Spans::new(), } } } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index dfc18c74c70a..87b49a3a2e1b 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -159,10 +159,12 @@ impl TreeNode for LogicalPlan { LogicalPlan::Subquery(Subquery { subquery, outer_ref_columns, + spans, }) => subquery.map_elements(f)?.update_data(|subquery| { LogicalPlan::Subquery(Subquery { subquery, outer_ref_columns, + spans, }) }), LogicalPlan::SubqueryAlias(SubqueryAlias { diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 538ef98ac7be..c9c0b7a3b789 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -311,12 +311,14 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { Expr::ScalarSubquery(Subquery { subquery, outer_ref_columns, + spans, }) => { let new_plan = analyze_internal(self.schema, Arc::unwrap_or_clone(subquery))?.data; Ok(Transformed::yes(Expr::ScalarSubquery(Subquery { subquery: Arc::new(new_plan), outer_ref_columns, + spans, }))) } Expr::Exists(Exists { subquery, negated }) => { @@ -329,6 +331,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { subquery: Subquery { subquery: Arc::new(new_plan), outer_ref_columns: subquery.outer_ref_columns, + spans: subquery.spans, }, negated, }))) @@ -352,6 +355,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { let new_subquery = Subquery { subquery: Arc::new(new_plan), outer_ref_columns: subquery.outer_ref_columns, + spans: subquery.spans, }; Ok(Transformed::yes(Expr::InSubquery(InSubquery::new( Box::new(expr.cast_to(&common_type, self.schema)?), @@ -1049,7 +1053,7 @@ mod test { use crate::test::{assert_analyzed_plan_eq, assert_analyzed_plan_with_config_eq}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{TransformedResult, TreeNode}; - use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue}; + use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue, Spans}; use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction}; use datafusion_expr::logical_plan::{EmptyRelation, Projection, Sort}; use datafusion_expr::test::function_stub::avg_udaf; @@ -2089,6 +2093,7 @@ mod test { Subquery { subquery: empty_int32, outer_ref_columns: vec![], + spans: Spans::new(), }, false, )); @@ -2114,6 +2119,7 @@ mod test { Subquery { subquery: empty_int64, outer_ref_columns: vec![], + spans: Spans::new(), }, false, )); @@ -2138,6 +2144,7 @@ mod test { Subquery { subquery: empty_inside, outer_ref_columns: vec![], + spans: Spans::new(), }, false, )); diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 3a8aef267be5..33f10400d341 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -731,9 +731,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = "Invalid (non-executable) plan after Analyzer\ - \ncaused by\ - \nError during planning: Scalar subquery should only return one column"; + let expected = "Error during planning: Scalar subquery should only return one column, but found 4: orders.o_orderkey, orders.o_custkey, orders.o_orderstatus, orders.o_totalprice"; assert_analyzer_check_err(vec![], plan, expected); Ok(()) } @@ -793,9 +791,7 @@ mod tests { .project(vec![col("customer.c_custkey")])? .build()?; - let expected = "Invalid (non-executable) plan after Analyzer\ - \ncaused by\ - \nError during planning: Scalar subquery should only return one column"; + let expected = "Error during planning: Scalar subquery should only return one column, but found 2: orders.o_custkey, orders.o_orderkey"; assert_analyzer_check_err(vec![], plan, expected); Ok(()) } diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 481f024787fe..225c5d74c2ab 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -16,12 +16,11 @@ // under the License. use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use datafusion_common::{DFSchema, Result}; -use datafusion_expr::expr::Exists; -use datafusion_expr::expr::InSubquery; -use datafusion_expr::{Expr, Subquery}; +use datafusion_common::{plan_err, DFSchema, Diagnostic, Result, Span, Spans}; +use datafusion_expr::expr::{Exists, InSubquery}; +use datafusion_expr::{Expr, LogicalPlan, Subquery}; use sqlparser::ast::Expr as SQLExpr; -use sqlparser::ast::Query; +use sqlparser::ast::{Query, SelectItem, SetExpr}; use std::sync::Arc; impl SqlToRel<'_, S> { @@ -41,6 +40,7 @@ impl SqlToRel<'_, S> { subquery: Subquery { subquery: Arc::new(sub_plan), outer_ref_columns, + spans: Spans::new(), }, negated, })) @@ -56,15 +56,37 @@ impl SqlToRel<'_, S> { ) -> Result { let old_outer_query_schema = planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + + let mut spans = Spans::new(); + if let SetExpr::Select(select) = subquery.body.as_ref() { + for item in &select.projection { + if let SelectItem::UnnamedExpr(SQLExpr::Identifier(ident)) = item { + if let Some(span) = Span::try_from_sqlparser_span(ident.span) { + spans.add_span(span); + } + } + } + } + let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); planner_context.set_outer_query_schema(old_outer_query_schema); - let expr = Box::new(self.sql_to_expr(expr, input_schema, planner_context)?); + + self.validate_single_column( + &sub_plan, + spans.clone(), + "Too many columns! The subquery should only return one column", + "Select only one column in the subquery", + )?; + + let expr_obj = self.sql_to_expr(expr, input_schema, planner_context)?; + Ok(Expr::InSubquery(InSubquery::new( - expr, + Box::new(expr_obj), Subquery { subquery: Arc::new(sub_plan), outer_ref_columns, + spans, }, negated, ))) @@ -78,12 +100,72 @@ impl SqlToRel<'_, S> { ) -> Result { let old_outer_query_schema = planner_context.set_outer_query_schema(Some(input_schema.clone().into())); + let mut spans = Spans::new(); + if let SetExpr::Select(select) = subquery.body.as_ref() { + for item in &select.projection { + if let SelectItem::ExprWithAlias { alias, .. } = item { + if let Some(span) = Span::try_from_sqlparser_span(alias.span) { + spans.add_span(span); + } + } + } + } let sub_plan = self.query_to_plan(subquery, planner_context)?; let outer_ref_columns = sub_plan.all_out_ref_exprs(); planner_context.set_outer_query_schema(old_outer_query_schema); + + self.validate_single_column( + &sub_plan, + spans.clone(), + "Too many columns! The subquery should only return one column", + "Select only one column in the subquery", + )?; + Ok(Expr::ScalarSubquery(Subquery { subquery: Arc::new(sub_plan), outer_ref_columns, + spans, })) } + + fn validate_single_column( + &self, + sub_plan: &LogicalPlan, + spans: Spans, + error_message: &str, + help_message: &str, + ) -> Result<()> { + if sub_plan.schema().fields().len() > 1 { + let sub_schema = sub_plan.schema(); + let field_names = sub_schema.field_names(); + + plan_err!("{}: {}", error_message, field_names.join(", ")).map_err(|err| { + let diagnostic = self.build_multi_column_diagnostic( + spans, + error_message, + help_message, + ); + err.with_diagnostic(diagnostic) + }) + } else { + Ok(()) + } + } + + fn build_multi_column_diagnostic( + &self, + spans: Spans, + error_message: &str, + help_message: &str, + ) -> Diagnostic { + let full_span = Span::union_iter(spans.0.iter().cloned()); + let mut diagnostic = Diagnostic::new_error(error_message, full_span); + + for (i, span) in spans.iter().skip(1).enumerate() { + diagnostic.add_note(format!("Extra column {}", i + 1), Some(*span)); + } + + diagnostic.add_help(help_message, None); + diagnostic + } } diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 800dd151a124..8078261d9152 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -21,7 +21,7 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{ - not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, TableReference, + not_impl_err, plan_err, DFSchema, Diagnostic, Result, Span, Spans, TableReference, }; use datafusion_expr::builder::subquery_alias; use datafusion_expr::{expr::Unnest, Expr, LogicalPlan, LogicalPlanBuilder}; @@ -211,6 +211,7 @@ impl SqlToRel<'_, S> { LogicalPlan::Subquery(Subquery { subquery: input, outer_ref_columns, + spans: Spans::new(), }), alias, ) @@ -218,6 +219,7 @@ impl SqlToRel<'_, S> { plan => Ok(LogicalPlan::Subquery(Subquery { subquery: Arc::new(plan), outer_ref_columns, + spans: Spans::new(), })), } } diff --git a/datafusion/sql/tests/cases/diagnostic.rs b/datafusion/sql/tests/cases/diagnostic.rs index d70484f718c8..5481f046e70a 100644 --- a/datafusion/sql/tests/cases/diagnostic.rs +++ b/datafusion/sql/tests/cases/diagnostic.rs @@ -286,3 +286,71 @@ fn test_invalid_function() -> Result<()> { assert_eq!(diag.span, Some(spans["whole"])); Ok(()) } +#[test] +fn test_scalar_subquery_multiple_columns() -> Result<(), Box> { + let query = "SELECT (SELECT 1 AS /*x*/x/*x*/, 2 AS /*y*/y/*y*/) AS col"; + let spans = get_spans(query); + let diag = do_query(query); + + assert_eq!( + diag.message, + "Too many columns! The subquery should only return one column" + ); + + let expected_span = Some(Span { + start: spans["x"].start, + end: spans["y"].end, + }); + assert_eq!(diag.span, expected_span); + assert_eq!( + diag.notes + .iter() + .map(|n| (n.message.as_str(), n.span)) + .collect::>(), + vec![("Extra column 1", Some(spans["y"]))] + ); + assert_eq!( + diag.helps + .iter() + .map(|h| h.message.as_str()) + .collect::>(), + vec!["Select only one column in the subquery"] + ); + + Ok(()) +} + +#[test] +fn test_in_subquery_multiple_columns() -> Result<(), Box> { + // This query uses an IN subquery with multiple columns - this should trigger an error + let query = "SELECT * FROM person WHERE id IN (SELECT /*id*/id/*id*/, /*first*/first_name/*first*/ FROM person)"; + let spans = get_spans(query); + let diag = do_query(query); + + assert_eq!( + diag.message, + "Too many columns! The subquery should only return one column" + ); + + let expected_span = Some(Span { + start: spans["id"].start, + end: spans["first"].end, + }); + assert_eq!(diag.span, expected_span); + assert_eq!( + diag.notes + .iter() + .map(|n| (n.message.as_str(), n.span)) + .collect::>(), + vec![("Extra column 1", Some(spans["first"]))] + ); + assert_eq!( + diag.helps + .iter() + .map(|h| h.message.as_str()) + .collect::>(), + vec!["Select only one column in the subquery"] + ); + + Ok(()) +} diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 5a722c2288ac..4c1565c7f033 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -438,7 +438,7 @@ logical_plan 08)----------TableScan: t1 projection=[t1_int] #invalid_scalar_subquery -statement error DataFusion error: Invalid \(non-executable\) plan after Analyzer\ncaused by\nError during planning: Scalar subquery should only return one column, but found 2: t2.t2_id, t2.t2_name +statement error DataFusion error: Error during planning: Too many columns! The subquery should only return one column: t2.t2_id, t2.t2_name SELECT t1_id, t1_name, t1_int, (select t2_id, t2_name FROM t2 WHERE t2.t2_id = t1.t1_int) FROM t1 #subquery_not_allowed diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 60e71ca39d33..24eb23ded5a7 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -24,7 +24,8 @@ use datafusion::arrow::datatypes::{ }; use datafusion::common::{ not_impl_datafusion_err, not_impl_err, plan_datafusion_err, plan_err, - substrait_datafusion_err, substrait_err, DFSchema, DFSchemaRef, TableReference, + substrait_datafusion_err, substrait_err, DFSchema, DFSchemaRef, Spans, + TableReference, }; use datafusion::datasource::provider_as_source; use datafusion::logical_expr::expr::{Exists, InSubquery, Sort, WindowFunctionParams}; @@ -2280,6 +2281,7 @@ pub async fn from_subquery( subquery: Subquery { subquery: Arc::new(haystack_expr), outer_ref_columns: outer_refs, + spans: Spans::new(), }, negated: false, })) @@ -2298,6 +2300,7 @@ pub async fn from_subquery( Ok(Expr::ScalarSubquery(Subquery { subquery: Arc::new(plan), outer_ref_columns, + spans: Spans::new(), })) } SubqueryType::SetPredicate(predicate) => { @@ -2313,6 +2316,7 @@ pub async fn from_subquery( Subquery { subquery: Arc::new(plan), outer_ref_columns, + spans: Spans::new(), }, false, )))