Skip to content

Commit 3935ba5

Browse files
committed
codex dump
1 parent 944f7f2 commit 3935ba5

File tree

12 files changed

+437
-545
lines changed

12 files changed

+437
-545
lines changed

datafusion-examples/examples/custom_data_source/custom_file_casts.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use datafusion::datasource::listing::{
3232
use datafusion::execution::context::SessionContext;
3333
use datafusion::execution::object_store::ObjectStoreUrl;
3434
use datafusion::parquet::arrow::ArrowWriter;
35-
use datafusion::physical_expr::expressions::{CastColumnExpr, CastExpr};
35+
use datafusion::physical_expr::expressions::CastExpr;
3636
use datafusion::physical_expr::PhysicalExpr;
3737
use datafusion::prelude::SessionConfig;
3838
use datafusion_physical_expr_adapter::{
@@ -192,19 +192,6 @@ impl PhysicalExprAdapter for CustomCastsPhysicalExprAdapter {
192192
);
193193
}
194194
}
195-
if let Some(cast) = expr.as_any().downcast_ref::<CastColumnExpr>() {
196-
let input_data_type =
197-
cast.expr().data_type(&self.physical_file_schema)?;
198-
let output_data_type = cast.data_type(&self.physical_file_schema)?;
199-
if !CastExpr::check_bigger_cast(
200-
cast.target_field().data_type(),
201-
&input_data_type,
202-
) {
203-
return not_impl_err!(
204-
"Unsupported CAST from {input_data_type} to {output_data_type}"
205-
);
206-
}
207-
}
208195
Ok(Transformed::no(expr))
209196
})
210197
.data()

datafusion/common/src/nested_struct.rs

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,22 +57,55 @@ fn cast_struct_column(
5757
if let Some(source_struct) = source_col.as_any().downcast_ref::<StructArray>() {
5858
validate_struct_compatibility(source_struct.fields(), target_fields)?;
5959

60+
let positional = source_struct
61+
.fields()
62+
.iter()
63+
.enumerate()
64+
.all(|(idx, field)| field.name().as_str() == format!("c{idx}"))
65+
&& source_struct.num_columns() == target_fields.len();
66+
67+
let ordered_match = source_struct
68+
.fields()
69+
.iter()
70+
.map(|f| f.name())
71+
.eq(target_fields.iter().map(|f| f.name()));
72+
let source_names: Vec<&str> = source_struct
73+
.fields()
74+
.iter()
75+
.map(|f| f.name().as_str())
76+
.collect();
77+
let has_all_target_names = target_fields
78+
.iter()
79+
.all(|f| source_names.contains(&f.name().as_str()));
80+
let ordered_mismatch = !ordered_match;
81+
let use_positional_mapping = positional
82+
|| (source_struct.num_columns() == target_fields.len()
83+
&& ordered_mismatch
84+
&& has_all_target_names);
85+
6086
let mut fields: Vec<Arc<Field>> = Vec::with_capacity(target_fields.len());
6187
let mut arrays: Vec<ArrayRef> = Vec::with_capacity(target_fields.len());
6288
let num_rows = source_col.len();
6389

64-
for target_child_field in target_fields {
90+
for (idx, target_child_field) in target_fields.iter().enumerate() {
6591
fields.push(Arc::clone(target_child_field));
66-
match source_struct.column_by_name(target_child_field.name()) {
92+
let maybe_source_child_col = if use_positional_mapping {
93+
Some(Arc::clone(source_struct.column(idx)))
94+
} else {
95+
source_struct
96+
.column_by_name(target_child_field.name())
97+
.map(Arc::clone)
98+
};
99+
match maybe_source_child_col {
67100
Some(source_child_col) => {
68101
let adapted_child =
69-
cast_column(source_child_col, target_child_field, cast_options)
102+
cast_column(&source_child_col, target_child_field, cast_options)
70103
.map_err(|e| {
71-
e.context(format!(
72-
"While casting struct field '{}'",
73-
target_child_field.name()
74-
))
75-
})?;
104+
e.context(format!(
105+
"While casting struct field '{}'",
106+
target_child_field.name()
107+
))
108+
})?;
76109
arrays.push(adapted_child);
77110
}
78111
None => {

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,15 @@ impl ScalarUDFImpl for DummyUDF {
100100
}
101101
}
102102

103+
fn make_cast(name: &str, index: usize) -> Arc<CastExpr> {
104+
Arc::new(CastExpr::new(
105+
Arc::new(Column::new(name, index)),
106+
Arc::new(Field::new(name, DataType::Int32, true)),
107+
Arc::new(Field::new(name, DataType::Float32, true)),
108+
None,
109+
))
110+
}
111+
103112
#[test]
104113
fn test_update_matching_exprs() -> Result<()> {
105114
let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
@@ -108,11 +117,7 @@ fn test_update_matching_exprs() -> Result<()> {
108117
Operator::Divide,
109118
Arc::new(Column::new("e", 5)),
110119
)),
111-
Arc::new(CastExpr::new(
112-
Arc::new(Column::new("a", 3)),
113-
DataType::Float32,
114-
None,
115-
)),
120+
make_cast("a", 3),
116121
Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))),
117122
Arc::new(ScalarFunctionExpr::new(
118123
"scalar_expr",
@@ -174,11 +179,7 @@ fn test_update_matching_exprs() -> Result<()> {
174179
Operator::Divide,
175180
Arc::new(Column::new("e", 4)),
176181
)),
177-
Arc::new(CastExpr::new(
178-
Arc::new(Column::new("a", 0)),
179-
DataType::Float32,
180-
None,
181-
)),
182+
make_cast("a", 0),
182183
Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 5)))),
183184
Arc::new(ScalarFunctionExpr::new(
184185
"scalar_expr",
@@ -247,11 +248,7 @@ fn test_update_projected_exprs() -> Result<()> {
247248
Operator::Divide,
248249
Arc::new(Column::new("e", 5)),
249250
)),
250-
Arc::new(CastExpr::new(
251-
Arc::new(Column::new("a", 3)),
252-
DataType::Float32,
253-
None,
254-
)),
251+
make_cast("a", 3),
255252
Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))),
256253
Arc::new(ScalarFunctionExpr::new(
257254
"scalar_expr",
@@ -313,11 +310,7 @@ fn test_update_projected_exprs() -> Result<()> {
313310
Operator::Divide,
314311
Arc::new(Column::new("e", 4)),
315312
)),
316-
Arc::new(CastExpr::new(
317-
Arc::new(Column::new("a", 0)),
318-
DataType::Float32,
319-
None,
320-
)),
313+
make_cast("a", 0),
321314
Arc::new(NegativeExpr::new(Arc::new(Column::new("f_new", 5)))),
322315
Arc::new(ScalarFunctionExpr::new(
323316
"scalar_expr",

datafusion/physical-expr-adapter/src/schema_rewriter.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,8 @@ use datafusion_common::{
2828
Result, ScalarValue,
2929
};
3030
use datafusion_functions::core::getfield::GetFieldFunc;
31-
use datafusion_physical_expr::expressions::CastColumnExpr;
3231
use datafusion_physical_expr::{
33-
expressions::{self, Column},
32+
expressions::{self, CastExpr, Column},
3433
ScalarFunctionExpr,
3534
};
3635
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
@@ -437,7 +436,7 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
437436
}
438437
}
439438

440-
let cast_expr = Arc::new(CastColumnExpr::new(
439+
let cast_expr = Arc::new(CastExpr::new(
441440
Arc::new(column),
442441
Arc::new(physical_field.clone()),
443442
Arc::new(logical_field.clone()),
@@ -496,7 +495,7 @@ mod tests {
496495
let result = adapter.rewrite(column_expr).unwrap();
497496

498497
// Should be wrapped in a cast expression
499-
assert!(result.as_any().downcast_ref::<CastColumnExpr>().is_some());
498+
assert!(result.as_any().downcast_ref::<CastExpr>().is_some());
500499
}
501500

502501
#[test]
@@ -527,7 +526,7 @@ mod tests {
527526
println!("Rewritten expression: {result}");
528527

529528
let expected = expressions::BinaryExpr::new(
530-
Arc::new(CastColumnExpr::new(
529+
Arc::new(CastExpr::new(
531530
Arc::new(Column::new("a", 0)),
532531
Arc::new(Field::new("a", DataType::Int32, false)),
533532
Arc::new(Field::new("a", DataType::Int64, false)),
@@ -611,7 +610,7 @@ mod tests {
611610

612611
let result = adapter.rewrite(column_expr).unwrap();
613612

614-
let expected = Arc::new(CastColumnExpr::new(
613+
let expected = Arc::new(CastExpr::new(
615614
Arc::new(Column::new("data", 0)),
616615
Arc::new(Field::new(
617616
"data",

datafusion/physical-expr/src/equivalence/properties/dependency.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ mod tests {
390390
convert_to_sort_reqs, create_test_params, create_test_schema, parse_sort_expr,
391391
};
392392
use crate::equivalence::{convert_to_sort_exprs, ProjectionMapping};
393-
use crate::expressions::{col, BinaryExpr, CastExpr, Column};
393+
use crate::expressions::{cast_with_options, col, BinaryExpr, Column};
394394
use crate::projection::tests::output_schema;
395395
use crate::{ConstExpr, EquivalenceProperties, ScalarFunctionExpr};
396396

@@ -941,7 +941,7 @@ mod tests {
941941
let col_a = col("a", schema.as_ref())?;
942942
let col_b = col("b", schema.as_ref())?;
943943
let col_c = col("c", schema.as_ref())?;
944-
let cast_c = Arc::new(CastExpr::new(col_c, DataType::Date32, None)) as _;
944+
let cast_c = cast_with_options(col_c, schema.as_ref(), DataType::Date32, None)?;
945945

946946
let cases = vec![
947947
TestCase {

0 commit comments

Comments
 (0)