Skip to content

Commit 21a0d8d

Browse files
kumarUjjawalalamb
authored andcommitted
fix: sqllogictest cannot convert <subquery> to Substrait (apache#19739)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes apache#16281 ## Rationale for this change The sqllogictest for the substrait was failing for subquery. ``` query failed: DataFusion error: This feature is not implemented: Cannot convert <subquery> to Substrait ``` <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - added support for `ScalarSubquery` and `Exists` expressions in the Substrait producer. <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 7698fdc commit 21a0d8d

File tree

7 files changed

+297
-52
lines changed

7 files changed

+297
-52
lines changed

datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,22 @@ pub(crate) fn try_to_substrait_field_reference(
7676
}
7777
}
7878

79+
/// Convert an outer reference column to a Substrait field reference.
80+
/// Outer reference columns reference columns from an outer query scope in correlated subqueries.
81+
/// We convert them the same way as regular columns since the subquery plan will be
82+
/// reconstructed with the proper schema context during consumption.
83+
pub fn from_outer_reference_column(
84+
col: &Column,
85+
schema: &DFSchemaRef,
86+
) -> datafusion::common::Result<Expression> {
87+
// OuterReferenceColumn is converted similarly to a regular column reference.
88+
// The schema provided should be the schema context in which the outer reference
89+
// column appears. During Substrait round-trip, the consumer will reconstruct
90+
// the outer reference based on the subquery context.
91+
let index = schema.index_of_column(col)?;
92+
substrait_field_ref(index)
93+
}
94+
7995
#[cfg(test)]
8096
mod tests {
8197
use super::*;

datafusion/substrait/src/logical_plan/producer/expr/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,17 @@ pub fn to_substrait_rex(
139139
}
140140
Expr::WindowFunction(expr) => producer.handle_window_function(expr, schema),
141141
Expr::InList(expr) => producer.handle_in_list(expr, schema),
142-
Expr::Exists(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"),
142+
Expr::Exists(expr) => producer.handle_exists(expr, schema),
143143
Expr::InSubquery(expr) => producer.handle_in_subquery(expr, schema),
144144
Expr::SetComparison(expr) => producer.handle_set_comparison(expr, schema),
145-
Expr::ScalarSubquery(expr) => {
146-
not_impl_err!("Cannot convert {expr:?} to Substrait")
147-
}
145+
Expr::ScalarSubquery(expr) => producer.handle_scalar_subquery(expr, schema),
148146
#[expect(deprecated)]
149147
Expr::Wildcard { .. } => not_impl_err!("Cannot convert {expr:?} to Substrait"),
150148
Expr::GroupingSet(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"),
151149
Expr::Placeholder(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"),
152150
Expr::OuterReferenceColumn(_, _) => {
151+
// OuterReferenceColumn requires tracking outer query schema context for correlated
152+
// subqueries. This is a complex feature that is not yet implemented.
153153
not_impl_err!("Cannot convert {expr:?} to Substrait")
154154
}
155155
Expr::Unnest(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"),

datafusion/substrait/src/logical_plan/producer/expr/singular_or_list.rs

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::logical_plan::producer::SubstraitProducer;
18+
use crate::logical_plan::producer::{SubstraitProducer, negate};
1919
use datafusion::common::DFSchemaRef;
2020
use datafusion::logical_expr::expr::InList;
21-
use substrait::proto::expression::{RexType, ScalarFunction, SingularOrList};
22-
use substrait::proto::function_argument::ArgType;
23-
use substrait::proto::{Expression, FunctionArgument};
21+
use substrait::proto::Expression;
22+
use substrait::proto::expression::{RexType, SingularOrList};
2423

2524
pub fn from_in_list(
2625
producer: &mut impl SubstraitProducer,
@@ -46,20 +45,7 @@ pub fn from_in_list(
4645
};
4746

4847
if *negated {
49-
let function_anchor = producer.register_function("not".to_string());
50-
51-
#[expect(deprecated)]
52-
Ok(Expression {
53-
rex_type: Some(RexType::ScalarFunction(ScalarFunction {
54-
function_reference: function_anchor,
55-
arguments: vec![FunctionArgument {
56-
arg_type: Some(ArgType::Value(substrait_or_list)),
57-
}],
58-
output_type: None,
59-
args: vec![],
60-
options: vec![],
61-
})),
62-
})
48+
Ok(negate(producer, substrait_or_list))
6349
} else {
6450
Ok(substrait_or_list)
6551
}

datafusion/substrait/src/logical_plan/producer/expr/subquery.rs

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::logical_plan::producer::SubstraitProducer;
18+
use crate::logical_plan::producer::{SubstraitProducer, negate};
1919
use datafusion::common::{DFSchemaRef, substrait_err};
20-
use datafusion::logical_expr::Operator;
21-
use datafusion::logical_expr::expr::{InSubquery, SetComparison, SetQuantifier};
22-
use substrait::proto::expression::subquery::InPredicate;
20+
use datafusion::logical_expr::expr::{Exists, InSubquery, SetComparison, SetQuantifier};
21+
use datafusion::logical_expr::{Operator, Subquery};
22+
use substrait::proto::Expression;
23+
use substrait::proto::expression::RexType;
2324
use substrait::proto::expression::subquery::set_comparison::{ComparisonOp, ReductionOp};
24-
use substrait::proto::expression::{RexType, ScalarFunction};
25-
use substrait::proto::function_argument::ArgType;
26-
use substrait::proto::{Expression, FunctionArgument};
25+
use substrait::proto::expression::subquery::{InPredicate, Scalar, SetPredicate};
2726

2827
pub fn from_in_subquery(
2928
producer: &mut impl SubstraitProducer,
@@ -54,20 +53,7 @@ pub fn from_in_subquery(
5453
))),
5554
};
5655
if *negated {
57-
let function_anchor = producer.register_function("not".to_string());
58-
59-
#[expect(deprecated)]
60-
Ok(Expression {
61-
rex_type: Some(RexType::ScalarFunction(ScalarFunction {
62-
function_reference: function_anchor,
63-
arguments: vec![FunctionArgument {
64-
arg_type: Some(ArgType::Value(substrait_subquery)),
65-
}],
66-
output_type: None,
67-
args: vec![],
68-
options: vec![],
69-
})),
70-
})
56+
Ok(negate(producer, substrait_subquery))
7157
} else {
7258
Ok(substrait_subquery)
7359
}
@@ -122,3 +108,56 @@ pub fn from_set_comparison(
122108
))),
123109
})
124110
}
111+
112+
/// Convert DataFusion ScalarSubquery to Substrait Scalar subquery type
113+
pub fn from_scalar_subquery(
114+
producer: &mut impl SubstraitProducer,
115+
subquery: &Subquery,
116+
_schema: &DFSchemaRef,
117+
) -> datafusion::common::Result<Expression> {
118+
let subquery_plan = producer.handle_plan(subquery.subquery.as_ref())?;
119+
120+
Ok(Expression {
121+
rex_type: Some(RexType::Subquery(Box::new(
122+
substrait::proto::expression::Subquery {
123+
subquery_type: Some(
124+
substrait::proto::expression::subquery::SubqueryType::Scalar(
125+
Box::new(Scalar {
126+
input: Some(subquery_plan),
127+
}),
128+
),
129+
),
130+
},
131+
))),
132+
})
133+
}
134+
135+
/// Convert DataFusion Exists expression to Substrait SetPredicate subquery type
136+
pub fn from_exists(
137+
producer: &mut impl SubstraitProducer,
138+
exists: &Exists,
139+
_schema: &DFSchemaRef,
140+
) -> datafusion::common::Result<Expression> {
141+
let subquery_plan = producer.handle_plan(exists.subquery.subquery.as_ref())?;
142+
143+
let substrait_exists = Expression {
144+
rex_type: Some(RexType::Subquery(Box::new(
145+
substrait::proto::expression::Subquery {
146+
subquery_type: Some(
147+
substrait::proto::expression::subquery::SubqueryType::SetPredicate(
148+
Box::new(SetPredicate {
149+
predicate_op: substrait::proto::expression::subquery::set_predicate::PredicateOp::Exists as i32,
150+
tuples: Some(subquery_plan),
151+
}),
152+
),
153+
),
154+
},
155+
))),
156+
};
157+
158+
if exists.negated {
159+
Ok(negate(producer, substrait_exists))
160+
} else {
161+
Ok(substrait_exists)
162+
}
163+
}

datafusion/substrait/src/logical_plan/producer/substrait_producer.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,19 @@
1818
use crate::extensions::Extensions;
1919
use crate::logical_plan::producer::{
2020
from_aggregate, from_aggregate_function, from_alias, from_between, from_binary_expr,
21-
from_case, from_cast, from_column, from_distinct, from_empty_relation, from_filter,
22-
from_in_list, from_in_subquery, from_join, from_like, from_limit, from_literal,
23-
from_projection, from_repartition, from_scalar_function, from_set_comparison,
24-
from_sort, from_subquery_alias, from_table_scan, from_try_cast, from_unary_expr,
25-
from_union, from_values, from_window, from_window_function, to_substrait_rel,
26-
to_substrait_rex,
21+
from_case, from_cast, from_column, from_distinct, from_empty_relation, from_exists,
22+
from_filter, from_in_list, from_in_subquery, from_join, from_like, from_limit,
23+
from_literal, from_projection, from_repartition, from_scalar_function,
24+
from_scalar_subquery, from_set_comparison, from_sort, from_subquery_alias,
25+
from_table_scan, from_try_cast, from_unary_expr, from_union, from_values,
26+
from_window, from_window_function, to_substrait_rel, to_substrait_rex,
2727
};
2828
use datafusion::common::{Column, DFSchemaRef, ScalarValue, substrait_err};
2929
use datafusion::execution::SessionState;
3030
use datafusion::execution::registry::SerializerRegistry;
31+
use datafusion::logical_expr::Subquery;
3132
use datafusion::logical_expr::expr::{
32-
Alias, InList, InSubquery, SetComparison, WindowFunction,
33+
Alias, Exists, InList, InSubquery, SetComparison, WindowFunction,
3334
};
3435
use datafusion::logical_expr::{
3536
Aggregate, Between, BinaryExpr, Case, Cast, Distinct, EmptyRelation, Expr, Extension,
@@ -372,6 +373,21 @@ pub trait SubstraitProducer: Send + Sync + Sized {
372373
) -> datafusion::common::Result<Expression> {
373374
from_set_comparison(self, set_comparison, schema)
374375
}
376+
fn handle_scalar_subquery(
377+
&mut self,
378+
subquery: &Subquery,
379+
schema: &DFSchemaRef,
380+
) -> datafusion::common::Result<Expression> {
381+
from_scalar_subquery(self, subquery, schema)
382+
}
383+
384+
fn handle_exists(
385+
&mut self,
386+
exists: &Exists,
387+
schema: &DFSchemaRef,
388+
) -> datafusion::common::Result<Expression> {
389+
from_exists(self, exists, schema)
390+
}
375391
}
376392

377393
pub struct DefaultSubstraitProducer<'a> {

datafusion/substrait/src/logical_plan/producer/utils.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use crate::logical_plan::producer::SubstraitProducer;
1919
use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
2020
use datafusion::common::{DFSchemaRef, plan_err};
2121
use datafusion::logical_expr::SortExpr;
22-
use substrait::proto::SortField;
2322
use substrait::proto::sort_field::{SortDirection, SortKind};
23+
use substrait::proto::{Expression, SortField};
2424

2525
// Substrait wants a list of all field names, including nested fields from structs,
2626
// also from within e.g. lists and maps. However, it does not want the list and map field names
@@ -85,3 +85,28 @@ pub(crate) fn to_substrait_precision(time_unit: &TimeUnit) -> i32 {
8585
TimeUnit::Nanosecond => 9,
8686
}
8787
}
88+
89+
/// Wraps an expression with a `not()` function.
90+
pub(crate) fn negate(
91+
producer: &mut impl SubstraitProducer,
92+
expr: Expression,
93+
) -> Expression {
94+
let function_anchor = producer.register_function("not".to_string());
95+
96+
#[expect(deprecated)]
97+
Expression {
98+
rex_type: Some(substrait::proto::expression::RexType::ScalarFunction(
99+
substrait::proto::expression::ScalarFunction {
100+
function_reference: function_anchor,
101+
arguments: vec![substrait::proto::FunctionArgument {
102+
arg_type: Some(substrait::proto::function_argument::ArgType::Value(
103+
expr,
104+
)),
105+
}],
106+
output_type: None,
107+
args: vec![],
108+
options: vec![],
109+
},
110+
)),
111+
}
112+
}

0 commit comments

Comments
 (0)