Skip to content

Commit d1b50d7

Browse files
committed
physical-expr: Normalize CastColumnExpr with default format options
Simplify CastColumnExpr constructor to ensure format options are always present by using the FormatOptionsSlot trait. Add new_with_schema method for cases requiring full schema. Update schema_rewriter.rs to properly wrap Schema references in Arc. Add default format options fallback in serialization for CastColumnExpr to protobuf.
1 parent daf45f7 commit d1b50d7

File tree

4 files changed

+136
-21
lines changed

4 files changed

+136
-21
lines changed

datafusion/physical-expr-adapter/src/schema_rewriter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
471471
Arc::new(physical_field.clone()),
472472
Arc::new(logical_field.clone()),
473473
None,
474-
Arc::clone(&self.physical_file_schema),
474+
Arc::new(self.physical_file_schema.clone()),
475475
)?);
476476

477477
Ok(Transformed::yes(cast_expr))

datafusion/physical-expr/src/expressions/cast_column.rs

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ use arrow::{
2222
compute::{CastOptions, can_cast_types},
2323
datatypes::{DataType, FieldRef, Schema},
2424
record_batch::RecordBatch,
25+
util::display::FormatOptions as ArrowFormatOptions,
2526
};
2627
use datafusion_common::{
27-
Result, ScalarValue, format::DEFAULT_CAST_OPTIONS,
28+
Result, ScalarValue,
29+
format::DEFAULT_CAST_OPTIONS,
2830
nested_struct::{cast_column, validate_struct_compatibility},
2931
plan_err,
3032
};
@@ -82,37 +84,94 @@ impl Hash for CastColumnExpr {
8284
}
8385
}
8486

87+
trait FormatOptionsSlot {
88+
fn ensure_present(&mut self, default: ArrowFormatOptions<'static>);
89+
}
90+
91+
impl FormatOptionsSlot for ArrowFormatOptions<'static> {
92+
fn ensure_present(&mut self, _default: ArrowFormatOptions<'static>) {}
93+
}
94+
95+
impl FormatOptionsSlot for Option<ArrowFormatOptions<'static>> {
96+
fn ensure_present(&mut self, default: ArrowFormatOptions<'static>) {
97+
if self.is_none() {
98+
*self = Some(default);
99+
}
100+
}
101+
}
102+
85103
impl CastColumnExpr {
86104
/// Create a new [`CastColumnExpr`].
87105
///
88-
/// This constructor assumes `expr` is a column expression and validates it
89-
/// against a single-field schema derived from `input_field`. If the
90-
/// expression depends on a broader schema (for example, computed
91-
/// expressions), use [`Self::new_with_schema`] instead.
106+
/// This constructor ensures that format options are populated with defaults,
107+
/// normalizing the CastOptions for consistent behavior during serialization
108+
/// and evaluation.
92109
pub fn new(
93110
expr: Arc<dyn PhysicalExpr>,
94111
input_field: FieldRef,
95112
target_field: FieldRef,
96113
cast_options: Option<CastOptions<'static>>,
97114
) -> Result<Self> {
98-
let input_schema = Arc::new(Schema::new(vec![input_field.as_ref().clone()]));
99-
Self::new_with_schema(
115+
let mut cast_options = cast_options.unwrap_or(DEFAULT_CAST_OPTIONS);
116+
cast_options
117+
.format_options
118+
.ensure_present(DEFAULT_CAST_OPTIONS.format_options.clone());
119+
let input_schema = Schema::new(vec![input_field.as_ref().clone()]);
120+
let expr_data_type = expr.data_type(&input_schema)?;
121+
if input_field.data_type() != &expr_data_type {
122+
return plan_err!(
123+
"CastColumnExpr input field data type '{}' does not match expression data type '{}'",
124+
input_field.data_type(),
125+
expr_data_type
126+
);
127+
}
128+
129+
match (input_field.data_type(), target_field.data_type()) {
130+
(DataType::Struct(source_fields), DataType::Struct(target_fields)) => {
131+
validate_struct_compatibility(source_fields, target_fields)?;
132+
}
133+
(_, DataType::Struct(_)) => {
134+
return plan_err!(
135+
"CastColumnExpr cannot cast non-struct input '{}' to struct target '{}'",
136+
input_field.data_type(),
137+
target_field.data_type()
138+
);
139+
}
140+
_ => {
141+
if !can_cast_types(input_field.data_type(), target_field.data_type()) {
142+
return plan_err!(
143+
"CastColumnExpr cannot cast input type '{}' to target type '{}'",
144+
input_field.data_type(),
145+
target_field.data_type()
146+
);
147+
}
148+
}
149+
}
150+
151+
Ok(Self {
100152
expr,
101153
input_field,
102154
target_field,
103155
cast_options,
104-
input_schema,
105-
)
156+
input_schema: Arc::new(input_schema),
157+
})
106158
}
107159

108-
/// Create a new [`CastColumnExpr`] using the full input schema.
160+
/// Create a new [`CastColumnExpr`] with a specific input schema.
161+
///
162+
/// This constructor is useful when the expression depends on multiple
163+
/// fields from a broader schema.
109164
pub fn new_with_schema(
110165
expr: Arc<dyn PhysicalExpr>,
111166
input_field: FieldRef,
112167
target_field: FieldRef,
113168
cast_options: Option<CastOptions<'static>>,
114169
input_schema: Arc<Schema>,
115170
) -> Result<Self> {
171+
let mut cast_options = cast_options.unwrap_or(DEFAULT_CAST_OPTIONS);
172+
cast_options
173+
.format_options
174+
.ensure_present(DEFAULT_CAST_OPTIONS.format_options.clone());
116175
let expr_data_type = expr.data_type(input_schema.as_ref())?;
117176
if input_field.data_type() != &expr_data_type {
118177
return plan_err!(
@@ -148,7 +207,7 @@ impl CastColumnExpr {
148207
expr,
149208
input_field,
150209
target_field,
151-
cast_options: cast_options.unwrap_or(DEFAULT_CAST_OPTIONS),
210+
cast_options,
152211
input_schema,
153212
})
154213
}

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ use arrow::datatypes::Schema;
2323
use arrow::ipc::writer::StreamWriter;
2424
use arrow::util::display::{DurationFormat, FormatOptions as ArrowFormatOptions};
2525
use datafusion_common::{
26-
DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
26+
DataFusionError, Result, format::DEFAULT_CAST_OPTIONS, internal_datafusion_err,
27+
internal_err, not_impl_err,
2728
};
2829
use datafusion_datasource::file_scan_config::FileScanConfig;
2930
use datafusion_datasource::file_sink_config::FileSink;
@@ -375,12 +376,10 @@ pub fn serialize_physical_expr(
375376
})
376377
} else if let Some(cast_column) = expr.downcast_ref::<CastColumnExpr>() {
377378
let cast_options = serialize_cast_options(cast_column.cast_options())?;
378-
let format_options = cast_options
379-
.format_options
380-
.clone()
381-
.ok_or_else(|| {
382-
internal_datafusion_err!("Missing format options for cast column")
383-
})?;
379+
let format_options = match cast_options.format_options.clone() {
380+
Some(format_options) => format_options,
381+
None => serialize_format_options(&DEFAULT_CAST_OPTIONS.format_options)?,
382+
};
384383
Ok(protobuf::PhysicalExprNode {
385384
expr_type: Some(protobuf::physical_expr_node::ExprType::CastColumn(
386385
Box::new(protobuf::PhysicalCastColumnNode {

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ use datafusion_common::file_options::json_writer::JsonWriterOptions;
108108
use datafusion_common::parsers::CompressionTypeVariant;
109109
use datafusion_common::stats::Precision;
110110
use datafusion_common::{
111-
DataFusionError, NullEquality, Result, UnnestOptions, internal_datafusion_err,
112-
internal_err, not_impl_err,
111+
DataFusionError, NullEquality, Result, UnnestOptions, format::DEFAULT_CAST_OPTIONS,
112+
internal_datafusion_err, internal_err, not_impl_err,
113113
};
114114
use datafusion_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl};
115115
use datafusion_expr::{
@@ -118,6 +118,20 @@ use datafusion_expr::{
118118
WindowFrame, WindowFrameBound, WindowUDF,
119119
};
120120
use datafusion_functions_aggregate::average::avg_udaf;
121+
122+
trait FormatOptionsSlot {
123+
fn clear(&mut self);
124+
}
125+
126+
impl FormatOptionsSlot for FormatOptions<'static> {
127+
fn clear(&mut self) {}
128+
}
129+
130+
impl FormatOptionsSlot for Option<FormatOptions<'static>> {
131+
fn clear(&mut self) {
132+
*self = None;
133+
}
134+
}
121135
use datafusion_functions_aggregate::nth_value::nth_value_udaf;
122136
use datafusion_functions_aggregate::string_agg::string_agg_udaf;
123137
use datafusion_proto::physical_plan::{
@@ -264,6 +278,49 @@ fn roundtrip_cast_column_expr() -> Result<()> {
264278
Ok(())
265279
}
266280

281+
#[test]
282+
fn roundtrip_cast_column_expr_with_missing_format_options() -> Result<()> {
283+
let input_field = Field::new("a", DataType::Int32, true);
284+
let target_field = Field::new("a", DataType::Int64, true);
285+
286+
let mut cast_options = CastOptions {
287+
safe: true,
288+
format_options: DEFAULT_CAST_OPTIONS.format_options.clone(),
289+
};
290+
cast_options.format_options.clear();
291+
let expr: Arc<dyn PhysicalExpr> = Arc::new(CastColumnExpr::new(
292+
Arc::new(Column::new("a", 0)),
293+
Arc::new(input_field.clone()),
294+
Arc::new(target_field.clone()),
295+
Some(cast_options),
296+
)?);
297+
298+
let ctx = SessionContext::new();
299+
let codec = DefaultPhysicalExtensionCodec {};
300+
let proto = datafusion_proto::physical_plan::to_proto::serialize_physical_expr(
301+
&expr, &codec,
302+
)?;
303+
let input_schema = Schema::new(vec![input_field.clone()]);
304+
let round_trip = datafusion_proto::physical_plan::from_proto::parse_physical_expr(
305+
&proto,
306+
&ctx.task_ctx(),
307+
&input_schema,
308+
&codec,
309+
)?;
310+
311+
let cast_expr = round_trip
312+
.as_any()
313+
.downcast_ref::<CastColumnExpr>()
314+
.ok_or_else(|| internal_datafusion_err!("Expected CastColumnExpr"))?;
315+
316+
assert_eq!(
317+
cast_expr.cast_options().format_options,
318+
DEFAULT_CAST_OPTIONS.format_options
319+
);
320+
321+
Ok(())
322+
}
323+
267324
#[test]
268325
fn roundtrip_cast_column_expr_with_target_field_change() -> Result<()> {
269326
let mut input_metadata = HashMap::new();

0 commit comments

Comments
 (0)