Skip to content

Commit e39ffa6

Browse files
authored
extract struct expressions to folders based on spark grouping (#1216)
1 parent 5f1e998 commit e39ffa6

File tree

4 files changed

+152
-100
lines changed

4 files changed

+152
-100
lines changed

native/spark-expr/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub use schema_adapter::SparkSchemaAdapterFactory;
4747
pub mod spark_hash;
4848
mod stddev;
4949
pub use stddev::Stddev;
50-
mod structs;
50+
mod struct_funcs;
5151
mod sum_decimal;
5252
pub use sum_decimal::SumDecimal;
5353
mod negative;
@@ -72,7 +72,8 @@ pub use error::{SparkError, SparkResult};
7272
pub use if_expr::IfExpr;
7373
pub use list::{ArrayInsert, GetArrayStructFields, ListExtract};
7474
pub use regexp::RLike;
75-
pub use structs::{CreateNamedStruct, GetStructField};
75+
pub use struct_funcs::*;
76+
7677
pub use temporal::{DateTruncExpr, HourExpr, MinuteExpr, SecondExpr, TimestampTruncExpr};
7778
pub use to_json::ToJson;
7879

native/spark-expr/src/structs.rs renamed to native/spark-expr/src/struct_funcs/create_named_struct.rs

Lines changed: 2 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
// under the License.
1717

1818
use arrow::record_batch::RecordBatch;
19-
use arrow_array::{Array, StructArray};
19+
use arrow_array::StructArray;
2020
use arrow_schema::{DataType, Field, Schema};
2121
use datafusion::logical_expr::ColumnarValue;
22-
use datafusion_common::{DataFusionError, Result as DataFusionResult, ScalarValue};
22+
use datafusion_common::Result as DataFusionResult;
2323
use datafusion_physical_expr::PhysicalExpr;
2424
use std::{
2525
any::Any,
@@ -106,102 +106,6 @@ impl Display for CreateNamedStruct {
106106
}
107107
}
108108

109-
#[derive(Debug, Eq)]
110-
pub struct GetStructField {
111-
child: Arc<dyn PhysicalExpr>,
112-
ordinal: usize,
113-
}
114-
115-
impl Hash for GetStructField {
116-
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
117-
self.child.hash(state);
118-
self.ordinal.hash(state);
119-
}
120-
}
121-
impl PartialEq for GetStructField {
122-
fn eq(&self, other: &Self) -> bool {
123-
self.child.eq(&other.child) && self.ordinal.eq(&other.ordinal)
124-
}
125-
}
126-
127-
impl GetStructField {
128-
pub fn new(child: Arc<dyn PhysicalExpr>, ordinal: usize) -> Self {
129-
Self { child, ordinal }
130-
}
131-
132-
fn child_field(&self, input_schema: &Schema) -> DataFusionResult<Arc<Field>> {
133-
match self.child.data_type(input_schema)? {
134-
DataType::Struct(fields) => Ok(Arc::clone(&fields[self.ordinal])),
135-
data_type => Err(DataFusionError::Plan(format!(
136-
"Expect struct field, got {:?}",
137-
data_type
138-
))),
139-
}
140-
}
141-
}
142-
143-
impl PhysicalExpr for GetStructField {
144-
fn as_any(&self) -> &dyn Any {
145-
self
146-
}
147-
148-
fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
149-
Ok(self.child_field(input_schema)?.data_type().clone())
150-
}
151-
152-
fn nullable(&self, input_schema: &Schema) -> DataFusionResult<bool> {
153-
Ok(self.child_field(input_schema)?.is_nullable())
154-
}
155-
156-
fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
157-
let child_value = self.child.evaluate(batch)?;
158-
159-
match child_value {
160-
ColumnarValue::Array(array) => {
161-
let struct_array = array
162-
.as_any()
163-
.downcast_ref::<StructArray>()
164-
.expect("A struct is expected");
165-
166-
Ok(ColumnarValue::Array(Arc::clone(
167-
struct_array.column(self.ordinal),
168-
)))
169-
}
170-
ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => Ok(ColumnarValue::Array(
171-
Arc::clone(struct_array.column(self.ordinal)),
172-
)),
173-
value => Err(DataFusionError::Execution(format!(
174-
"Expected a struct array, got {:?}",
175-
value
176-
))),
177-
}
178-
}
179-
180-
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
181-
vec![&self.child]
182-
}
183-
184-
fn with_new_children(
185-
self: Arc<Self>,
186-
children: Vec<Arc<dyn PhysicalExpr>>,
187-
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
188-
Ok(Arc::new(GetStructField::new(
189-
Arc::clone(&children[0]),
190-
self.ordinal,
191-
)))
192-
}
193-
}
194-
195-
impl Display for GetStructField {
196-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
197-
write!(
198-
f,
199-
"GetStructField [child: {:?}, ordinal: {:?}]",
200-
self.child, self.ordinal
201-
)
202-
}
203-
}
204-
205109
#[cfg(test)]
206110
mod test {
207111
use super::CreateNamedStruct;
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::record_batch::RecordBatch;
19+
use arrow_array::{Array, StructArray};
20+
use arrow_schema::{DataType, Field, Schema};
21+
use datafusion::logical_expr::ColumnarValue;
22+
use datafusion_common::{DataFusionError, Result as DataFusionResult, ScalarValue};
23+
use datafusion_physical_expr::PhysicalExpr;
24+
use std::{
25+
any::Any,
26+
fmt::{Display, Formatter},
27+
hash::Hash,
28+
sync::Arc,
29+
};
30+
31+
/// Physical expression that reads one field, selected by position, out of the
/// struct value produced by a child expression.
#[derive(Debug, Eq)]
32+
pub struct GetStructField {
33+
/// Expression whose evaluated result is the struct to read from.
child: Arc<dyn PhysicalExpr>,
34+
/// Zero-based index of the field (column) to extract from the struct.
ordinal: usize,
35+
}
36+
37+
impl Hash for GetStructField {
38+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
39+
self.child.hash(state);
40+
self.ordinal.hash(state);
41+
}
42+
}
43+
impl PartialEq for GetStructField {
44+
fn eq(&self, other: &Self) -> bool {
45+
self.child.eq(&other.child) && self.ordinal.eq(&other.ordinal)
46+
}
47+
}
48+
49+
impl GetStructField {
50+
pub fn new(child: Arc<dyn PhysicalExpr>, ordinal: usize) -> Self {
51+
Self { child, ordinal }
52+
}
53+
54+
fn child_field(&self, input_schema: &Schema) -> DataFusionResult<Arc<Field>> {
55+
match self.child.data_type(input_schema)? {
56+
DataType::Struct(fields) => Ok(Arc::clone(&fields[self.ordinal])),
57+
data_type => Err(DataFusionError::Plan(format!(
58+
"Expect struct field, got {:?}",
59+
data_type
60+
))),
61+
}
62+
}
63+
}
64+
65+
impl PhysicalExpr for GetStructField {
66+
fn as_any(&self) -> &dyn Any {
67+
self
68+
}
69+
70+
fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
71+
Ok(self.child_field(input_schema)?.data_type().clone())
72+
}
73+
74+
fn nullable(&self, input_schema: &Schema) -> DataFusionResult<bool> {
75+
Ok(self.child_field(input_schema)?.is_nullable())
76+
}
77+
78+
fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
79+
let child_value = self.child.evaluate(batch)?;
80+
81+
match child_value {
82+
ColumnarValue::Array(array) => {
83+
let struct_array = array
84+
.as_any()
85+
.downcast_ref::<StructArray>()
86+
.expect("A struct is expected");
87+
88+
Ok(ColumnarValue::Array(Arc::clone(
89+
struct_array.column(self.ordinal),
90+
)))
91+
}
92+
ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => Ok(ColumnarValue::Array(
93+
Arc::clone(struct_array.column(self.ordinal)),
94+
)),
95+
value => Err(DataFusionError::Execution(format!(
96+
"Expected a struct array, got {:?}",
97+
value
98+
))),
99+
}
100+
}
101+
102+
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
103+
vec![&self.child]
104+
}
105+
106+
fn with_new_children(
107+
self: Arc<Self>,
108+
children: Vec<Arc<dyn PhysicalExpr>>,
109+
) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
110+
Ok(Arc::new(GetStructField::new(
111+
Arc::clone(&children[0]),
112+
self.ordinal,
113+
)))
114+
}
115+
}
116+
117+
impl Display for GetStructField {
118+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
119+
write!(
120+
f,
121+
"GetStructField [child: {:?}, ordinal: {:?}]",
122+
self.child, self.ordinal
123+
)
124+
}
125+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
mod create_named_struct;
19+
mod get_struct_field;
20+
21+
pub use create_named_struct::CreateNamedStruct;
22+
pub use get_struct_field::GetStructField;

0 commit comments

Comments
 (0)