Skip to content

Commit 4e2c446

Browse files
committed
feat: introduce OwnedFormatOptions and OwnedCastOptions for enhanced casting flexibility
1 parent 7815732 commit 4e2c446

File tree

7 files changed

+396
-55
lines changed

7 files changed

+396
-55
lines changed

datafusion/common/src/format.rs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,167 @@ pub const DEFAULT_CAST_OPTIONS: CastOptions<'static> = CastOptions {
3535
format_options: DEFAULT_FORMAT_OPTIONS,
3636
};
3737

38+
/// Owned version of Arrow's `FormatOptions` with all `String` values instead of `&str`.
39+
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
40+
pub struct OwnedFormatOptions {
41+
/// String representation of null values
42+
pub null: String,
43+
/// Date format string
44+
pub date_format: Option<String>,
45+
/// Datetime format string
46+
pub datetime_format: Option<String>,
47+
/// Timestamp format string
48+
pub timestamp_format: Option<String>,
49+
/// Timestamp with timezone format string
50+
pub timestamp_tz_format: Option<String>,
51+
/// Time format string
52+
pub time_format: Option<String>,
53+
/// Duration format
54+
pub duration_format: DurationFormat,
55+
/// Include type information in formatted output
56+
pub types_info: bool,
57+
}
58+
59+
impl OwnedFormatOptions {
60+
/// Create a new `OwnedFormatOptions` with default values.
61+
pub fn new() -> Self {
62+
Self::default()
63+
}
64+
65+
/// Set the null string.
66+
pub fn with_null(mut self, null: String) -> Self {
67+
self.null = null;
68+
self
69+
}
70+
71+
/// Set the date format.
72+
pub fn with_date_format(mut self, date_format: Option<String>) -> Self {
73+
self.date_format = date_format;
74+
self
75+
}
76+
77+
/// Set the datetime format.
78+
pub fn with_datetime_format(mut self, datetime_format: Option<String>) -> Self {
79+
self.datetime_format = datetime_format;
80+
self
81+
}
82+
83+
/// Set the timestamp format.
84+
pub fn with_timestamp_format(mut self, timestamp_format: Option<String>) -> Self {
85+
self.timestamp_format = timestamp_format;
86+
self
87+
}
88+
89+
/// Set the timestamp with timezone format.
90+
pub fn with_timestamp_tz_format(
91+
mut self,
92+
timestamp_tz_format: Option<String>,
93+
) -> Self {
94+
self.timestamp_tz_format = timestamp_tz_format;
95+
self
96+
}
97+
98+
/// Set the time format.
99+
pub fn with_time_format(mut self, time_format: Option<String>) -> Self {
100+
self.time_format = time_format;
101+
self
102+
}
103+
104+
/// Set the duration format.
105+
pub fn with_duration_format(mut self, duration_format: DurationFormat) -> Self {
106+
self.duration_format = duration_format;
107+
self
108+
}
109+
110+
/// Set whether to include type information in formatted output.
111+
pub fn with_types_info(mut self, types_info: bool) -> Self {
112+
self.types_info = types_info;
113+
self
114+
}
115+
116+
/// Convert to Arrow's `FormatOptions<'a>` with borrowed references; always calls `with_display_error(false)`.
117+
pub fn as_arrow_options<'a>(&'a self) -> FormatOptions<'a> {
118+
FormatOptions::new()
119+
.with_null(self.null.as_str())
120+
.with_date_format(self.date_format.as_deref())
121+
.with_datetime_format(self.datetime_format.as_deref())
122+
.with_timestamp_format(self.timestamp_format.as_deref())
123+
.with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
124+
.with_time_format(self.time_format.as_deref())
125+
.with_duration_format(self.duration_format)
126+
.with_display_error(false)
127+
.with_types_info(self.types_info)
128+
}
129+
}
130+
131+
impl Default for OwnedFormatOptions {
132+
fn default() -> Self {
133+
Self {
134+
null: "NULL".to_string(),
135+
date_format: None,
136+
datetime_format: None,
137+
timestamp_format: None,
138+
timestamp_tz_format: None,
139+
time_format: None,
140+
duration_format: DurationFormat::Pretty,
141+
types_info: false,
142+
}
143+
}
144+
}
145+
146+
/// Owned version of Arrow's `CastOptions` with `OwnedFormatOptions`.
147+
#[derive(Debug, Clone, Default, Eq, PartialEq, Hash)]
148+
pub struct OwnedCastOptions {
149+
/// Whether to use safe casting (when `true`, a failed cast yields NULL instead of returning an error)
150+
pub safe: bool,
151+
/// Format options for string output
152+
pub format_options: OwnedFormatOptions,
153+
}
154+
155+
impl OwnedCastOptions {
156+
/// Create a new `OwnedCastOptions` with the given `safe` flag and default format options.
157+
pub fn new(safe: bool) -> Self {
158+
Self {
159+
safe,
160+
format_options: OwnedFormatOptions::default(),
161+
}
162+
}
163+
164+
/// Create a new `OwnedCastOptions` from Arrow `CastOptions`.
165+
pub fn from_arrow_options(options: &CastOptions<'_>) -> Self {
166+
Self {
167+
safe: options.safe,
168+
format_options: OwnedFormatOptions {
169+
null: options.format_options.null().to_string(),
170+
date_format: options.format_options.date_format().map(|s| s.to_string()),
171+
datetime_format: options
172+
.format_options
173+
.datetime_format()
174+
.map(|s| s.to_string()),
175+
timestamp_format: options
176+
.format_options
177+
.timestamp_format()
178+
.map(|s| s.to_string()),
179+
timestamp_tz_format: options
180+
.format_options
181+
.timestamp_tz_format()
182+
.map(|s| s.to_string()),
183+
time_format: options.format_options.time_format().map(|s| s.to_string()),
184+
duration_format: options.format_options.duration_format(),
185+
types_info: options.format_options.types_info(),
186+
},
187+
}
188+
}
189+
190+
/// Convert to Arrow's `CastOptions<'a>` with borrowed references.
191+
pub fn as_arrow_options<'a>(&'a self) -> CastOptions<'a> {
192+
CastOptions {
193+
safe: self.safe,
194+
format_options: self.format_options.as_arrow_options(),
195+
}
196+
}
197+
}
198+
38199
/// Output formats for controlling Explain plans
39200
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40201
pub enum ExplainFormat {

datafusion/common/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,15 @@ pub use file_options::file_type::{
7979
DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION,
8080
DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION, GetExt,
8181
};
82+
pub use format::{OwnedCastOptions, OwnedFormatOptions};
8283
pub use functional_dependencies::{
8384
Constraint, Constraints, Dependency, FunctionalDependence, FunctionalDependencies,
8485
aggregate_functional_dependencies, get_required_group_by_exprs_indices,
8586
get_target_functional_dependencies,
8687
};
8788
use hashbrown::DefaultHashBuilder;
8889
pub use join_type::{JoinConstraint, JoinSide, JoinType};
89-
pub use nested_struct::cast_column;
90+
pub use nested_struct::{cast_column, validate_field_compatibility};
9091
pub use null_equality::NullEquality;
9192
pub use param_value::ParamValues;
9293
pub use scalar::{ScalarType, ScalarValue};

datafusion/common/src/nested_struct.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,9 @@ pub fn validate_struct_compatibility(
254254
Ok(())
255255
}
256256

257-
fn validate_field_compatibility(
257+
/// Validates that a source field can be cast to a target field, including
258+
/// nullability checks and nested struct compatibility.
259+
pub fn validate_field_compatibility(
258260
source_field: &Field,
259261
target_field: &Field,
260262
) -> Result<()> {

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -880,7 +880,7 @@ mod tests {
880880

881881
// Table schema is Utf8 but file schema is StringView
882882
let table_schema =
883-
Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));
883+
Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, true)]));
884884

885885
// Predicate should prune all row groups
886886
let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
@@ -922,7 +922,7 @@ mod tests {
922922
let batch = create_batch(vec![("c1", c1.clone())]);
923923

924924
let table_schema =
925-
Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));
925+
Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, true)]));
926926

927927
// Predicate should prune all row groups
928928
let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
@@ -969,7 +969,7 @@ mod tests {
969969
let table_schema = Arc::new(Schema::new(vec![Field::new(
970970
"c1",
971971
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
972-
false,
972+
true,
973973
)]));
974974
// One row should match, 2 pruned via page index, 1 pruned via filter pushdown
975975
let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(

datafusion/core/tests/parquet/expr_adapter.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() {
136136
write_parquet(batch, store.clone(), path).await;
137137

138138
let table_schema = Arc::new(Schema::new(vec![
139-
Field::new("c1", DataType::Int64, false),
139+
Field::new("c1", DataType::Int64, true),
140140
Field::new("c2", DataType::Utf8, true),
141141
]));
142142

@@ -234,9 +234,9 @@ async fn test_physical_expr_adapter_with_non_null_defaults() {
234234

235235
// Table schema has additional columns c2 (Utf8) and c3 (Int64) that don't exist in file
236236
let table_schema = Arc::new(Schema::new(vec![
237-
Field::new("c1", DataType::Int64, false), // type differs from file (Int32 vs Int64)
238-
Field::new("c2", DataType::Utf8, true), // missing from file
239-
Field::new("c3", DataType::Int64, true), // missing from file
237+
Field::new("c1", DataType::Int64, true), // type differs from file (Int32 vs Int64)
238+
Field::new("c2", DataType::Utf8, true), // missing from file
239+
Field::new("c3", DataType::Int64, true), // missing from file
240240
]));
241241

242242
let mut cfg = SessionConfig::new()
@@ -343,7 +343,7 @@ async fn test_physical_expr_adapter_factory_reuse_across_tables() {
343343

344344
// Table schema has additional columns that don't exist in files
345345
let table_schema = Arc::new(Schema::new(vec![
346-
Field::new("c1", DataType::Int64, false),
346+
Field::new("c1", DataType::Int64, true),
347347
Field::new("c2", DataType::Utf8, true), // missing from files
348348
]));
349349

datafusion/physical-expr-adapter/src/schema_rewriter.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -498,12 +498,13 @@ impl DefaultPhysicalExprAdapterRewriter {
498498
}
499499
}
500500

501-
let cast_expr = Arc::new(CastColumnExpr::new(
501+
let cast_expr = Arc::new(CastColumnExpr::new_with_schema(
502502
Arc::new(column),
503503
Arc::new(actual_physical_field.clone()),
504504
Arc::new(logical_field.clone()),
505505
None,
506-
));
506+
Arc::clone(&self.physical_file_schema),
507+
)?);
507508

508509
Ok(Transformed::yes(cast_expr))
509510
}
@@ -720,12 +721,15 @@ mod tests {
720721
println!("Rewritten expression: {result}");
721722

722723
let expected = expressions::BinaryExpr::new(
723-
Arc::new(CastColumnExpr::new(
724-
Arc::new(Column::new("a", 0)),
725-
Arc::new(Field::new("a", DataType::Int32, false)),
726-
Arc::new(Field::new("a", DataType::Int64, false)),
727-
None,
728-
)),
724+
Arc::new(
725+
CastColumnExpr::new(
726+
Arc::new(Column::new("a", 0)),
727+
Arc::new(Field::new("a", DataType::Int32, false)),
728+
Arc::new(Field::new("a", DataType::Int64, false)),
729+
None,
730+
)
731+
.unwrap(),
732+
),
729733
Operator::Plus,
730734
Arc::new(expressions::Literal::new(ScalarValue::Int64(Some(5)))),
731735
);
@@ -830,12 +834,15 @@ mod tests {
830834
false,
831835
));
832836

833-
let expected = Arc::new(CastColumnExpr::new(
834-
Arc::new(Column::new("data", 0)),
835-
physical_field,
836-
logical_field,
837-
None,
838-
)) as Arc<dyn PhysicalExpr>;
837+
let expected = Arc::new(
838+
CastColumnExpr::new(
839+
Arc::new(Column::new("data", 0)),
840+
physical_field,
841+
logical_field,
842+
None,
843+
)
844+
.unwrap(),
845+
) as Arc<dyn PhysicalExpr>;
839846

840847
assert_eq!(result.to_string(), expected.to_string());
841848
}

0 commit comments

Comments
 (0)