Skip to content

Commit 4010a55

Browse files
Add support for nested lists in substrait consumer (apache#20953)
## Rationale for this change Adds support for nested array expressions to the substrait consumer. Defined in [algebra.proto](https://github.com/substrait-io/substrait/blob/main/proto/substrait/algebra.proto#L1162). ## What changes are included in this PR? Implements the previously unimplemented `consume_nested` for `NestedType::List`. ## Are these changes tested? Yes, unit tests match the testing pattern for substrait literals in `consumer/expr/literal.rs`. A snapshot test is added for the `make_array()` path. ## Are there any user-facing changes? Users will now be able to send nested list expressions. This change is purely additive; all previously consumable Substrait plans will continue to work.
1 parent 4ae19eb commit 4010a55

File tree

5 files changed

+257
-4
lines changed

5 files changed

+257
-4
lines changed

datafusion/substrait/src/logical_plan/consumer/expr/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod field_reference;
2121
mod function_arguments;
2222
mod if_then;
2323
mod literal;
24+
mod nested;
2425
mod scalar_function;
2526
mod singular_or_list;
2627
mod subquery;
@@ -32,6 +33,7 @@ pub use field_reference::*;
3233
pub use function_arguments::*;
3334
pub use if_then::*;
3435
pub use literal::*;
36+
pub use nested::*;
3537
pub use scalar_function::*;
3638
pub use singular_or_list::*;
3739
pub use subquery::*;
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::logical_plan::consumer::SubstraitConsumer;
19+
use datafusion::common::{DFSchema, not_impl_err, substrait_err};
20+
use datafusion::execution::FunctionRegistry;
21+
use datafusion::logical_expr::Expr;
22+
use substrait::proto::expression::Nested;
23+
use substrait::proto::expression::nested::NestedType;
24+
25+
/// Converts a Substrait [Nested] expression into a DataFusion [Expr].
26+
///
27+
/// Substrait Nested expressions represent complex type constructors (list, struct, map)
28+
/// where elements are full expressions rather than just literals. This is used by
29+
/// producers that emit `Nested { list: ... }` for array construction, as opposed to
30+
/// `Literal { list: ... }` which only supports scalar values.
31+
pub async fn from_nested(
32+
consumer: &impl SubstraitConsumer,
33+
nested: &Nested,
34+
input_schema: &DFSchema,
35+
) -> datafusion::common::Result<Expr> {
36+
let Some(nested_type) = &nested.nested_type else {
37+
return substrait_err!("Nested expression requires a nested_type");
38+
};
39+
40+
match nested_type {
41+
NestedType::List(list) => {
42+
if list.values.is_empty() {
43+
return substrait_err!(
44+
"Empty Nested lists are not supported; use Literal.empty_list instead"
45+
);
46+
}
47+
48+
let mut args = Vec::with_capacity(list.values.len());
49+
for value in &list.values {
50+
args.push(consumer.consume_expression(value, input_schema).await?);
51+
}
52+
53+
let make_array_udf = consumer.get_function_registry().udf("make_array")?;
54+
Ok(Expr::ScalarFunction(
55+
datafusion::logical_expr::expr::ScalarFunction::new_udf(
56+
make_array_udf,
57+
args,
58+
),
59+
))
60+
}
61+
NestedType::Struct(_) => {
62+
not_impl_err!("Nested struct expressions are not yet supported")
63+
}
64+
NestedType::Map(_) => {
65+
not_impl_err!("Nested map expressions are not yet supported")
66+
}
67+
}
68+
}
69+
70+
#[cfg(test)]
71+
mod tests {
72+
use super::*;
73+
use crate::logical_plan::consumer::utils::tests::test_consumer;
74+
use substrait::proto::expression::Literal;
75+
use substrait::proto::expression::nested::List;
76+
use substrait::proto::{self, Expression};
77+
78+
fn make_i64_literal(value: i64) -> Expression {
79+
Expression {
80+
rex_type: Some(proto::expression::RexType::Literal(Literal {
81+
nullable: false,
82+
type_variation_reference: 0,
83+
literal_type: Some(proto::expression::literal::LiteralType::I64(value)),
84+
})),
85+
}
86+
}
87+
88+
#[tokio::test]
89+
async fn nested_list_with_literals() -> datafusion::common::Result<()> {
90+
let consumer = test_consumer();
91+
let schema = DFSchema::empty();
92+
let nested = Nested {
93+
nullable: false,
94+
type_variation_reference: 0,
95+
nested_type: Some(NestedType::List(List {
96+
values: vec![
97+
make_i64_literal(1),
98+
make_i64_literal(2),
99+
make_i64_literal(3),
100+
],
101+
})),
102+
};
103+
104+
let expr = from_nested(&consumer, &nested, &schema).await?;
105+
assert_eq!(
106+
format!("{expr}"),
107+
"make_array(Int64(1), Int64(2), Int64(3))"
108+
);
109+
110+
Ok(())
111+
}
112+
113+
#[tokio::test]
114+
async fn nested_list_empty_rejected() -> datafusion::common::Result<()> {
115+
let consumer = test_consumer();
116+
let schema = DFSchema::empty();
117+
let nested = Nested {
118+
nullable: true,
119+
type_variation_reference: 0,
120+
nested_type: Some(NestedType::List(List { values: vec![] })),
121+
};
122+
123+
let result = from_nested(&consumer, &nested, &schema).await;
124+
assert!(result.is_err());
125+
assert!(
126+
result
127+
.unwrap_err()
128+
.to_string()
129+
.contains("Empty Nested lists are not supported")
130+
);
131+
132+
Ok(())
133+
}
134+
135+
#[tokio::test]
136+
async fn nested_missing_type() -> datafusion::common::Result<()> {
137+
let consumer = test_consumer();
138+
let schema = DFSchema::empty();
139+
let nested = Nested {
140+
nullable: false,
141+
type_variation_reference: 0,
142+
nested_type: None,
143+
};
144+
145+
let result = from_nested(&consumer, &nested, &schema).await;
146+
assert!(result.is_err());
147+
assert!(result.unwrap_err().to_string().contains("nested_type"));
148+
149+
Ok(())
150+
}
151+
}

datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use super::{
1919
from_aggregate_rel, from_cast, from_cross_rel, from_exchange_rel, from_fetch_rel,
2020
from_field_reference, from_filter_rel, from_if_then, from_join_rel, from_literal,
21-
from_project_rel, from_read_rel, from_scalar_function, from_set_rel,
21+
from_nested, from_project_rel, from_read_rel, from_scalar_function, from_set_rel,
2222
from_singular_or_list, from_sort_rel, from_subquery, from_substrait_rel,
2323
from_substrait_rex, from_window_function,
2424
};
@@ -350,10 +350,10 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
350350

351351
async fn consume_nested(
352352
&self,
353-
_expr: &Nested,
354-
_input_schema: &DFSchema,
353+
expr: &Nested,
354+
input_schema: &DFSchema,
355355
) -> datafusion::common::Result<Expr> {
356-
not_impl_err!("Nested expression not supported")
356+
from_nested(self, expr, input_schema).await
357357
}
358358

359359
async fn consume_enum(

datafusion/substrait/tests/cases/logical_plans.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,4 +270,27 @@ mod tests {
270270

271271
Ok(())
272272
}
273+
274+
#[tokio::test]
275+
async fn nested_list_expressions() -> Result<()> {
276+
// Tests that a Substrait Nested list expression containing non-literal
277+
// expressions (column references) uses the make_array UDF.
278+
let proto_plan =
279+
read_json("tests/testdata/test_plans/nested_list_expressions.substrait.json");
280+
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
281+
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;
282+
283+
assert_snapshot!(
284+
plan,
285+
@r"
286+
Projection: make_array(DATA.a, DATA.b) AS my_list
287+
TableScan: DATA
288+
"
289+
);
290+
291+
// Trigger execution to ensure plan validity
292+
DataFrame::new(ctx.state(), plan).show().await?;
293+
294+
Ok(())
295+
}
273296
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
{
2+
"relations": [
3+
{
4+
"root": {
5+
"input": {
6+
"project": {
7+
"common": {
8+
"emit": {
9+
"outputMapping": [2]
10+
}
11+
},
12+
"input": {
13+
"read": {
14+
"common": {
15+
"direct": {}
16+
},
17+
"baseSchema": {
18+
"names": ["a", "b"],
19+
"struct": {
20+
"types": [
21+
{
22+
"i32": {
23+
"nullability": "NULLABILITY_NULLABLE"
24+
}
25+
},
26+
{
27+
"i32": {
28+
"nullability": "NULLABILITY_NULLABLE"
29+
}
30+
}
31+
],
32+
"nullability": "NULLABILITY_REQUIRED"
33+
}
34+
},
35+
"namedTable": {
36+
"names": ["DATA"]
37+
}
38+
}
39+
},
40+
"expressions": [
41+
{
42+
"nested": {
43+
"nullable": false,
44+
"list": {
45+
"values": [
46+
{
47+
"selection": {
48+
"directReference": {
49+
"structField": {
50+
"field": 0
51+
}
52+
},
53+
"rootReference": {}
54+
}
55+
},
56+
{
57+
"selection": {
58+
"directReference": {
59+
"structField": {
60+
"field": 1
61+
}
62+
},
63+
"rootReference": {}
64+
}
65+
}
66+
]
67+
}
68+
}
69+
}
70+
]
71+
}
72+
},
73+
"names": ["my_list"]
74+
}
75+
}
76+
]
77+
}

0 commit comments

Comments
 (0)